Module meshtastic.node

Node class

Expand source code
"""Node class
"""

import base64
import logging
import time

from typing import Optional, Union, List

from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, localonly_pb2, mesh_pb2, portnums_pb2
from meshtastic.util import (
    Timeout,
    camel_to_snake,
    fromPSK,
    our_exit,
    pskToString,
    stripnl,
    message_to_json,
)


class Node:
    """A model of a (local or remote) node in the mesh

    Includes methods for localConfig, moduleConfig and channels
    """

    def __init__(self, iface, nodeNum, noProto=False, timeout: int = 300):
        """Constructor"""
        self.iface = iface
        self.nodeNum = nodeNum
        self.localConfig = localonly_pb2.LocalConfig()
        self.moduleConfig = localonly_pb2.LocalModuleConfig()
        self.channels = None
        self._timeout = Timeout(maxSecs=timeout)
        self.partialChannels: Optional[List] = None
        self.noProto = noProto
        self.cannedPluginMessage = None
        self.cannedPluginMessageMessages = None
        self.ringtone = None
        self.ringtonePart = None

        self.gotResponse = None

    def showChannels(self):
        """Show human readable description of our channels."""
        print("Channels:")
        if self.channels:
            logging.debug(f"self.channels:{self.channels}")
            for c in self.channels:
                cStr = message_to_json(c.settings)
                # don't show disabled channels
                if channel_pb2.Channel.Role.Name(c.role) != "DISABLED":
                    print(
                        f"  Index {c.index}: {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}"
                    )
        publicURL = self.getURL(includeAll=False)
        adminURL = self.getURL(includeAll=True)
        print(f"\nPrimary channel URL: {publicURL}")
        if adminURL != publicURL:
            print(f"Complete URL (includes all channels): {adminURL}")

    def showInfo(self):
        """Show human readable description of our node"""
        prefs = ""
        if self.localConfig:
            prefs = message_to_json(self.localConfig, multiline=True)
        print(f"Preferences: {prefs}\n")
        prefs = ""
        if self.moduleConfig:
            prefs = message_to_json(self.moduleConfig, multiline=True)
        print(f"Module preferences: {prefs}\n")
        self.showChannels()

    def setChannels(self, channels):
        """Set the channels for this node"""
        self.channels = channels
        self._fixupChannels()

    def requestChannels(self, startingIndex: int = 0):
        """Send regular MeshPackets to ask channels."""
        logging.debug(f"requestChannels for nodeNum:{self.nodeNum}")
        # only initialize if we're starting out fresh
        if startingIndex == 0:
            self.channels = None
            self.partialChannels = []  # We keep our channels in a temp array until finished
        self._requestChannel(startingIndex)

    def onResponseRequestSettings(self, p):
        """Handle the response packets for requesting settings _requestSettings()"""
        logging.debug(f"onResponseRequestSetting() p:{p}")
        config_values = None
        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
                self.iface._acknowledgment.receivedNak = True
        else:
            self.iface._acknowledgment.receivedAck = True
            print("")
            adminMessage = p["decoded"]["admin"]
            if "getConfigResponse" in adminMessage:
                oneof = "get_config_response"
                resp = adminMessage["getConfigResponse"]
                field = list(resp.keys())[0]
                config_type = self.localConfig.DESCRIPTOR.fields_by_name.get(
                    camel_to_snake(field)
                )
                if config_type is not None:
                    config_values = getattr(self.localConfig, config_type.name)
            elif "getModuleConfigResponse" in adminMessage:
                oneof = "get_module_config_response"
                resp = adminMessage["getModuleConfigResponse"]
                field = list(resp.keys())[0]
                config_type = self.moduleConfig.DESCRIPTOR.fields_by_name.get(
                    camel_to_snake(field)
                )
                config_values = getattr(self.moduleConfig, config_type.name)
            else:
                print(
                    "Did not receive a valid response. Make sure to have a shared channel named 'admin'."
                )
                return
            if config_values is not None:
                raw_config = getattr(getattr(adminMessage['raw'], oneof), camel_to_snake(field))
                config_values.CopyFrom(raw_config)
                print(f"{str(camel_to_snake(field))}:\n{str(config_values)}")

    def requestConfig(self, configType):
        """Request the config from the node via admin message"""
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onResponseRequestSettings
            print("Requesting current config from remote node (this can take a while).")
        p = admin_pb2.AdminMessage()
        if isinstance(configType, int):
            p.get_config_request = configType

        else:
            msgIndex = configType.index
            if configType.containing_type.name == "LocalConfig":
                p.get_config_request = msgIndex
            else:
                p.get_module_config_request = msgIndex

        self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
        if onResponse:
            self.iface.waitForAckNak()

    def turnOffEncryptionOnPrimaryChannel(self):
        """Turn off encryption on primary channel."""
        self.channels[0].settings.psk = fromPSK("none")
        print("Writing modified channels to device")
        self.writeChannel(0)

    def waitForConfig(self, attribute="channels"):
        """Block until radio config is received. Returns True if config has been received."""
        return self._timeout.waitForSet(self, attrs=("localConfig", attribute))

    def writeConfig(self, config_name):
        """Write the current (edited) localConfig to the device"""
        if self.localConfig is None:
            our_exit("Error: No localConfig has been read")

        p = admin_pb2.AdminMessage()

        if config_name == "device":
            p.set_config.device.CopyFrom(self.localConfig.device)
        elif config_name == "position":
            p.set_config.position.CopyFrom(self.localConfig.position)
        elif config_name == "power":
            p.set_config.power.CopyFrom(self.localConfig.power)
        elif config_name == "network":
            p.set_config.network.CopyFrom(self.localConfig.network)
        elif config_name == "display":
            p.set_config.display.CopyFrom(self.localConfig.display)
        elif config_name == "lora":
            p.set_config.lora.CopyFrom(self.localConfig.lora)
        elif config_name == "bluetooth":
            p.set_config.bluetooth.CopyFrom(self.localConfig.bluetooth)
        elif config_name == "security":
            p.set_config.security.CopyFrom(self.localConfig.security)
        elif config_name == "mqtt":
            p.set_module_config.mqtt.CopyFrom(self.moduleConfig.mqtt)
        elif config_name == "serial":
            p.set_module_config.serial.CopyFrom(self.moduleConfig.serial)
        elif config_name == "external_notification":
            p.set_module_config.external_notification.CopyFrom(
                self.moduleConfig.external_notification
            )
        elif config_name == "store_forward":
            p.set_module_config.store_forward.CopyFrom(self.moduleConfig.store_forward)
        elif config_name == "range_test":
            p.set_module_config.range_test.CopyFrom(self.moduleConfig.range_test)
        elif config_name == "telemetry":
            p.set_module_config.telemetry.CopyFrom(self.moduleConfig.telemetry)
        elif config_name == "canned_message":
            p.set_module_config.canned_message.CopyFrom(
                self.moduleConfig.canned_message
            )
        elif config_name == "audio":
            p.set_module_config.audio.CopyFrom(self.moduleConfig.audio)
        elif config_name == "remote_hardware":
            p.set_module_config.remote_hardware.CopyFrom(
                self.moduleConfig.remote_hardware
            )
        elif config_name == "neighbor_info":
            p.set_module_config.neighbor_info.CopyFrom(self.moduleConfig.neighbor_info)
        elif config_name == "detection_sensor":
            p.set_module_config.detection_sensor.CopyFrom(self.moduleConfig.detection_sensor)
        elif config_name == "ambient_lighting":
            p.set_module_config.ambient_lighting.CopyFrom(self.moduleConfig.ambient_lighting)
        elif config_name == "paxcounter":
            p.set_module_config.paxcounter.CopyFrom(self.moduleConfig.paxcounter)
        else:
            our_exit(f"Error: No valid config with name {config_name}")

        logging.debug(f"Wrote: {config_name}")
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        self._sendAdmin(p, onResponse=onResponse)

    def writeChannel(self, channelIndex, adminIndex=0):
        """Write the current (edited) channel to the device"""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.set_channel.CopyFrom(self.channels[channelIndex])
        self._sendAdmin(p, adminIndex=adminIndex)
        logging.debug(f"Wrote channel {channelIndex}")

    def getChannelByChannelIndex(self, channelIndex):
        """Get channel by channelIndex
        channelIndex: number, typically 0-7; based on max number channels
        returns: None if there is no channel found
        """
        ch = None
        if self.channels and 0 <= channelIndex < len(self.channels):
            ch = self.channels[channelIndex]
        return ch

    def deleteChannel(self, channelIndex):
        """Delete the specified channelIndex and shift other channels up"""
        ch = self.channels[channelIndex]
        if ch.role not in (
            channel_pb2.Channel.Role.SECONDARY,
            channel_pb2.Channel.Role.DISABLED,
        ):
            our_exit("Warning: Only SECONDARY channels can be deleted")

        # we are careful here because if we move the "admin" channel the channelIndex we need to use
        # for sending admin channels will also change
        adminIndex = self.iface.localNode._getAdminChannelIndex()

        self.channels.pop(channelIndex)
        self._fixupChannels()  # expand back to 8 channels

        index = channelIndex
        while index < 8:
            self.writeChannel(index, adminIndex=adminIndex)
            index += 1

            # if we are updating the local node, we might end up
            # *moving* the admin channel index as we are writing
            if (self.iface.localNode == self) and index >= adminIndex:
                # We've now passed the old location for admin index
                # (and written it), so we can start finding it by name again
                adminIndex = 0

    def getChannelByName(self, name):
        """Try to find the named channel or return None"""
        for c in self.channels or []:
            if c.settings and c.settings.name == name:
                return c
        return None

    def getDisabledChannel(self):
        """Return the first channel that is disabled (i.e. available for some new use)"""
        for c in self.channels:
            if c.role == channel_pb2.Channel.Role.DISABLED:
                return c
        return None

    def _getAdminChannelIndex(self):
        """Return the channel number of the admin channel, or 0 if no reserved channel"""
        for c in self.channels or []:
            if c.settings and c.settings.name.lower() == "admin":
                return c.index
        return 0

    def setOwner(self, long_name: Optional[str]=None, short_name: Optional[str]=None, is_licensed: bool=False):
        """Set device owner name"""
        logging.debug(f"in setOwner nodeNum:{self.nodeNum}")
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()

        nChars = 4
        if long_name is not None:
            long_name = long_name.strip()
            p.set_owner.long_name = long_name
            p.set_owner.is_licensed = is_licensed
        if short_name is not None:
            short_name = short_name.strip()
            if len(short_name) > nChars:
                short_name = short_name[:nChars]
                print(f"Maximum is 4 characters, truncated to {short_name}")
            p.set_owner.short_name = short_name

        # Note: These debug lines are used in unit tests
        logging.debug(f"p.set_owner.long_name:{p.set_owner.long_name}:")
        logging.debug(f"p.set_owner.short_name:{p.set_owner.short_name}:")
        logging.debug(f"p.set_owner.is_licensed:{p.set_owner.is_licensed}")
        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def getURL(self, includeAll: bool = True):
        """The sharable URL that describes the current channel"""
        # Only keep the primary/secondary channels, assume primary is first
        channelSet = apponly_pb2.ChannelSet()
        if self.channels:
            for c in self.channels:
                if c.role == channel_pb2.Channel.Role.PRIMARY or (
                    includeAll and c.role == channel_pb2.Channel.Role.SECONDARY
                ):
                    channelSet.settings.append(c.settings)

        if len(self.localConfig.ListFields()) == 0:
            self.requestConfig(self.localConfig.DESCRIPTOR.fields_by_name.get('lora'))
        channelSet.lora_config.CopyFrom(self.localConfig.lora)
        some_bytes = channelSet.SerializeToString()
        s = base64.urlsafe_b64encode(some_bytes).decode("ascii")
        s = s.replace("=", "").replace("+", "-").replace("/", "_")
        return f"https://meshtastic.org/e/#{s}"

    def setURL(self, url):
        """Set mesh network URL"""
        if self.localConfig is None:
            our_exit("Warning: No Config has been read")

        # URLs are of the form https://meshtastic.org/d/#{base64_channel_set}
        # Split on '/#' to find the base64 encoded channel settings
        splitURL = url.split("/#")
        b64 = splitURL[-1]

        # We normally strip padding to make for a shorter URL, but the python parser doesn't like
        # that.  So add back any missing padding
        # per https://stackoverflow.com/a/9807138
        missing_padding = len(b64) % 4
        if missing_padding:
            b64 += "=" * (4 - missing_padding)

        decodedURL = base64.urlsafe_b64decode(b64)
        channelSet = apponly_pb2.ChannelSet()
        channelSet.ParseFromString(decodedURL)

        if len(channelSet.settings) == 0:
            our_exit("Warning: There were no settings.")

        i = 0
        for chs in channelSet.settings:
            ch = channel_pb2.Channel()
            ch.role = (
                channel_pb2.Channel.Role.PRIMARY
                if i == 0
                else channel_pb2.Channel.Role.SECONDARY
            )
            ch.index = i
            ch.settings.CopyFrom(chs)
            self.channels[ch.index] = ch
            logging.debug(f"Channel i:{i} ch:{ch}")
            self.writeChannel(ch.index)
            i = i + 1

        p = admin_pb2.AdminMessage()
        p.set_config.lora.CopyFrom(channelSet.lora_config)
        self.ensureSessionKey()
        self._sendAdmin(p)

    def onResponseRequestRingtone(self, p):
        """Handle the response packet for requesting ringtone part 1"""
        logging.debug(f"onResponseRequestRingtone() p:{p}")
        errorFound = False
        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                errorFound = True
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
        if errorFound is False:
            if "decoded" in p:
                if "admin" in p["decoded"]:
                    if "raw" in p["decoded"]["admin"]:
                        self.ringtonePart = p["decoded"]["admin"][
                            "raw"
                        ].get_ringtone_response
                        logging.debug(f"self.ringtonePart:{self.ringtonePart}")
                        self.gotResponse = True

    def get_ringtone(self):
        """Get the ringtone. Concatenate all pieces together and return a single string."""
        logging.debug(f"in get_ringtone()")
        if not self.ringtone:
            p1 = admin_pb2.AdminMessage()
            p1.get_ringtone_request = True
            self.gotResponse = False
            self._sendAdmin(
                p1, wantResponse=True, onResponse=self.onResponseRequestRingtone
            )
            while self.gotResponse is False:
                time.sleep(0.1)

            logging.debug(f"self.ringtone:{self.ringtone}")

            self.ringtone = ""
            if self.ringtonePart:
                self.ringtone += self.ringtonePart

        print(f"ringtone:{self.ringtone}")
        logging.debug(f"ringtone:{self.ringtone}")
        return self.ringtone

    def set_ringtone(self, ringtone):
        """Set the ringtone. The ringtone length must be less than 230 character."""

        if len(ringtone) > 230:
            our_exit("Warning: The ringtone must be less than 230 characters.")
        self.ensureSessionKey()
        # split into chunks
        chunks = []
        chunks_size = 230
        for i in range(0, len(ringtone), chunks_size):
            chunks.append(ringtone[i : i + chunks_size])

        # for each chunk, send a message to set the values
        # for i in range(0, len(chunks)):
        for i, chunk in enumerate(chunks):
            p = admin_pb2.AdminMessage()

            # TODO: should be a way to improve this
            if i == 0:
                p.set_ringtone_message = chunk

            logging.debug(f"Setting ringtone '{chunk}' part {i+1}")
            # If sending to a remote node, wait for ACK/NAK
            if self == self.iface.localNode:
                onResponse = None
            else:
                onResponse = self.onAckNak
            return self._sendAdmin(p, onResponse=onResponse)

    def onResponseRequestCannedMessagePluginMessageMessages(self, p):
        """Handle the response packet for requesting canned message plugin message part 1"""
        logging.debug(f"onResponseRequestCannedMessagePluginMessageMessages() p:{p}")
        errorFound = False
        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                errorFound = True
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
        if errorFound is False:
            if "decoded" in p:
                if "admin" in p["decoded"]:
                    if "raw" in p["decoded"]["admin"]:
                        self.cannedPluginMessageMessages = p["decoded"]["admin"][
                            "raw"
                        ].get_canned_message_module_messages_response
                        logging.debug(
                            f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
                        )
                        self.gotResponse = True

    def get_canned_message(self):
        """Get the canned message string. Concatenate all pieces together and return a single string."""
        logging.debug(f"in get_canned_message()")
        if not self.cannedPluginMessage:
            p1 = admin_pb2.AdminMessage()
            p1.get_canned_message_module_messages_request = True
            self.gotResponse = False
            self._sendAdmin(
                p1,
                wantResponse=True,
                onResponse=self.onResponseRequestCannedMessagePluginMessageMessages,
            )
            while self.gotResponse is False:
                time.sleep(0.1)

            logging.debug(
                f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
            )

            self.cannedPluginMessage = ""
            if self.cannedPluginMessageMessages:
                self.cannedPluginMessage += self.cannedPluginMessageMessages

        print(f"canned_plugin_message:{self.cannedPluginMessage}")
        logging.debug(f"canned_plugin_message:{self.cannedPluginMessage}")
        return self.cannedPluginMessage

    def set_canned_message(self, message):
        """Set the canned message. The canned messages length must be less than 200 character."""

        if len(message) > 200:
            our_exit("Warning: The canned message must be less than 200 characters.")
        self.ensureSessionKey()
        # split into chunks
        chunks = []
        chunks_size = 200
        for i in range(0, len(message), chunks_size):
            chunks.append(message[i : i + chunks_size])

        # for each chunk, send a message to set the values
        # for i in range(0, len(chunks)):
        for i, chunk in enumerate(chunks):
            p = admin_pb2.AdminMessage()

            # TODO: should be a way to improve this
            if i == 0:
                p.set_canned_message_module_messages = chunk

            logging.debug(f"Setting canned message '{chunk}' part {i+1}")
            # If sending to a remote node, wait for ACK/NAK
            if self == self.iface.localNode:
                onResponse = None
            else:
                onResponse = self.onAckNak
            return self._sendAdmin(p, onResponse=onResponse)

    def exitSimulator(self):
        """Tell a simulator node to exit (this message
        is ignored for other nodes)"""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.exit_simulator = True
        logging.debug("in exitSimulator()")

        return self._sendAdmin(p)

    def reboot(self, secs: int = 10):
        """Tell the node to reboot."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.reboot_seconds = secs
        logging.info(f"Telling node to reboot in {secs} seconds")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def beginSettingsTransaction(self):
        """Tell the node to open a transaction to edit settings."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.begin_edit_settings = True
        logging.info(f"Telling open a transaction to edit settings")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def commitSettingsTransaction(self):
        """Tell the node to commit the open transaction for editing settings."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.commit_edit_settings = True
        logging.info(f"Telling node to commit open transaction for editing settings")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def rebootOTA(self, secs: int = 10):
        """Tell the node to reboot into factory firmware."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.reboot_ota_seconds = secs
        logging.info(f"Telling node to reboot to OTA in {secs} seconds")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def enterDFUMode(self):
        """Tell the node to enter DFU mode (NRF52)."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.enter_dfu_mode_request = True
        logging.info(f"Telling node to enable DFU mode")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def shutdown(self, secs: int = 10):
        """Tell the node to shutdown."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.shutdown_seconds = secs
        logging.info(f"Telling node to shutdown in {secs} seconds")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def getMetadata(self):
        """Get the node's metadata."""
        p = admin_pb2.AdminMessage()
        p.get_device_metadata_request = True
        logging.info(f"Requesting device metadata")

        self._sendAdmin(
            p, wantResponse=True, onResponse=self.onRequestGetMetadata
        )
        self.iface.waitForAckNak()

    def factoryReset(self, full: bool = False):
        """Tell the node to factory reset."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        if full:
            p.factory_reset_device = True
            logging.info(f"Telling node to factory reset (full device reset)")
        else:
            p.factory_reset_config = True
            logging.info(f"Telling node to factory reset (config reset)")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def removeNode(self, nodeId: Union[int, str]):
        """Tell the node to remove a specific node by ID"""
        self.ensureSessionKey()
        if isinstance(nodeId, str):
            if nodeId.startswith("!"):
                nodeId = int(nodeId[1:], 16)
            else:
                nodeId = int(nodeId)

        p = admin_pb2.AdminMessage()
        p.remove_by_nodenum = nodeId

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def resetNodeDb(self):
        """Tell the node to reset its list of nodes."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.nodedb_reset = True
        logging.info(f"Telling node to reset the NodeDB")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def setFixedPosition(self, lat: Union[int, float], lon: Union[int, float], alt: int):
        """Tell the node to set fixed position to the provided value and enable the fixed position setting"""
        self.ensureSessionKey()

        p = mesh_pb2.Position()
        if isinstance(lat, float) and lat != 0.0:
            p.latitude_i = int(lat / 1e-7)
        elif isinstance(lat, int) and lat != 0:
            p.latitude_i = lat

        if isinstance(lon, float) and lon != 0.0:
            p.longitude_i = int(lon / 1e-7)
        elif isinstance(lon, int) and lon != 0:
            p.longitude_i = lon

        if alt != 0:
            p.altitude = alt

        a = admin_pb2.AdminMessage()
        a.set_fixed_position.CopyFrom(p)

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(a, onResponse=onResponse)

    def removeFixedPosition(self):
        """Tell the node to remove the fixed position and set the fixed position setting to false"""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.remove_fixed_position = True
        logging.info(f"Telling node to remove fixed position")

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def setTime(self, timeSec: int = 0):
        """Tell the node to set its time to the provided timestamp, or the system's current time if not provided or 0."""
        self.ensureSessionKey()
        if timeSec == 0:
            timeSec = int(time.time())
        p = admin_pb2.AdminMessage()
        p.set_time_only = timeSec
        logging.info(f"Setting node time to {timeSec}")

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def _fixupChannels(self):
        """Fixup indexes and add disabled channels as needed"""

        # Add extra disabled channels as needed
        # This is needed because the protobufs will have index **missing** if the channel number is zero
        for index, ch in enumerate(self.channels):
            ch.index = index  # fixup indexes

        self._fillChannels()

    def _fillChannels(self):
        """Mark unused channels as disabled"""

        # Add extra disabled channels as needed
        index = len(self.channels)
        while index < 8:
            ch = channel_pb2.Channel()
            ch.role = channel_pb2.Channel.Role.DISABLED
            ch.index = index
            self.channels.append(ch)
            index += 1

    def onRequestGetMetadata(self, p):
        """Handle the response packet for requesting device metadata getMetadata()"""
        logging.debug(f"onRequestGetMetadata() p:{p}")

        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
                self.iface._acknowledgment.receivedNak = True
        else:
            self.iface._acknowledgment.receivedAck = True
            if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
                portnums_pb2.PortNum.ROUTING_APP
            ):
                if p["decoded"]["routing"]["errorReason"] != "NONE":
                    logging.warning(
                        f'Metadata request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
                    )
                    self._timeout.expireTime = time.time()  # Do not wait any longer
                    return  # Don't try to parse this routing message
                logging.debug(f"Retrying metadata request.")
                self.getMetadata()
                return

            c = p["decoded"]["admin"]["raw"].get_device_metadata_response
            self._timeout.reset()  # We made forward progress
            logging.debug(f"Received metadata {stripnl(c)}")
            print(f"\nfirmware_version: {c.firmware_version}")
            print(f"device_state_version: {c.device_state_version}")

    def onResponseRequestChannel(self, p):
        """Handle the response packet for requesting a channel _requestChannel()"""
        logging.debug(f"onResponseRequestChannel() p:{p}")

        if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
            portnums_pb2.PortNum.ROUTING_APP
        ):
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                logging.warning(
                    f'Channel request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
                )
                self._timeout.expireTime = time.time()  # Do not wait any longer
                return  # Don't try to parse this routing message
            lastTried = 0
            if len(self.partialChannels) > 0:
                lastTried = self.partialChannels[-1].index
            logging.debug(f"Retrying previous channel request.")
            self._requestChannel(lastTried)
            return

        c = p["decoded"]["admin"]["raw"].get_channel_response
        self.partialChannels.append(c)
        self._timeout.reset()  # We made forward progress
        logging.debug(f"Received channel {stripnl(c)}")
        index = c.index

        if index >= 8 - 1:
            logging.debug("Finished downloading channels")

            self.channels = self.partialChannels
            self._fixupChannels()
        else:
            self._requestChannel(index + 1)

    def onAckNak(self, p):
        """Informative handler for ACK/NAK responses"""
        if p["decoded"]["routing"]["errorReason"] != "NONE":
            print(
                f'Received a NAK, error reason: {p["decoded"]["routing"]["errorReason"]}'
            )
            self.iface._acknowledgment.receivedNak = True
        else:
            if int(p["from"]) == self.iface.localNode.nodeNum:
                print(
                    f"Received an implicit ACK. Packet will likely arrive, but cannot be guaranteed."
                )
                self.iface._acknowledgment.receivedImplAck = True
            else:
                print(f"Received an ACK.")
                self.iface._acknowledgment.receivedAck = True

    def _requestChannel(self, channelNum: int):
        """Done with initial config messages, now send regular
        MeshPackets to ask for settings"""
        p = admin_pb2.AdminMessage()
        p.get_channel_request = channelNum + 1

        # Show progress message for super slow operations
        if self != self.iface.localNode:
            print(
                f"Requesting channel {channelNum} info from remote node (this could take a while)"
            )
            logging.debug(
                f"Requesting channel {channelNum} info from remote node (this could take a while)"
            )
        else:
            logging.debug(f"Requesting channel {channelNum}")

        return self._sendAdmin(
            p, wantResponse=True, onResponse=self.onResponseRequestChannel
        )

    # pylint: disable=R1710
    def _sendAdmin(
        self,
        p: admin_pb2.AdminMessage,
        wantResponse: bool=True,
        onResponse=None,
        adminIndex: int=0,
    ):
        """Send an admin message to the specified node (or the local node if destNodeNum is zero)"""

        if self.noProto:
            logging.warning(
                f"Not sending packet because protocol use is disabled by noProto"
            )
        else:
            if (
                adminIndex == 0
            ):  # unless a special channel index was used, we want to use the admin index
                adminIndex = self.iface.localNode._getAdminChannelIndex()
            logging.debug(f"adminIndex:{adminIndex}")
            if isinstance(self.nodeNum, int):
                nodeid = self.nodeNum
            else: # assume string starting with !
                nodeid = int(self.nodeNum[1:],16)
            if "adminSessionPassKey" in self.iface._getOrCreateByNum(nodeid):
                p.session_passkey = self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey")
            return self.iface.sendData(
                p,
                self.nodeNum,
                portNum=portnums_pb2.PortNum.ADMIN_APP,
                wantAck=False,
                wantResponse=wantResponse,
                onResponse=onResponse,
                channelIndex=adminIndex,
                pkiEncrypted=True,
            )

    def ensureSessionKey(self):
        """If our entry in iface.nodesByNum doesn't already have an adminSessionPassKey, make a request to get one"""
        if self.noProto:
            logging.warning(
                f"Not ensuring session key, because protocol use is disabled by noProto"
            )
        else:
            if isinstance(self.nodeNum, int):
                nodeid = self.nodeNum
            else: # assume string starting with !
                nodeid = int(self.nodeNum[1:],16)
            if self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey") is None:
                self.requestConfig(admin_pb2.AdminMessage.SESSIONKEY_CONFIG)

Classes

class Node (iface, nodeNum, noProto=False, timeout: int = 300)

A model of a (local or remote) node in the mesh

Includes methods for localConfig, moduleConfig and channels

Constructor

Expand source code
class Node:
    """A model of a (local or remote) node in the mesh

    Includes methods for localConfig, moduleConfig and channels
    """

    def __init__(self, iface, nodeNum, noProto=False, timeout: int = 300):
        """Constructor"""
        self.iface = iface
        self.nodeNum = nodeNum
        self.localConfig = localonly_pb2.LocalConfig()
        self.moduleConfig = localonly_pb2.LocalModuleConfig()
        self.channels = None
        self._timeout = Timeout(maxSecs=timeout)
        self.partialChannels: Optional[List] = None
        self.noProto = noProto
        self.cannedPluginMessage = None
        self.cannedPluginMessageMessages = None
        self.ringtone = None
        self.ringtonePart = None

        self.gotResponse = None

    def showChannels(self):
        """Show human readable description of our channels."""
        print("Channels:")
        if self.channels:
            logging.debug(f"self.channels:{self.channels}")
            for c in self.channels:
                cStr = message_to_json(c.settings)
                # don't show disabled channels
                if channel_pb2.Channel.Role.Name(c.role) != "DISABLED":
                    print(
                        f"  Index {c.index}: {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}"
                    )
        publicURL = self.getURL(includeAll=False)
        adminURL = self.getURL(includeAll=True)
        print(f"\nPrimary channel URL: {publicURL}")
        if adminURL != publicURL:
            print(f"Complete URL (includes all channels): {adminURL}")

    def showInfo(self):
        """Show human readable description of our node"""
        prefs = ""
        if self.localConfig:
            prefs = message_to_json(self.localConfig, multiline=True)
        print(f"Preferences: {prefs}\n")
        prefs = ""
        if self.moduleConfig:
            prefs = message_to_json(self.moduleConfig, multiline=True)
        print(f"Module preferences: {prefs}\n")
        self.showChannels()

    def setChannels(self, channels):
        """Set the channels for this node"""
        self.channels = channels
        self._fixupChannels()

    def requestChannels(self, startingIndex: int = 0):
        """Send regular MeshPackets to ask channels."""
        logging.debug(f"requestChannels for nodeNum:{self.nodeNum}")
        # only initialize if we're starting out fresh
        if startingIndex == 0:
            self.channels = None
            self.partialChannels = []  # We keep our channels in a temp array until finished
        self._requestChannel(startingIndex)

    def onResponseRequestSettings(self, p):
        """Handle the response packets for requesting settings _requestSettings()"""
        logging.debug(f"onResponseRequestSetting() p:{p}")
        config_values = None
        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
                self.iface._acknowledgment.receivedNak = True
        else:
            self.iface._acknowledgment.receivedAck = True
            print("")
            adminMessage = p["decoded"]["admin"]
            if "getConfigResponse" in adminMessage:
                oneof = "get_config_response"
                resp = adminMessage["getConfigResponse"]
                field = list(resp.keys())[0]
                config_type = self.localConfig.DESCRIPTOR.fields_by_name.get(
                    camel_to_snake(field)
                )
                if config_type is not None:
                    config_values = getattr(self.localConfig, config_type.name)
            elif "getModuleConfigResponse" in adminMessage:
                oneof = "get_module_config_response"
                resp = adminMessage["getModuleConfigResponse"]
                field = list(resp.keys())[0]
                config_type = self.moduleConfig.DESCRIPTOR.fields_by_name.get(
                    camel_to_snake(field)
                )
                config_values = getattr(self.moduleConfig, config_type.name)
            else:
                print(
                    "Did not receive a valid response. Make sure to have a shared channel named 'admin'."
                )
                return
            if config_values is not None:
                raw_config = getattr(getattr(adminMessage['raw'], oneof), camel_to_snake(field))
                config_values.CopyFrom(raw_config)
                print(f"{str(camel_to_snake(field))}:\n{str(config_values)}")

    def requestConfig(self, configType):
        """Request the config from the node via admin message"""
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onResponseRequestSettings
            print("Requesting current config from remote node (this can take a while).")
        p = admin_pb2.AdminMessage()
        if isinstance(configType, int):
            p.get_config_request = configType

        else:
            msgIndex = configType.index
            if configType.containing_type.name == "LocalConfig":
                p.get_config_request = msgIndex
            else:
                p.get_module_config_request = msgIndex

        self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
        if onResponse:
            self.iface.waitForAckNak()

    def turnOffEncryptionOnPrimaryChannel(self):
        """Turn off encryption on primary channel."""
        self.channels[0].settings.psk = fromPSK("none")
        print("Writing modified channels to device")
        self.writeChannel(0)

    def waitForConfig(self, attribute="channels"):
        """Block until radio config is received. Returns True if config has been received."""
        return self._timeout.waitForSet(self, attrs=("localConfig", attribute))

    def writeConfig(self, config_name):
        """Write the current (edited) localConfig to the device"""
        if self.localConfig is None:
            our_exit("Error: No localConfig has been read")

        p = admin_pb2.AdminMessage()

        if config_name == "device":
            p.set_config.device.CopyFrom(self.localConfig.device)
        elif config_name == "position":
            p.set_config.position.CopyFrom(self.localConfig.position)
        elif config_name == "power":
            p.set_config.power.CopyFrom(self.localConfig.power)
        elif config_name == "network":
            p.set_config.network.CopyFrom(self.localConfig.network)
        elif config_name == "display":
            p.set_config.display.CopyFrom(self.localConfig.display)
        elif config_name == "lora":
            p.set_config.lora.CopyFrom(self.localConfig.lora)
        elif config_name == "bluetooth":
            p.set_config.bluetooth.CopyFrom(self.localConfig.bluetooth)
        elif config_name == "security":
            p.set_config.security.CopyFrom(self.localConfig.security)
        elif config_name == "mqtt":
            p.set_module_config.mqtt.CopyFrom(self.moduleConfig.mqtt)
        elif config_name == "serial":
            p.set_module_config.serial.CopyFrom(self.moduleConfig.serial)
        elif config_name == "external_notification":
            p.set_module_config.external_notification.CopyFrom(
                self.moduleConfig.external_notification
            )
        elif config_name == "store_forward":
            p.set_module_config.store_forward.CopyFrom(self.moduleConfig.store_forward)
        elif config_name == "range_test":
            p.set_module_config.range_test.CopyFrom(self.moduleConfig.range_test)
        elif config_name == "telemetry":
            p.set_module_config.telemetry.CopyFrom(self.moduleConfig.telemetry)
        elif config_name == "canned_message":
            p.set_module_config.canned_message.CopyFrom(
                self.moduleConfig.canned_message
            )
        elif config_name == "audio":
            p.set_module_config.audio.CopyFrom(self.moduleConfig.audio)
        elif config_name == "remote_hardware":
            p.set_module_config.remote_hardware.CopyFrom(
                self.moduleConfig.remote_hardware
            )
        elif config_name == "neighbor_info":
            p.set_module_config.neighbor_info.CopyFrom(self.moduleConfig.neighbor_info)
        elif config_name == "detection_sensor":
            p.set_module_config.detection_sensor.CopyFrom(self.moduleConfig.detection_sensor)
        elif config_name == "ambient_lighting":
            p.set_module_config.ambient_lighting.CopyFrom(self.moduleConfig.ambient_lighting)
        elif config_name == "paxcounter":
            p.set_module_config.paxcounter.CopyFrom(self.moduleConfig.paxcounter)
        else:
            our_exit(f"Error: No valid config with name {config_name}")

        logging.debug(f"Wrote: {config_name}")
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        self._sendAdmin(p, onResponse=onResponse)

    def writeChannel(self, channelIndex, adminIndex=0):
        """Write the current (edited) channel to the device"""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.set_channel.CopyFrom(self.channels[channelIndex])
        self._sendAdmin(p, adminIndex=adminIndex)
        logging.debug(f"Wrote channel {channelIndex}")

    def getChannelByChannelIndex(self, channelIndex):
        """Get channel by channelIndex
        channelIndex: number, typically 0-7; based on max number channels
        returns: None if there is no channel found
        """
        ch = None
        if self.channels and 0 <= channelIndex < len(self.channels):
            ch = self.channels[channelIndex]
        return ch

    def deleteChannel(self, channelIndex):
        """Delete the specified channelIndex and shift other channels up"""
        ch = self.channels[channelIndex]
        if ch.role not in (
            channel_pb2.Channel.Role.SECONDARY,
            channel_pb2.Channel.Role.DISABLED,
        ):
            our_exit("Warning: Only SECONDARY channels can be deleted")

        # we are careful here because if we move the "admin" channel the channelIndex we need to use
        # for sending admin channels will also change
        adminIndex = self.iface.localNode._getAdminChannelIndex()

        self.channels.pop(channelIndex)
        self._fixupChannels()  # expand back to 8 channels

        index = channelIndex
        while index < 8:
            self.writeChannel(index, adminIndex=adminIndex)
            index += 1

            # if we are updating the local node, we might end up
            # *moving* the admin channel index as we are writing
            if (self.iface.localNode == self) and index >= adminIndex:
                # We've now passed the old location for admin index
                # (and written it), so we can start finding it by name again
                adminIndex = 0

    def getChannelByName(self, name):
        """Try to find the named channel or return None"""
        for c in self.channels or []:
            if c.settings and c.settings.name == name:
                return c
        return None

    def getDisabledChannel(self):
        """Return the first channel that is disabled (i.e. available for some new use)"""
        for c in self.channels:
            if c.role == channel_pb2.Channel.Role.DISABLED:
                return c
        return None

    def _getAdminChannelIndex(self):
        """Return the channel number of the admin channel, or 0 if no reserved channel"""
        for c in self.channels or []:
            if c.settings and c.settings.name.lower() == "admin":
                return c.index
        return 0

    def setOwner(self, long_name: Optional[str]=None, short_name: Optional[str]=None, is_licensed: bool=False):
        """Set device owner name"""
        logging.debug(f"in setOwner nodeNum:{self.nodeNum}")
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()

        nChars = 4
        if long_name is not None:
            long_name = long_name.strip()
            p.set_owner.long_name = long_name
            p.set_owner.is_licensed = is_licensed
        if short_name is not None:
            short_name = short_name.strip()
            if len(short_name) > nChars:
                short_name = short_name[:nChars]
                print(f"Maximum is 4 characters, truncated to {short_name}")
            p.set_owner.short_name = short_name

        # Note: These debug lines are used in unit tests
        logging.debug(f"p.set_owner.long_name:{p.set_owner.long_name}:")
        logging.debug(f"p.set_owner.short_name:{p.set_owner.short_name}:")
        logging.debug(f"p.set_owner.is_licensed:{p.set_owner.is_licensed}")
        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def getURL(self, includeAll: bool = True):
        """The sharable URL that describes the current channel"""
        # Only keep the primary/secondary channels, assume primary is first
        channelSet = apponly_pb2.ChannelSet()
        if self.channels:
            for c in self.channels:
                if c.role == channel_pb2.Channel.Role.PRIMARY or (
                    includeAll and c.role == channel_pb2.Channel.Role.SECONDARY
                ):
                    channelSet.settings.append(c.settings)

        if len(self.localConfig.ListFields()) == 0:
            self.requestConfig(self.localConfig.DESCRIPTOR.fields_by_name.get('lora'))
        channelSet.lora_config.CopyFrom(self.localConfig.lora)
        some_bytes = channelSet.SerializeToString()
        s = base64.urlsafe_b64encode(some_bytes).decode("ascii")
        s = s.replace("=", "").replace("+", "-").replace("/", "_")
        return f"https://meshtastic.org/e/#{s}"

    def setURL(self, url):
        """Set mesh network URL"""
        if self.localConfig is None:
            our_exit("Warning: No Config has been read")

        # URLs are of the form https://meshtastic.org/d/#{base64_channel_set}
        # Split on '/#' to find the base64 encoded channel settings
        splitURL = url.split("/#")
        b64 = splitURL[-1]

        # We normally strip padding to make for a shorter URL, but the python parser doesn't like
        # that.  So add back any missing padding
        # per https://stackoverflow.com/a/9807138
        missing_padding = len(b64) % 4
        if missing_padding:
            b64 += "=" * (4 - missing_padding)

        decodedURL = base64.urlsafe_b64decode(b64)
        channelSet = apponly_pb2.ChannelSet()
        channelSet.ParseFromString(decodedURL)

        if len(channelSet.settings) == 0:
            our_exit("Warning: There were no settings.")

        i = 0
        for chs in channelSet.settings:
            ch = channel_pb2.Channel()
            ch.role = (
                channel_pb2.Channel.Role.PRIMARY
                if i == 0
                else channel_pb2.Channel.Role.SECONDARY
            )
            ch.index = i
            ch.settings.CopyFrom(chs)
            self.channels[ch.index] = ch
            logging.debug(f"Channel i:{i} ch:{ch}")
            self.writeChannel(ch.index)
            i = i + 1

        p = admin_pb2.AdminMessage()
        p.set_config.lora.CopyFrom(channelSet.lora_config)
        self.ensureSessionKey()
        self._sendAdmin(p)

    def onResponseRequestRingtone(self, p):
        """Handle the response packet for requesting ringtone part 1"""
        logging.debug(f"onResponseRequestRingtone() p:{p}")
        errorFound = False
        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                errorFound = True
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
        if errorFound is False:
            if "decoded" in p:
                if "admin" in p["decoded"]:
                    if "raw" in p["decoded"]["admin"]:
                        self.ringtonePart = p["decoded"]["admin"][
                            "raw"
                        ].get_ringtone_response
                        logging.debug(f"self.ringtonePart:{self.ringtonePart}")
                        self.gotResponse = True

    def get_ringtone(self):
        """Get the ringtone. Concatenate all pieces together and return a single string."""
        logging.debug(f"in get_ringtone()")
        if not self.ringtone:
            p1 = admin_pb2.AdminMessage()
            p1.get_ringtone_request = True
            self.gotResponse = False
            self._sendAdmin(
                p1, wantResponse=True, onResponse=self.onResponseRequestRingtone
            )
            while self.gotResponse is False:
                time.sleep(0.1)

            logging.debug(f"self.ringtone:{self.ringtone}")

            self.ringtone = ""
            if self.ringtonePart:
                self.ringtone += self.ringtonePart

        print(f"ringtone:{self.ringtone}")
        logging.debug(f"ringtone:{self.ringtone}")
        return self.ringtone

    def set_ringtone(self, ringtone):
        """Set the ringtone. The ringtone length must be less than 230 character."""

        if len(ringtone) > 230:
            our_exit("Warning: The ringtone must be less than 230 characters.")
        self.ensureSessionKey()
        # split into chunks
        chunks = []
        chunks_size = 230
        for i in range(0, len(ringtone), chunks_size):
            chunks.append(ringtone[i : i + chunks_size])

        # for each chunk, send a message to set the values
        # for i in range(0, len(chunks)):
        for i, chunk in enumerate(chunks):
            p = admin_pb2.AdminMessage()

            # TODO: should be a way to improve this
            if i == 0:
                p.set_ringtone_message = chunk

            logging.debug(f"Setting ringtone '{chunk}' part {i+1}")
            # If sending to a remote node, wait for ACK/NAK
            if self == self.iface.localNode:
                onResponse = None
            else:
                onResponse = self.onAckNak
            return self._sendAdmin(p, onResponse=onResponse)

    def onResponseRequestCannedMessagePluginMessageMessages(self, p):
        """Handle the response packet for requesting canned message plugin message part 1"""
        logging.debug(f"onResponseRequestCannedMessagePluginMessageMessages() p:{p}")
        errorFound = False
        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                errorFound = True
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
        if errorFound is False:
            if "decoded" in p:
                if "admin" in p["decoded"]:
                    if "raw" in p["decoded"]["admin"]:
                        self.cannedPluginMessageMessages = p["decoded"]["admin"][
                            "raw"
                        ].get_canned_message_module_messages_response
                        logging.debug(
                            f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
                        )
                        self.gotResponse = True

    def get_canned_message(self):
        """Get the canned message string. Concatenate all pieces together and return a single string."""
        logging.debug(f"in get_canned_message()")
        if not self.cannedPluginMessage:
            p1 = admin_pb2.AdminMessage()
            p1.get_canned_message_module_messages_request = True
            self.gotResponse = False
            self._sendAdmin(
                p1,
                wantResponse=True,
                onResponse=self.onResponseRequestCannedMessagePluginMessageMessages,
            )
            while self.gotResponse is False:
                time.sleep(0.1)

            logging.debug(
                f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
            )

            self.cannedPluginMessage = ""
            if self.cannedPluginMessageMessages:
                self.cannedPluginMessage += self.cannedPluginMessageMessages

        print(f"canned_plugin_message:{self.cannedPluginMessage}")
        logging.debug(f"canned_plugin_message:{self.cannedPluginMessage}")
        return self.cannedPluginMessage

    def set_canned_message(self, message):
        """Set the canned message. The canned messages length must be less than 200 character."""

        if len(message) > 200:
            our_exit("Warning: The canned message must be less than 200 characters.")
        self.ensureSessionKey()
        # split into chunks
        chunks = []
        chunks_size = 200
        for i in range(0, len(message), chunks_size):
            chunks.append(message[i : i + chunks_size])

        # for each chunk, send a message to set the values
        # for i in range(0, len(chunks)):
        for i, chunk in enumerate(chunks):
            p = admin_pb2.AdminMessage()

            # TODO: should be a way to improve this
            if i == 0:
                p.set_canned_message_module_messages = chunk

            logging.debug(f"Setting canned message '{chunk}' part {i+1}")
            # If sending to a remote node, wait for ACK/NAK
            if self == self.iface.localNode:
                onResponse = None
            else:
                onResponse = self.onAckNak
            return self._sendAdmin(p, onResponse=onResponse)

    def exitSimulator(self):
        """Tell a simulator node to exit (this message
        is ignored for other nodes)"""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.exit_simulator = True
        logging.debug("in exitSimulator()")

        return self._sendAdmin(p)

    def reboot(self, secs: int = 10):
        """Tell the node to reboot."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.reboot_seconds = secs
        logging.info(f"Telling node to reboot in {secs} seconds")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def beginSettingsTransaction(self):
        """Tell the node to open a transaction to edit settings."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.begin_edit_settings = True
        logging.info(f"Telling open a transaction to edit settings")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def commitSettingsTransaction(self):
        """Tell the node to commit the open transaction for editing settings."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.commit_edit_settings = True
        logging.info(f"Telling node to commit open transaction for editing settings")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def rebootOTA(self, secs: int = 10):
        """Tell the node to reboot into factory firmware."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.reboot_ota_seconds = secs
        logging.info(f"Telling node to reboot to OTA in {secs} seconds")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def enterDFUMode(self):
        """Tell the node to enter DFU mode (NRF52)."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.enter_dfu_mode_request = True
        logging.info(f"Telling node to enable DFU mode")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def shutdown(self, secs: int = 10):
        """Tell the node to shutdown."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.shutdown_seconds = secs
        logging.info(f"Telling node to shutdown in {secs} seconds")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def getMetadata(self):
        """Get the node's metadata."""
        p = admin_pb2.AdminMessage()
        p.get_device_metadata_request = True
        logging.info(f"Requesting device metadata")

        self._sendAdmin(
            p, wantResponse=True, onResponse=self.onRequestGetMetadata
        )
        self.iface.waitForAckNak()

    def factoryReset(self, full: bool = False):
        """Tell the node to factory reset."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        if full:
            p.factory_reset_device = True
            logging.info(f"Telling node to factory reset (full device reset)")
        else:
            p.factory_reset_config = True
            logging.info(f"Telling node to factory reset (config reset)")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def removeNode(self, nodeId: Union[int, str]):
        """Tell the node to remove a specific node by ID"""
        self.ensureSessionKey()
        if isinstance(nodeId, str):
            if nodeId.startswith("!"):
                nodeId = int(nodeId[1:], 16)
            else:
                nodeId = int(nodeId)

        p = admin_pb2.AdminMessage()
        p.remove_by_nodenum = nodeId

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def resetNodeDb(self):
        """Tell the node to reset its list of nodes."""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.nodedb_reset = True
        logging.info(f"Telling node to reset the NodeDB")

        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def setFixedPosition(self, lat: Union[int, float], lon: Union[int, float], alt: int):
        """Tell the node to set fixed position to the provided value and enable the fixed position setting"""
        self.ensureSessionKey()

        p = mesh_pb2.Position()
        if isinstance(lat, float) and lat != 0.0:
            p.latitude_i = int(lat / 1e-7)
        elif isinstance(lat, int) and lat != 0:
            p.latitude_i = lat

        if isinstance(lon, float) and lon != 0.0:
            p.longitude_i = int(lon / 1e-7)
        elif isinstance(lon, int) and lon != 0:
            p.longitude_i = lon

        if alt != 0:
            p.altitude = alt

        a = admin_pb2.AdminMessage()
        a.set_fixed_position.CopyFrom(p)

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(a, onResponse=onResponse)

    def removeFixedPosition(self):
        """Tell the node to remove the fixed position and set the fixed position setting to false"""
        self.ensureSessionKey()
        p = admin_pb2.AdminMessage()
        p.remove_fixed_position = True
        logging.info(f"Telling node to remove fixed position")

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def setTime(self, timeSec: int = 0):
        """Tell the node to set its time to the provided timestamp, or the system's current time if not provided or 0."""
        self.ensureSessionKey()
        if timeSec == 0:
            timeSec = int(time.time())
        p = admin_pb2.AdminMessage()
        p.set_time_only = timeSec
        logging.info(f"Setting node time to {timeSec}")

        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)

    def _fixupChannels(self):
        """Fixup indexes and add disabled channels as needed"""

        # Add extra disabled channels as needed
        # This is needed because the protobufs will have index **missing** if the channel number is zero
        for index, ch in enumerate(self.channels):
            ch.index = index  # fixup indexes

        self._fillChannels()

    def _fillChannels(self):
        """Mark unused channels as disabled"""

        # Add extra disabled channels as needed
        index = len(self.channels)
        while index < 8:
            ch = channel_pb2.Channel()
            ch.role = channel_pb2.Channel.Role.DISABLED
            ch.index = index
            self.channels.append(ch)
            index += 1

    def onRequestGetMetadata(self, p):
        """Handle the response packet for requesting device metadata getMetadata()"""
        logging.debug(f"onRequestGetMetadata() p:{p}")

        if "routing" in p["decoded"]:
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
                self.iface._acknowledgment.receivedNak = True
        else:
            self.iface._acknowledgment.receivedAck = True
            if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
                portnums_pb2.PortNum.ROUTING_APP
            ):
                if p["decoded"]["routing"]["errorReason"] != "NONE":
                    logging.warning(
                        f'Metadata request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
                    )
                    self._timeout.expireTime = time.time()  # Do not wait any longer
                    return  # Don't try to parse this routing message
                logging.debug(f"Retrying metadata request.")
                self.getMetadata()
                return

            c = p["decoded"]["admin"]["raw"].get_device_metadata_response
            self._timeout.reset()  # We made forward progress
            logging.debug(f"Received metadata {stripnl(c)}")
            print(f"\nfirmware_version: {c.firmware_version}")
            print(f"device_state_version: {c.device_state_version}")

    def onResponseRequestChannel(self, p):
        """Handle the response packet for requesting a channel _requestChannel()"""
        logging.debug(f"onResponseRequestChannel() p:{p}")

        if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
            portnums_pb2.PortNum.ROUTING_APP
        ):
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                logging.warning(
                    f'Channel request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
                )
                self._timeout.expireTime = time.time()  # Do not wait any longer
                return  # Don't try to parse this routing message
            lastTried = 0
            if len(self.partialChannels) > 0:
                lastTried = self.partialChannels[-1].index
            logging.debug(f"Retrying previous channel request.")
            self._requestChannel(lastTried)
            return

        c = p["decoded"]["admin"]["raw"].get_channel_response
        self.partialChannels.append(c)
        self._timeout.reset()  # We made forward progress
        logging.debug(f"Received channel {stripnl(c)}")
        index = c.index

        if index >= 8 - 1:
            logging.debug("Finished downloading channels")

            self.channels = self.partialChannels
            self._fixupChannels()
        else:
            self._requestChannel(index + 1)

    def onAckNak(self, p):
        """Informative handler for ACK/NAK responses"""
        if p["decoded"]["routing"]["errorReason"] != "NONE":
            print(
                f'Received a NAK, error reason: {p["decoded"]["routing"]["errorReason"]}'
            )
            self.iface._acknowledgment.receivedNak = True
        else:
            if int(p["from"]) == self.iface.localNode.nodeNum:
                print(
                    f"Received an implicit ACK. Packet will likely arrive, but cannot be guaranteed."
                )
                self.iface._acknowledgment.receivedImplAck = True
            else:
                print(f"Received an ACK.")
                self.iface._acknowledgment.receivedAck = True

    def _requestChannel(self, channelNum: int):
        """Done with initial config messages, now send regular
        MeshPackets to ask for settings"""
        p = admin_pb2.AdminMessage()
        p.get_channel_request = channelNum + 1

        # Show progress message for super slow operations
        if self != self.iface.localNode:
            print(
                f"Requesting channel {channelNum} info from remote node (this could take a while)"
            )
            logging.debug(
                f"Requesting channel {channelNum} info from remote node (this could take a while)"
            )
        else:
            logging.debug(f"Requesting channel {channelNum}")

        return self._sendAdmin(
            p, wantResponse=True, onResponse=self.onResponseRequestChannel
        )

    # pylint: disable=R1710
    def _sendAdmin(
        self,
        p: admin_pb2.AdminMessage,
        wantResponse: bool=True,
        onResponse=None,
        adminIndex: int=0,
    ):
        """Send an admin message to the specified node (or the local node if destNodeNum is zero)"""

        if self.noProto:
            logging.warning(
                f"Not sending packet because protocol use is disabled by noProto"
            )
        else:
            if (
                adminIndex == 0
            ):  # unless a special channel index was used, we want to use the admin index
                adminIndex = self.iface.localNode._getAdminChannelIndex()
            logging.debug(f"adminIndex:{adminIndex}")
            if isinstance(self.nodeNum, int):
                nodeid = self.nodeNum
            else: # assume string starting with !
                nodeid = int(self.nodeNum[1:],16)
            if "adminSessionPassKey" in self.iface._getOrCreateByNum(nodeid):
                p.session_passkey = self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey")
            return self.iface.sendData(
                p,
                self.nodeNum,
                portNum=portnums_pb2.PortNum.ADMIN_APP,
                wantAck=False,
                wantResponse=wantResponse,
                onResponse=onResponse,
                channelIndex=adminIndex,
                pkiEncrypted=True,
            )

    def ensureSessionKey(self):
        """If our entry in iface.nodesByNum doesn't already have an adminSessionPassKey, make a request to get one"""
        if self.noProto:
            logging.warning(
                f"Not ensuring session key, because protocol use is disabled by noProto"
            )
        else:
            if isinstance(self.nodeNum, int):
                nodeid = self.nodeNum
            else: # assume string starting with !
                nodeid = int(self.nodeNum[1:],16)
            if self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey") is None:
                self.requestConfig(admin_pb2.AdminMessage.SESSIONKEY_CONFIG)

Methods

def beginSettingsTransaction(self)

Tell the node to open a transaction to edit settings.

Expand source code
def beginSettingsTransaction(self):
    """Tell the node to open a transaction to edit settings."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.begin_edit_settings = True
    logging.info(f"Telling open a transaction to edit settings")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def commitSettingsTransaction(self)

Tell the node to commit the open transaction for editing settings.

Expand source code
def commitSettingsTransaction(self):
    """Tell the node to commit the open transaction for editing settings."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.commit_edit_settings = True
    logging.info(f"Telling node to commit open transaction for editing settings")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def deleteChannel(self, channelIndex)

Delete the specified channelIndex and shift other channels up

Expand source code
def deleteChannel(self, channelIndex):
    """Delete the specified channelIndex and shift other channels up"""
    ch = self.channels[channelIndex]
    if ch.role not in (
        channel_pb2.Channel.Role.SECONDARY,
        channel_pb2.Channel.Role.DISABLED,
    ):
        our_exit("Warning: Only SECONDARY channels can be deleted")

    # we are careful here because if we move the "admin" channel the channelIndex we need to use
    # for sending admin channels will also change
    adminIndex = self.iface.localNode._getAdminChannelIndex()

    self.channels.pop(channelIndex)
    self._fixupChannels()  # expand back to 8 channels

    index = channelIndex
    while index < 8:
        self.writeChannel(index, adminIndex=adminIndex)
        index += 1

        # if we are updating the local node, we might end up
        # *moving* the admin channel index as we are writing
        if (self.iface.localNode == self) and index >= adminIndex:
            # We've now passed the old location for admin index
            # (and written it), so we can start finding it by name again
            adminIndex = 0
def ensureSessionKey(self)

If our entry in iface.nodesByNum doesn't already have an adminSessionPassKey, make a request to get one

Expand source code
def ensureSessionKey(self):
    """If our entry in iface.nodesByNum doesn't already have an adminSessionPassKey, make a request to get one"""
    if self.noProto:
        logging.warning(
            f"Not ensuring session key, because protocol use is disabled by noProto"
        )
    else:
        if isinstance(self.nodeNum, int):
            nodeid = self.nodeNum
        else: # assume string starting with !
            nodeid = int(self.nodeNum[1:],16)
        if self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey") is None:
            self.requestConfig(admin_pb2.AdminMessage.SESSIONKEY_CONFIG)
def enterDFUMode(self)

Tell the node to enter DFU mode (NRF52).

Expand source code
def enterDFUMode(self):
    """Tell the node to enter DFU mode (NRF52)."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.enter_dfu_mode_request = True
    logging.info(f"Telling node to enable DFU mode")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def exitSimulator(self)

Tell a simulator node to exit (this message is ignored for other nodes)

Expand source code
def exitSimulator(self):
    """Tell a simulator node to exit (this message
    is ignored for other nodes)"""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.exit_simulator = True
    logging.debug("in exitSimulator()")

    return self._sendAdmin(p)
def factoryReset(self, full: bool = False)

Tell the node to factory reset.

Expand source code
def factoryReset(self, full: bool = False):
    """Tell the node to factory reset."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    if full:
        p.factory_reset_device = True
        logging.info(f"Telling node to factory reset (full device reset)")
    else:
        p.factory_reset_config = True
        logging.info(f"Telling node to factory reset (config reset)")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def getChannelByChannelIndex(self, channelIndex)

Get channel by channelIndex channelIndex: number, typically 0-7; based on max number channels returns: None if there is no channel found

Expand source code
def getChannelByChannelIndex(self, channelIndex):
    """Get channel by channelIndex
    channelIndex: number, typically 0-7; based on max number channels
    returns: None if there is no channel found
    """
    ch = None
    if self.channels and 0 <= channelIndex < len(self.channels):
        ch = self.channels[channelIndex]
    return ch
def getChannelByName(self, name)

Try to find the named channel or return None

Expand source code
def getChannelByName(self, name):
    """Try to find the named channel or return None"""
    for c in self.channels or []:
        if c.settings and c.settings.name == name:
            return c
    return None
def getDisabledChannel(self)

Return the first channel that is disabled (i.e. available for some new use)

Expand source code
def getDisabledChannel(self):
    """Return the first channel that is disabled (i.e. available for some new use)"""
    for c in self.channels:
        if c.role == channel_pb2.Channel.Role.DISABLED:
            return c
    return None
def getMetadata(self)

Get the node's metadata.

Expand source code
def getMetadata(self):
    """Get the node's metadata."""
    p = admin_pb2.AdminMessage()
    p.get_device_metadata_request = True
    logging.info(f"Requesting device metadata")

    self._sendAdmin(
        p, wantResponse=True, onResponse=self.onRequestGetMetadata
    )
    self.iface.waitForAckNak()
def getURL(self, includeAll: bool = True)

The sharable URL that describes the current channel

Expand source code
def getURL(self, includeAll: bool = True):
    """The sharable URL that describes the current channel"""
    # Only keep the primary/secondary channels, assume primary is first
    channelSet = apponly_pb2.ChannelSet()
    if self.channels:
        for c in self.channels:
            if c.role == channel_pb2.Channel.Role.PRIMARY or (
                includeAll and c.role == channel_pb2.Channel.Role.SECONDARY
            ):
                channelSet.settings.append(c.settings)

    if len(self.localConfig.ListFields()) == 0:
        self.requestConfig(self.localConfig.DESCRIPTOR.fields_by_name.get('lora'))
    channelSet.lora_config.CopyFrom(self.localConfig.lora)
    some_bytes = channelSet.SerializeToString()
    s = base64.urlsafe_b64encode(some_bytes).decode("ascii")
    s = s.replace("=", "").replace("+", "-").replace("/", "_")
    return f"https://meshtastic.org/e/#{s}"
def get_canned_message(self)

Get the canned message string. Concatenate all pieces together and return a single string.

Expand source code
def get_canned_message(self):
    """Get the canned message string. Concatenate all pieces together and return a single string."""
    logging.debug(f"in get_canned_message()")
    if not self.cannedPluginMessage:
        p1 = admin_pb2.AdminMessage()
        p1.get_canned_message_module_messages_request = True
        self.gotResponse = False
        self._sendAdmin(
            p1,
            wantResponse=True,
            onResponse=self.onResponseRequestCannedMessagePluginMessageMessages,
        )
        while self.gotResponse is False:
            time.sleep(0.1)

        logging.debug(
            f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
        )

        self.cannedPluginMessage = ""
        if self.cannedPluginMessageMessages:
            self.cannedPluginMessage += self.cannedPluginMessageMessages

    print(f"canned_plugin_message:{self.cannedPluginMessage}")
    logging.debug(f"canned_plugin_message:{self.cannedPluginMessage}")
    return self.cannedPluginMessage
def get_ringtone(self)

Get the ringtone. Concatenate all pieces together and return a single string.

Expand source code
def get_ringtone(self):
    """Get the ringtone. Concatenate all pieces together and return a single string."""
    logging.debug(f"in get_ringtone()")
    if not self.ringtone:
        p1 = admin_pb2.AdminMessage()
        p1.get_ringtone_request = True
        self.gotResponse = False
        self._sendAdmin(
            p1, wantResponse=True, onResponse=self.onResponseRequestRingtone
        )
        while self.gotResponse is False:
            time.sleep(0.1)

        logging.debug(f"self.ringtone:{self.ringtone}")

        self.ringtone = ""
        if self.ringtonePart:
            self.ringtone += self.ringtonePart

    print(f"ringtone:{self.ringtone}")
    logging.debug(f"ringtone:{self.ringtone}")
    return self.ringtone
def onAckNak(self, p)

Informative handler for ACK/NAK responses

Expand source code
def onAckNak(self, p):
    """Informative handler for ACK/NAK responses"""
    if p["decoded"]["routing"]["errorReason"] != "NONE":
        print(
            f'Received a NAK, error reason: {p["decoded"]["routing"]["errorReason"]}'
        )
        self.iface._acknowledgment.receivedNak = True
    else:
        if int(p["from"]) == self.iface.localNode.nodeNum:
            print(
                f"Received an implicit ACK. Packet will likely arrive, but cannot be guaranteed."
            )
            self.iface._acknowledgment.receivedImplAck = True
        else:
            print(f"Received an ACK.")
            self.iface._acknowledgment.receivedAck = True
def onRequestGetMetadata(self, p)

Handle the response packet for requesting device metadata getMetadata()

Expand source code
def onRequestGetMetadata(self, p):
    """Handle the response packet for requesting device metadata getMetadata()"""
    logging.debug(f"onRequestGetMetadata() p:{p}")

    if "routing" in p["decoded"]:
        if p["decoded"]["routing"]["errorReason"] != "NONE":
            print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
            self.iface._acknowledgment.receivedNak = True
    else:
        self.iface._acknowledgment.receivedAck = True
        if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
            portnums_pb2.PortNum.ROUTING_APP
        ):
            if p["decoded"]["routing"]["errorReason"] != "NONE":
                logging.warning(
                    f'Metadata request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
                )
                self._timeout.expireTime = time.time()  # Do not wait any longer
                return  # Don't try to parse this routing message
            logging.debug(f"Retrying metadata request.")
            self.getMetadata()
            return

        c = p["decoded"]["admin"]["raw"].get_device_metadata_response
        self._timeout.reset()  # We made forward progress
        logging.debug(f"Received metadata {stripnl(c)}")
        print(f"\nfirmware_version: {c.firmware_version}")
        print(f"device_state_version: {c.device_state_version}")
def onResponseRequestCannedMessagePluginMessageMessages(self, p)

Handle the response packet for requesting canned message plugin message part 1

Expand source code
def onResponseRequestCannedMessagePluginMessageMessages(self, p):
    """Handle the response packet for requesting canned message plugin message part 1"""
    logging.debug(f"onResponseRequestCannedMessagePluginMessageMessages() p:{p}")
    errorFound = False
    if "routing" in p["decoded"]:
        if p["decoded"]["routing"]["errorReason"] != "NONE":
            errorFound = True
            print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
    if errorFound is False:
        if "decoded" in p:
            if "admin" in p["decoded"]:
                if "raw" in p["decoded"]["admin"]:
                    self.cannedPluginMessageMessages = p["decoded"]["admin"][
                        "raw"
                    ].get_canned_message_module_messages_response
                    logging.debug(
                        f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
                    )
                    self.gotResponse = True
def onResponseRequestChannel(self, p)

Handle the response packet for requesting a channel _requestChannel()

Expand source code
def onResponseRequestChannel(self, p):
    """Handle the response packet for requesting a channel _requestChannel()"""
    logging.debug(f"onResponseRequestChannel() p:{p}")

    if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
        portnums_pb2.PortNum.ROUTING_APP
    ):
        if p["decoded"]["routing"]["errorReason"] != "NONE":
            logging.warning(
                f'Channel request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
            )
            self._timeout.expireTime = time.time()  # Do not wait any longer
            return  # Don't try to parse this routing message
        lastTried = 0
        if len(self.partialChannels) > 0:
            lastTried = self.partialChannels[-1].index
        logging.debug(f"Retrying previous channel request.")
        self._requestChannel(lastTried)
        return

    c = p["decoded"]["admin"]["raw"].get_channel_response
    self.partialChannels.append(c)
    self._timeout.reset()  # We made forward progress
    logging.debug(f"Received channel {stripnl(c)}")
    index = c.index

    if index >= 8 - 1:
        logging.debug("Finished downloading channels")

        self.channels = self.partialChannels
        self._fixupChannels()
    else:
        self._requestChannel(index + 1)
def onResponseRequestRingtone(self, p)

Handle the response packet for requesting ringtone part 1

Expand source code
def onResponseRequestRingtone(self, p):
    """Handle the response packet for requesting ringtone part 1"""
    logging.debug(f"onResponseRequestRingtone() p:{p}")
    errorFound = False
    if "routing" in p["decoded"]:
        if p["decoded"]["routing"]["errorReason"] != "NONE":
            errorFound = True
            print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
    if errorFound is False:
        if "decoded" in p:
            if "admin" in p["decoded"]:
                if "raw" in p["decoded"]["admin"]:
                    self.ringtonePart = p["decoded"]["admin"][
                        "raw"
                    ].get_ringtone_response
                    logging.debug(f"self.ringtonePart:{self.ringtonePart}")
                    self.gotResponse = True
def onResponseRequestSettings(self, p)

Handle the response packets for requesting settings _requestSettings()

Expand source code
def onResponseRequestSettings(self, p):
    """Handle the response packets for requesting settings _requestSettings()"""
    logging.debug(f"onResponseRequestSetting() p:{p}")
    config_values = None
    if "routing" in p["decoded"]:
        if p["decoded"]["routing"]["errorReason"] != "NONE":
            print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
            self.iface._acknowledgment.receivedNak = True
    else:
        self.iface._acknowledgment.receivedAck = True
        print("")
        adminMessage = p["decoded"]["admin"]
        if "getConfigResponse" in adminMessage:
            oneof = "get_config_response"
            resp = adminMessage["getConfigResponse"]
            field = list(resp.keys())[0]
            config_type = self.localConfig.DESCRIPTOR.fields_by_name.get(
                camel_to_snake(field)
            )
            if config_type is not None:
                config_values = getattr(self.localConfig, config_type.name)
        elif "getModuleConfigResponse" in adminMessage:
            oneof = "get_module_config_response"
            resp = adminMessage["getModuleConfigResponse"]
            field = list(resp.keys())[0]
            config_type = self.moduleConfig.DESCRIPTOR.fields_by_name.get(
                camel_to_snake(field)
            )
            config_values = getattr(self.moduleConfig, config_type.name)
        else:
            print(
                "Did not receive a valid response. Make sure to have a shared channel named 'admin'."
            )
            return
        if config_values is not None:
            raw_config = getattr(getattr(adminMessage['raw'], oneof), camel_to_snake(field))
            config_values.CopyFrom(raw_config)
            print(f"{str(camel_to_snake(field))}:\n{str(config_values)}")
def reboot(self, secs: int = 10)

Tell the node to reboot.

Expand source code
def reboot(self, secs: int = 10):
    """Tell the node to reboot."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.reboot_seconds = secs
    logging.info(f"Telling node to reboot in {secs} seconds")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def rebootOTA(self, secs: int = 10)

Tell the node to reboot into factory firmware.

Expand source code
def rebootOTA(self, secs: int = 10):
    """Tell the node to reboot into factory firmware."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.reboot_ota_seconds = secs
    logging.info(f"Telling node to reboot to OTA in {secs} seconds")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def removeFixedPosition(self)

Tell the node to remove the fixed position and set the fixed position setting to false

Expand source code
def removeFixedPosition(self):
    """Tell the node to remove the fixed position and set the fixed position setting to false"""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.remove_fixed_position = True
    logging.info(f"Telling node to remove fixed position")

    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def removeNode(self, nodeId: Union[int, str])

Tell the node to remove a specific node by ID

Expand source code
def removeNode(self, nodeId: Union[int, str]):
    """Tell the node to remove a specific node by ID"""
    self.ensureSessionKey()
    if isinstance(nodeId, str):
        if nodeId.startswith("!"):
            nodeId = int(nodeId[1:], 16)
        else:
            nodeId = int(nodeId)

    p = admin_pb2.AdminMessage()
    p.remove_by_nodenum = nodeId

    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def requestChannels(self, startingIndex: int = 0)

Send regular MeshPackets to ask channels.

Expand source code
def requestChannels(self, startingIndex: int = 0):
    """Send regular MeshPackets to ask channels."""
    logging.debug(f"requestChannels for nodeNum:{self.nodeNum}")
    # only initialize if we're starting out fresh
    if startingIndex == 0:
        self.channels = None
        self.partialChannels = []  # We keep our channels in a temp array until finished
    self._requestChannel(startingIndex)
def requestConfig(self, configType)

Request the config from the node via admin message

Expand source code
def requestConfig(self, configType):
    """Request the config from the node via admin message"""
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onResponseRequestSettings
        print("Requesting current config from remote node (this can take a while).")
    p = admin_pb2.AdminMessage()
    if isinstance(configType, int):
        p.get_config_request = configType

    else:
        msgIndex = configType.index
        if configType.containing_type.name == "LocalConfig":
            p.get_config_request = msgIndex
        else:
            p.get_module_config_request = msgIndex

    self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
    if onResponse:
        self.iface.waitForAckNak()
def resetNodeDb(self)

Tell the node to reset its list of nodes.

Expand source code
def resetNodeDb(self):
    """Tell the node to reset its list of nodes."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.nodedb_reset = True
    logging.info(f"Telling node to reset the NodeDB")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def setChannels(self, channels)

Set the channels for this node

Expand source code
def setChannels(self, channels):
    """Set the channels for this node"""
    self.channels = channels
    self._fixupChannels()
def setFixedPosition(self, lat: Union[int, float], lon: Union[int, float], alt: int)

Tell the node to set fixed position to the provided value and enable the fixed position setting

Expand source code
def setFixedPosition(self, lat: Union[int, float], lon: Union[int, float], alt: int):
    """Tell the node to set fixed position to the provided value and enable the fixed position setting"""
    self.ensureSessionKey()

    p = mesh_pb2.Position()
    if isinstance(lat, float) and lat != 0.0:
        p.latitude_i = int(lat / 1e-7)
    elif isinstance(lat, int) and lat != 0:
        p.latitude_i = lat

    if isinstance(lon, float) and lon != 0.0:
        p.longitude_i = int(lon / 1e-7)
    elif isinstance(lon, int) and lon != 0:
        p.longitude_i = lon

    if alt != 0:
        p.altitude = alt

    a = admin_pb2.AdminMessage()
    a.set_fixed_position.CopyFrom(p)

    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(a, onResponse=onResponse)
def setOwner(self, long_name: Optional[str] = None, short_name: Optional[str] = None, is_licensed: bool = False)

Set device owner name

Expand source code
def setOwner(self, long_name: Optional[str]=None, short_name: Optional[str]=None, is_licensed: bool=False):
    """Set device owner name"""
    logging.debug(f"in setOwner nodeNum:{self.nodeNum}")
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()

    nChars = 4
    if long_name is not None:
        long_name = long_name.strip()
        p.set_owner.long_name = long_name
        p.set_owner.is_licensed = is_licensed
    if short_name is not None:
        short_name = short_name.strip()
        if len(short_name) > nChars:
            short_name = short_name[:nChars]
            print(f"Maximum is 4 characters, truncated to {short_name}")
        p.set_owner.short_name = short_name

    # Note: These debug lines are used in unit tests
    logging.debug(f"p.set_owner.long_name:{p.set_owner.long_name}:")
    logging.debug(f"p.set_owner.short_name:{p.set_owner.short_name}:")
    logging.debug(f"p.set_owner.is_licensed:{p.set_owner.is_licensed}")
    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def setTime(self, timeSec: int = 0)

Tell the node to set its time to the provided timestamp, or the system's current time if not provided or 0.

Expand source code
def setTime(self, timeSec: int = 0):
    """Tell the node to set its time to the provided timestamp, or the system's current time if not provided or 0."""
    self.ensureSessionKey()
    if timeSec == 0:
        timeSec = int(time.time())
    p = admin_pb2.AdminMessage()
    p.set_time_only = timeSec
    logging.info(f"Setting node time to {timeSec}")

    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def setURL(self, url)

Set mesh network URL

Expand source code
def setURL(self, url):
    """Set mesh network URL"""
    if self.localConfig is None:
        our_exit("Warning: No Config has been read")

    # URLs are of the form https://meshtastic.org/d/#{base64_channel_set}
    # Split on '/#' to find the base64 encoded channel settings
    splitURL = url.split("/#")
    b64 = splitURL[-1]

    # We normally strip padding to make for a shorter URL, but the python parser doesn't like
    # that.  So add back any missing padding
    # per https://stackoverflow.com/a/9807138
    missing_padding = len(b64) % 4
    if missing_padding:
        b64 += "=" * (4 - missing_padding)

    decodedURL = base64.urlsafe_b64decode(b64)
    channelSet = apponly_pb2.ChannelSet()
    channelSet.ParseFromString(decodedURL)

    if len(channelSet.settings) == 0:
        our_exit("Warning: There were no settings.")

    i = 0
    for chs in channelSet.settings:
        ch = channel_pb2.Channel()
        ch.role = (
            channel_pb2.Channel.Role.PRIMARY
            if i == 0
            else channel_pb2.Channel.Role.SECONDARY
        )
        ch.index = i
        ch.settings.CopyFrom(chs)
        self.channels[ch.index] = ch
        logging.debug(f"Channel i:{i} ch:{ch}")
        self.writeChannel(ch.index)
        i = i + 1

    p = admin_pb2.AdminMessage()
    p.set_config.lora.CopyFrom(channelSet.lora_config)
    self.ensureSessionKey()
    self._sendAdmin(p)
def set_canned_message(self, message)

Set the canned message. The canned messages length must be less than 200 character.

Expand source code
def set_canned_message(self, message):
    """Set the canned message. The canned messages length must be less than 200 character."""

    if len(message) > 200:
        our_exit("Warning: The canned message must be less than 200 characters.")
    self.ensureSessionKey()
    # split into chunks
    chunks = []
    chunks_size = 200
    for i in range(0, len(message), chunks_size):
        chunks.append(message[i : i + chunks_size])

    # for each chunk, send a message to set the values
    # for i in range(0, len(chunks)):
    for i, chunk in enumerate(chunks):
        p = admin_pb2.AdminMessage()

        # TODO: should be a way to improve this
        if i == 0:
            p.set_canned_message_module_messages = chunk

        logging.debug(f"Setting canned message '{chunk}' part {i+1}")
        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)
def set_ringtone(self, ringtone)

Set the ringtone. The ringtone length must be less than 230 character.

Expand source code
def set_ringtone(self, ringtone):
    """Set the ringtone. The ringtone length must be less than 230 character."""

    if len(ringtone) > 230:
        our_exit("Warning: The ringtone must be less than 230 characters.")
    self.ensureSessionKey()
    # split into chunks
    chunks = []
    chunks_size = 230
    for i in range(0, len(ringtone), chunks_size):
        chunks.append(ringtone[i : i + chunks_size])

    # for each chunk, send a message to set the values
    # for i in range(0, len(chunks)):
    for i, chunk in enumerate(chunks):
        p = admin_pb2.AdminMessage()

        # TODO: should be a way to improve this
        if i == 0:
            p.set_ringtone_message = chunk

        logging.debug(f"Setting ringtone '{chunk}' part {i+1}")
        # If sending to a remote node, wait for ACK/NAK
        if self == self.iface.localNode:
            onResponse = None
        else:
            onResponse = self.onAckNak
        return self._sendAdmin(p, onResponse=onResponse)
def showChannels(self)

Show human readable description of our channels.

Expand source code
def showChannels(self):
    """Show human readable description of our channels."""
    print("Channels:")
    if self.channels:
        logging.debug(f"self.channels:{self.channels}")
        for c in self.channels:
            cStr = message_to_json(c.settings)
            # don't show disabled channels
            if channel_pb2.Channel.Role.Name(c.role) != "DISABLED":
                print(
                    f"  Index {c.index}: {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}"
                )
    publicURL = self.getURL(includeAll=False)
    adminURL = self.getURL(includeAll=True)
    print(f"\nPrimary channel URL: {publicURL}")
    if adminURL != publicURL:
        print(f"Complete URL (includes all channels): {adminURL}")
def showInfo(self)

Show human readable description of our node

Expand source code
def showInfo(self):
    """Show human readable description of our node"""
    prefs = ""
    if self.localConfig:
        prefs = message_to_json(self.localConfig, multiline=True)
    print(f"Preferences: {prefs}\n")
    prefs = ""
    if self.moduleConfig:
        prefs = message_to_json(self.moduleConfig, multiline=True)
    print(f"Module preferences: {prefs}\n")
    self.showChannels()
def shutdown(self, secs: int = 10)

Tell the node to shutdown.

Expand source code
def shutdown(self, secs: int = 10):
    """Tell the node to shutdown."""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.shutdown_seconds = secs
    logging.info(f"Telling node to shutdown in {secs} seconds")

    # If sending to a remote node, wait for ACK/NAK
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    return self._sendAdmin(p, onResponse=onResponse)
def turnOffEncryptionOnPrimaryChannel(self)

Turn off encryption on primary channel.

Expand source code
def turnOffEncryptionOnPrimaryChannel(self):
    """Turn off encryption on primary channel."""
    self.channels[0].settings.psk = fromPSK("none")
    print("Writing modified channels to device")
    self.writeChannel(0)
def waitForConfig(self, attribute='channels')

Block until radio config is received. Returns True if config has been received.

Expand source code
def waitForConfig(self, attribute="channels"):
    """Block until radio config is received. Returns True if config has been received."""
    return self._timeout.waitForSet(self, attrs=("localConfig", attribute))
def writeChannel(self, channelIndex, adminIndex=0)

Write the current (edited) channel to the device

Expand source code
def writeChannel(self, channelIndex, adminIndex=0):
    """Write the current (edited) channel to the device"""
    self.ensureSessionKey()
    p = admin_pb2.AdminMessage()
    p.set_channel.CopyFrom(self.channels[channelIndex])
    self._sendAdmin(p, adminIndex=adminIndex)
    logging.debug(f"Wrote channel {channelIndex}")
def writeConfig(self, config_name)

Write the current (edited) localConfig to the device

Expand source code
def writeConfig(self, config_name):
    """Write the current (edited) localConfig to the device"""
    if self.localConfig is None:
        our_exit("Error: No localConfig has been read")

    p = admin_pb2.AdminMessage()

    if config_name == "device":
        p.set_config.device.CopyFrom(self.localConfig.device)
    elif config_name == "position":
        p.set_config.position.CopyFrom(self.localConfig.position)
    elif config_name == "power":
        p.set_config.power.CopyFrom(self.localConfig.power)
    elif config_name == "network":
        p.set_config.network.CopyFrom(self.localConfig.network)
    elif config_name == "display":
        p.set_config.display.CopyFrom(self.localConfig.display)
    elif config_name == "lora":
        p.set_config.lora.CopyFrom(self.localConfig.lora)
    elif config_name == "bluetooth":
        p.set_config.bluetooth.CopyFrom(self.localConfig.bluetooth)
    elif config_name == "security":
        p.set_config.security.CopyFrom(self.localConfig.security)
    elif config_name == "mqtt":
        p.set_module_config.mqtt.CopyFrom(self.moduleConfig.mqtt)
    elif config_name == "serial":
        p.set_module_config.serial.CopyFrom(self.moduleConfig.serial)
    elif config_name == "external_notification":
        p.set_module_config.external_notification.CopyFrom(
            self.moduleConfig.external_notification
        )
    elif config_name == "store_forward":
        p.set_module_config.store_forward.CopyFrom(self.moduleConfig.store_forward)
    elif config_name == "range_test":
        p.set_module_config.range_test.CopyFrom(self.moduleConfig.range_test)
    elif config_name == "telemetry":
        p.set_module_config.telemetry.CopyFrom(self.moduleConfig.telemetry)
    elif config_name == "canned_message":
        p.set_module_config.canned_message.CopyFrom(
            self.moduleConfig.canned_message
        )
    elif config_name == "audio":
        p.set_module_config.audio.CopyFrom(self.moduleConfig.audio)
    elif config_name == "remote_hardware":
        p.set_module_config.remote_hardware.CopyFrom(
            self.moduleConfig.remote_hardware
        )
    elif config_name == "neighbor_info":
        p.set_module_config.neighbor_info.CopyFrom(self.moduleConfig.neighbor_info)
    elif config_name == "detection_sensor":
        p.set_module_config.detection_sensor.CopyFrom(self.moduleConfig.detection_sensor)
    elif config_name == "ambient_lighting":
        p.set_module_config.ambient_lighting.CopyFrom(self.moduleConfig.ambient_lighting)
    elif config_name == "paxcounter":
        p.set_module_config.paxcounter.CopyFrom(self.moduleConfig.paxcounter)
    else:
        our_exit(f"Error: No valid config with name {config_name}")

    logging.debug(f"Wrote: {config_name}")
    if self == self.iface.localNode:
        onResponse = None
    else:
        onResponse = self.onAckNak
    self._sendAdmin(p, onResponse=onResponse)