Module meshtastic.powermon.ppk2

Classes for logging power consumption of meshtastic devices.

Expand source code
"""Classes for logging power consumption of meshtastic devices."""

import logging
import threading
import time
from typing import Optional

from ppk2_api import ppk2_api  # type: ignore[import-untyped]

from .power_supply import PowerError, PowerSupply


class PPK2PowerSupply(PowerSupply):
    """Interface for talking with the NRF PPK2 high-resolution micro-power supply.

    Power Profiler Kit II is what you should google to find it for purchase.

    A background thread (see measurement_loop) polls the PPK2 and accumulates
    min/max/sum/count statistics.  The get_*_current_mA accessors report those
    statistics and reset_measurements starts a new accumulation window.
    """

    def __init__(self, portName: Optional[str] = None):
        """Open the PPK2 and prepare (but do not start) the measurement thread.

        portName (str, optional): The serial port of the PPK2.  If omitted the port
            is auto-detected, which requires exactly one PPK2 to be attached.

        Raises PowerError if portName is omitted and zero or several PPK2 devices are found.
        """
        if not portName:
            devs = ppk2_api.PPK2_API.list_devices()
            if not devs:
                raise PowerError("No PPK2 devices found")
            if len(devs) > 1:
                raise PowerError(
                    "Multiple PPK2 devices found, please specify the portName"
                )
            portName = devs[0]

        # Set to True by setIsSupply and back to False by close; the measurement thread runs while it is True
        self.measuring = False

        # Statistics for the current accumulation window (microamperes, see get_average_current_mA)
        self.current_max = 0
        self.current_min = 0
        self.current_sum = 0
        self.current_num_samples = 0
        self.current_average = 0

        # for tracking average data read length (to determine if we are sleeping efficiently in measurement_loop)
        self.total_data_len = 0
        self.num_data_reads = 0
        self.max_data_len = 0

        # Normally we just sleep with a timeout on this condition (polling the power measurement data repeatedly)
        # but any time our measurements have been fully consumed (via reset_measurements) we notify() this condition
        # to trigger a new reading ASAP.
        self._want_measurement = threading.Condition()

        # Guards current_max/current_min/current_sum/current_num_samples/current_average.
        # Never acquire _want_measurement while holding this lock: the measurement thread
        # takes the two in the opposite order.
        self._result_lock = threading.Condition()

        self.r = r = ppk2_api.PPK2_API(
            portName
        )  # serial port will be different for you
        r.get_modifiers()

        # Created here but only started by setIsSupply
        self.measurement_thread = threading.Thread(
            target=self.measurement_loop, daemon=True, name="ppk2 measurement"
        )
        logging.info("Connected to Power Profiler Kit II (PPK2)")
        super().__init__()  # we call this late so that the port is already open and _getRawWattHour callback works

    def measurement_loop(self):
        """Poll the PPK2 and accumulate statistics until self.measuring becomes False.

        Runs in self.measurement_thread.
        """
        while self.measuring:
            with self._want_measurement:
                # normally we poll using this timeout, but sometimes
                # reset_measurements() will notify us to read immediately
                self._want_measurement.wait(
                    0.0001 if self.num_data_reads == 0 else 0.001
                )

                # always reads 4096 bytes, even if there is no new samples - or possibly the python single thread (because of global interpreter lock)
                # is always behind and therefore we are inherently dropping samples semi randomly!!!
                read_data = self.r.get_data()
                if read_data != b"":
                    samples, _ = self.r.get_samples(read_data)

                    if len(samples) > 0:
                        # The following operations could be expensive, so do outside of the lock
                        # FIXME - change all these lists into numpy arrays to use lots less CPU
                        batch_max = max(samples)
                        batch_min = min(samples)
                        batch_sum = sum(samples)
                        batch_len = len(samples)

                        # Publish all four statistics together so a concurrent reset or
                        # read never sees a half-updated window
                        with self._result_lock:
                            if self.current_num_samples == 0:
                                # First batch of a new window: restart min/max
                                # (we need at least one sample to get an initial min)
                                self.current_max = 0
                                self.current_min = batch_min

                            self.current_max = max(self.current_max, batch_max)
                            self.current_min = min(self.current_min, batch_min)
                            self.current_sum += batch_sum
                            self.current_num_samples += batch_len
                        # logging.debug(f"PPK2 data_len={len(read_data)}, sample_len={len(samples)}")

                # Summary stats for performance monitoring
                self.num_data_reads += 1
                self.total_data_len += len(read_data)
                self.max_data_len = max(self.max_data_len, len(read_data))

    def get_min_current_mA(self):
        """Return the min current in mA."""
        return self.current_min / 1000

    def get_max_current_mA(self):
        """Return the max current in mA."""
        return self.current_max / 1000

    def get_average_current_mA(self):
        """Return the average current in mA.

        If no samples have arrived since the last reset, the previously computed average is returned.
        """
        with self._result_lock:
            if self.current_num_samples != 0:
                # If we have new samples, calculate a new average
                self.current_average = self.current_sum / self.current_num_samples

            # Even if we don't have new samples, return the last calculated average
            # measurements are in microamperes, divide by 1000
            return self.current_average / 1000

    def reset_measurements(self):
        """Start a new accumulation window and ask the measurement thread to read immediately.

        current_average, current_min and current_max are left untouched so the accessors
        keep returning the last values until new samples arrive.
        """
        # Same lock the measurement thread holds for its += updates, so the reset cannot be
        # interleaved with a half-finished update.  Released before taking _want_measurement.
        with self._result_lock:
            self.current_sum = 0
            self.current_num_samples = 0

        # if self.num_data_reads:
        #    logging.debug(f"max data len = {self.max_data_len},avg {self.total_data_len/self.num_data_reads}, num reads={self.num_data_reads}")
        with self._want_measurement:
            # Summary stats for performance monitoring (the measurement thread updates these while holding this lock)
            self.num_data_reads = 0
            self.total_data_len = 0
            self.max_data_len = 0

            self._want_measurement.notify()  # notify the measurement loop to read immediately

    def close(self) -> None:
        """Close the power meter."""
        self.measuring = False
        self.r.stop_measuring()  # send command to ppk2
        # The thread is only started by setIsSupply; join() on a never-started thread raises RuntimeError
        if self.measurement_thread.is_alive():
            self.measurement_thread.join()  # wait for our thread to finish
        super().close()

    def setIsSupply(self, is_supply: bool):
        """If in supply mode we will provide power ourself, otherwise we are just an amp meter.

        Also starts the measurement thread the first time it is called.
        """

        assert self.v > 0.8  # We must set a valid voltage before calling this method

        self.r.set_source_voltage(
            int(self.v * 1000)
        )  # set source voltage in mV BEFORE setting source mode
        # Note: source voltage must be set even if we are using the amp meter mode

        # must be after setting source voltage and before setting mode
        self.r.start_measuring()  # send command to ppk2

        if not is_supply:
            self.r.use_ampere_meter()  # set amp meter mode
        else:
            self.r.use_source_meter()  # set source meter mode

        if not self.measurement_thread.is_alive():
            self.measuring = True
            self.reset_measurements()

            # We can't start reading from the thread until vdd is set, so start running the thread now
            self.measurement_thread.start()
            time.sleep(
                0.2
            )  # FIXME - crufty way to ensure we do one set of reads to discard bogus fake power readings in the FIFO
            self.reset_measurements()

    def powerOn(self):
        """Power on the supply."""
        self.r.toggle_DUT_power("ON")

    def powerOff(self):
        """Power off the supply."""
        self.r.toggle_DUT_power("OFF")

Classes

class PPK2PowerSupply (portName: Optional[str] = None)

Interface for talking with the NRF PPK2 high-resolution micro-power supply. Power Profiler Kit II is what you should google to find it for purchase.

Initialize the PowerSupply object.

portName (str, optional): The port name of the power supply. If omitted, the port is auto-detected, which requires exactly one PPK2 device to be connected.

Expand source code
class PPK2PowerSupply(PowerSupply):
    """Interface for talking with the NRF PPK2 high-resolution micro-power supply.

    Power Profiler Kit II is what you should google to find it for purchase.

    A background thread (see measurement_loop) polls the PPK2 and accumulates
    min/max/sum/count statistics.  The get_*_current_mA accessors report those
    statistics and reset_measurements starts a new accumulation window.
    """

    def __init__(self, portName: Optional[str] = None):
        """Open the PPK2 and prepare (but do not start) the measurement thread.

        portName (str, optional): The serial port of the PPK2.  If omitted the port
            is auto-detected, which requires exactly one PPK2 to be attached.

        Raises PowerError if portName is omitted and zero or several PPK2 devices are found.
        """
        if not portName:
            devs = ppk2_api.PPK2_API.list_devices()
            if not devs:
                raise PowerError("No PPK2 devices found")
            if len(devs) > 1:
                raise PowerError(
                    "Multiple PPK2 devices found, please specify the portName"
                )
            portName = devs[0]

        # Set to True by setIsSupply and back to False by close; the measurement thread runs while it is True
        self.measuring = False

        # Statistics for the current accumulation window (microamperes, see get_average_current_mA)
        self.current_max = 0
        self.current_min = 0
        self.current_sum = 0
        self.current_num_samples = 0
        self.current_average = 0

        # for tracking average data read length (to determine if we are sleeping efficiently in measurement_loop)
        self.total_data_len = 0
        self.num_data_reads = 0
        self.max_data_len = 0

        # Normally we just sleep with a timeout on this condition (polling the power measurement data repeatedly)
        # but any time our measurements have been fully consumed (via reset_measurements) we notify() this condition
        # to trigger a new reading ASAP.
        self._want_measurement = threading.Condition()

        # Guards current_max/current_min/current_sum/current_num_samples/current_average.
        # Never acquire _want_measurement while holding this lock: the measurement thread
        # takes the two in the opposite order.
        self._result_lock = threading.Condition()

        self.r = r = ppk2_api.PPK2_API(
            portName
        )  # serial port will be different for you
        r.get_modifiers()

        # Created here but only started by setIsSupply
        self.measurement_thread = threading.Thread(
            target=self.measurement_loop, daemon=True, name="ppk2 measurement"
        )
        logging.info("Connected to Power Profiler Kit II (PPK2)")
        super().__init__()  # we call this late so that the port is already open and _getRawWattHour callback works

    def measurement_loop(self):
        """Poll the PPK2 and accumulate statistics until self.measuring becomes False.

        Runs in self.measurement_thread.
        """
        while self.measuring:
            with self._want_measurement:
                # normally we poll using this timeout, but sometimes
                # reset_measurements() will notify us to read immediately
                self._want_measurement.wait(
                    0.0001 if self.num_data_reads == 0 else 0.001
                )

                # always reads 4096 bytes, even if there is no new samples - or possibly the python single thread (because of global interpreter lock)
                # is always behind and therefore we are inherently dropping samples semi randomly!!!
                read_data = self.r.get_data()
                if read_data != b"":
                    samples, _ = self.r.get_samples(read_data)

                    if len(samples) > 0:
                        # The following operations could be expensive, so do outside of the lock
                        # FIXME - change all these lists into numpy arrays to use lots less CPU
                        batch_max = max(samples)
                        batch_min = min(samples)
                        batch_sum = sum(samples)
                        batch_len = len(samples)

                        # Publish all four statistics together so a concurrent reset or
                        # read never sees a half-updated window
                        with self._result_lock:
                            if self.current_num_samples == 0:
                                # First batch of a new window: restart min/max
                                # (we need at least one sample to get an initial min)
                                self.current_max = 0
                                self.current_min = batch_min

                            self.current_max = max(self.current_max, batch_max)
                            self.current_min = min(self.current_min, batch_min)
                            self.current_sum += batch_sum
                            self.current_num_samples += batch_len
                        # logging.debug(f"PPK2 data_len={len(read_data)}, sample_len={len(samples)}")

                # Summary stats for performance monitoring
                self.num_data_reads += 1
                self.total_data_len += len(read_data)
                self.max_data_len = max(self.max_data_len, len(read_data))

    def get_min_current_mA(self):
        """Return the min current in mA."""
        return self.current_min / 1000

    def get_max_current_mA(self):
        """Return the max current in mA."""
        return self.current_max / 1000

    def get_average_current_mA(self):
        """Return the average current in mA.

        If no samples have arrived since the last reset, the previously computed average is returned.
        """
        with self._result_lock:
            if self.current_num_samples != 0:
                # If we have new samples, calculate a new average
                self.current_average = self.current_sum / self.current_num_samples

            # Even if we don't have new samples, return the last calculated average
            # measurements are in microamperes, divide by 1000
            return self.current_average / 1000

    def reset_measurements(self):
        """Start a new accumulation window and ask the measurement thread to read immediately.

        current_average, current_min and current_max are left untouched so the accessors
        keep returning the last values until new samples arrive.
        """
        # Same lock the measurement thread holds for its += updates, so the reset cannot be
        # interleaved with a half-finished update.  Released before taking _want_measurement.
        with self._result_lock:
            self.current_sum = 0
            self.current_num_samples = 0

        # if self.num_data_reads:
        #    logging.debug(f"max data len = {self.max_data_len},avg {self.total_data_len/self.num_data_reads}, num reads={self.num_data_reads}")
        with self._want_measurement:
            # Summary stats for performance monitoring (the measurement thread updates these while holding this lock)
            self.num_data_reads = 0
            self.total_data_len = 0
            self.max_data_len = 0

            self._want_measurement.notify()  # notify the measurement loop to read immediately

    def close(self) -> None:
        """Close the power meter."""
        self.measuring = False
        self.r.stop_measuring()  # send command to ppk2
        # The thread is only started by setIsSupply; join() on a never-started thread raises RuntimeError
        if self.measurement_thread.is_alive():
            self.measurement_thread.join()  # wait for our thread to finish
        super().close()

    def setIsSupply(self, is_supply: bool):
        """If in supply mode we will provide power ourself, otherwise we are just an amp meter.

        Also starts the measurement thread the first time it is called.
        """

        assert self.v > 0.8  # We must set a valid voltage before calling this method

        self.r.set_source_voltage(
            int(self.v * 1000)
        )  # set source voltage in mV BEFORE setting source mode
        # Note: source voltage must be set even if we are using the amp meter mode

        # must be after setting source voltage and before setting mode
        self.r.start_measuring()  # send command to ppk2

        if not is_supply:
            self.r.use_ampere_meter()  # set amp meter mode
        else:
            self.r.use_source_meter()  # set source meter mode

        if not self.measurement_thread.is_alive():
            self.measuring = True
            self.reset_measurements()

            # We can't start reading from the thread until vdd is set, so start running the thread now
            self.measurement_thread.start()
            time.sleep(
                0.2
            )  # FIXME - crufty way to ensure we do one set of reads to discard bogus fake power readings in the FIFO
            self.reset_measurements()

    def powerOn(self):
        """Power on the supply."""
        self.r.toggle_DUT_power("ON")

    def powerOff(self):
        """Power off the supply."""
        self.r.toggle_DUT_power("OFF")

Ancestors

Methods

def get_average_current_mA(self)

Return the average current in mA.

Expand source code
def get_average_current_mA(self):
    """Report the mean current, in mA, for the samples accumulated so far.

    When samples are available the cached average is refreshed from the running
    sum and count; otherwise the previously cached average is reported.
    """
    with self._result_lock:
        sample_count = self.current_num_samples
        if sample_count != 0:
            # Fresh samples arrived, so refresh the cached average
            self.current_average = self.current_sum / sample_count

        # The cached value is in microamperes; scale it down to milliamperes
        average_uA = self.current_average
        return average_uA / 1000
def get_max_current_mA(self)

Return the max current in mA.

Expand source code
def get_max_current_mA(self):
    """Report the largest current recorded in self.current_max, scaled by 1/1000 to mA."""
    peak = self.current_max
    return peak / 1000
def get_min_current_mA(self)

Return the min current in mA.

Expand source code
def get_min_current_mA(self):
    """Report the smallest current recorded in self.current_min, scaled by 1/1000 to mA."""
    floor = self.current_min
    return floor / 1000
def measurement_loop(self)

Endless measurement loop will run in a thread.

Expand source code
def measurement_loop(self):
    """Poll the PPK2 and accumulate statistics until self.measuring becomes False.

    Intended to run in a thread.  Each pass waits briefly on _want_measurement
    (or until notified), reads whatever data is available, and folds the decoded
    samples into current_max/current_min/current_sum/current_num_samples.
    """
    while self.measuring:
        with self._want_measurement:
            # normally we poll using this timeout, but sometimes
            # a notify on this condition asks us to read immediately
            self._want_measurement.wait(
                0.0001 if self.num_data_reads == 0 else 0.001
            )

            # always reads 4096 bytes, even if there is no new samples - or possibly the python single thread (because of global interpreter lock)
            # is always behind and therefore we are inherently dropping samples semi randomly!!!
            read_data = self.r.get_data()
            if read_data != b"":
                samples, _ = self.r.get_samples(read_data)

                if len(samples) > 0:
                    # The following operations could be expensive, so do outside of the lock
                    # FIXME - change all these lists into numpy arrays to use lots less CPU
                    batch_max = max(samples)
                    batch_min = min(samples)
                    batch_sum = sum(samples)
                    batch_len = len(samples)

                    # Publish all four statistics together under the result lock so a
                    # reader holding that lock never sees a half-updated set of values
                    with self._result_lock:
                        if self.current_num_samples == 0:
                            # First batch since the counters were zeroed: restart min/max
                            # (we need at least one sample to get an initial min)
                            self.current_max = 0
                            self.current_min = batch_min

                        self.current_max = max(self.current_max, batch_max)
                        self.current_min = min(self.current_min, batch_min)
                        self.current_sum += batch_sum
                        self.current_num_samples += batch_len
                    # logging.debug(f"PPK2 data_len={len(read_data)}, sample_len={len(samples)}")

            # Read-size statistics, used to judge whether the polling interval is efficient
            self.num_data_reads += 1
            self.total_data_len += len(read_data)
            self.max_data_len = max(self.max_data_len, len(read_data))
def powerOff(self)

Power off the supply.

Expand source code
def powerOff(self):
    """Ask the PPK2 to switch DUT power "OFF"."""
    ppk2 = self.r
    ppk2.toggle_DUT_power("OFF")
def powerOn(self)

Power on the supply.

Expand source code
def powerOn(self):
    """Ask the PPK2 to switch DUT power "ON"."""
    ppk2 = self.r
    ppk2.toggle_DUT_power("ON")
def setIsSupply(self, is_supply: bool)

If in supply mode we will provide power ourself, otherwise we are just an amp meter.

Expand source code
def setIsSupply(self, is_supply: bool):
    """Select source-meter (is_supply=True) or ampere-meter mode and begin measuring.

    The first call (while the measurement thread is not running) also starts that thread.
    """

    # A valid voltage must have been configured before this method is called
    assert self.v > 0.8

    ppk2 = self.r

    # The source voltage (in mV) has to be programmed BEFORE the mode is chosen,
    # and that holds even when only the amp meter mode is wanted
    millivolts = int(self.v * 1000)
    ppk2.set_source_voltage(millivolts)

    # Measuring is started after the voltage is set and before the mode is set
    ppk2.start_measuring()

    if is_supply:
        ppk2.use_source_meter()
    else:
        ppk2.use_ampere_meter()

    if self.measurement_thread.is_alive():
        return  # the reader thread is already running, nothing more to do

    self.measuring = True
    self.reset_measurements()

    # Reading cannot begin until vdd is set, so the thread is only launched at this point
    self.measurement_thread.start()

    # FIXME - crufty way to ensure we do one set of reads to discard bogus fake power readings in the FIFO
    time.sleep(0.2)
    self.reset_measurements()

Inherited members