Module meshtastic.ble_interface

Bluetooth interface

Expand source code
"""Bluetooth interface
"""
import asyncio
import atexit
import logging
import struct
import time
import io
from threading import Thread
from typing import List, Optional

import google.protobuf
from bleak import BleakClient, BleakScanner, BLEDevice
from bleak.exc import BleakDBusError, BleakError

from meshtastic.mesh_interface import MeshInterface

from .protobuf import mesh_pb2

SERVICE_UUID = "6ba1b218-15a8-461f-9fa8-5dcae273eafd"
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
FROMRADIO_UUID = "2c55e69e-4993-11ed-b878-0242ac120002"
FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
LEGACY_LOGRADIO_UUID = "6c6fd238-78fa-436b-aacf-15c5be1ef2e2"
LOGRADIO_UUID = "5a3d6e49-06e6-4423-9944-e9de8cdf9547"


class BLEInterface(MeshInterface):
    """MeshInterface using BLE to connect to devices."""

    class BLEError(Exception):
        """An exception class for BLE errors."""

    def __init__(
        self,
        address: Optional[str],
        noProto: bool = False,
        debugOut: Optional[io.TextIOWrapper]=None,
        noNodes: bool = False,
    ) -> None:
        MeshInterface.__init__(
            self, debugOut=debugOut, noProto=noProto, noNodes=noNodes
        )

        self.should_read = False

        logging.debug("Threads starting")
        self._want_receive = True
        self._receiveThread: Optional[Thread] = Thread(
            target=self._receiveFromRadioImpl, name="BLEReceive", daemon=True
        )
        self._receiveThread.start()
        logging.debug("Threads running")

        self.client: Optional[BLEClient] = None
        try:
            logging.debug(f"BLE connecting to: {address if address else 'any'}")
            self.client = self.connect(address)
            logging.debug("BLE connected")
        except BLEInterface.BLEError as e:
            self.close()
            raise e

        if self.client.has_characteristic(LEGACY_LOGRADIO_UUID):
            self.client.start_notify(
                LEGACY_LOGRADIO_UUID, self.legacy_log_radio_handler
            )

        if self.client.has_characteristic(LOGRADIO_UUID):
            self.client.start_notify(LOGRADIO_UUID, self.log_radio_handler)

        logging.debug("Mesh configure starting")
        self._startConfig()
        if not self.noProto:
            self._waitConnected(timeout=60.0)
            self.waitForConfig()

        logging.debug("Register FROMNUM notify callback")
        self.client.start_notify(FROMNUM_UUID, self.from_num_handler)

        # We MUST run atexit (if we can) because otherwise (at least on linux) the BLE device is not disconnected
        # and future connection attempts will fail.  (BlueZ kinda sucks)
        # Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit
        self._exit_handler = atexit.register(self.client.disconnect)

    def from_num_handler(self, _, b: bytes) -> None:  # pylint: disable=C0116
        """Handle callbacks for fromnum notify.
        Note: this method does not need to be async because it is just setting a bool.
        """
        from_num = struct.unpack("<I", bytes(b))[0]
        logging.debug(f"FROMNUM notify: {from_num}")
        self.should_read = True

    async def log_radio_handler(self, _, b):  # pylint: disable=C0116
        log_record = mesh_pb2.LogRecord()
        try:
            log_record.ParseFromString(bytes(b))

            message = (
                f"[{log_record.source}] {log_record.message}"
                if log_record.source
                else log_record.message
            )
            self._handleLogLine(message)
        except google.protobuf.message.DecodeError:
            logging.warning("Malformed LogRecord received. Skipping.")

    async def legacy_log_radio_handler(self, _, b):  # pylint: disable=C0116
        log_radio = b.decode("utf-8").replace("\n", "")
        self._handleLogLine(log_radio)

    @staticmethod
    def scan() -> List[BLEDevice]:
        """Scan for available BLE devices."""
        with BLEClient() as client:
            logging.info("Scanning for BLE devices (takes 10 seconds)...")
            response = client.discover(
                timeout=10, return_adv=True, service_uuids=[SERVICE_UUID]
            )

            devices = response.values()

            # bleak sometimes returns devices we didn't ask for, so filter the response
            # to only return true meshtastic devices
            # d[0] is the device. d[1] is the advertisement data
            devices = list(
                filter(lambda d: SERVICE_UUID in d[1].service_uuids, devices)
            )
            return list(map(lambda d: d[0], devices))

    def find_device(self, address: Optional[str]) -> BLEDevice:
        """Find a device by address."""

        addressed_devices = BLEInterface.scan()

        if address:
            addressed_devices = list(
                filter(
                    lambda x: address in (x.name, x.address),
                    addressed_devices,
                )
            )

        if len(addressed_devices) == 0:
            raise BLEInterface.BLEError(
                f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it."
            )
        if len(addressed_devices) > 1:
            raise BLEInterface.BLEError(
                f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found."
            )
        return addressed_devices[0]

    def _sanitize_address(self, address: Optional[str]) -> Optional[str]:  # pylint: disable=E0213
        "Standardize BLE address by removing extraneous characters and lowercasing."
        if address is None:
            return None
        else:
            return address.replace("-", "").replace("_", "").replace(":", "").lower()

    def connect(self, address: Optional[str] = None) -> "BLEClient":
        "Connect to a device by address."

        # Bleak docs recommend always doing a scan before connecting (even if we know addr)
        device = self.find_device(address)
        client = BLEClient(device.address, disconnected_callback=lambda _: self.close)
        client.connect()
        client.discover()
        return client

    def _receiveFromRadioImpl(self) -> None:
        while self._want_receive:
            if self.should_read:
                self.should_read = False
                retries: int = 0
                while self._want_receive:
                    if self.client is None:
                        logging.debug(f"BLE client is None, shutting down")
                        self._want_receive = False
                        continue
                    try:
                        b = bytes(self.client.read_gatt_char(FROMRADIO_UUID))
                    except BleakDBusError as e:
                        # Device disconnected probably, so end our read loop immediately
                        logging.debug(f"Device disconnected, shutting down {e}")
                        self._want_receive = False
                    except BleakError as e:
                        # We were definitely disconnected
                        if "Not connected" in str(e):
                            logging.debug(f"Device disconnected, shutting down {e}")
                            self._want_receive = False
                        else:
                            raise BLEInterface.BLEError("Error reading BLE") from e
                    if not b:
                        if retries < 5:
                            time.sleep(0.1)
                            retries += 1
                            continue
                        break
                    logging.debug(f"FROMRADIO read: {b.hex()}")
                    self._handleFromRadio(b)
            else:
                time.sleep(0.01)

    def _sendToRadioImpl(self, toRadio) -> None:
        b: bytes = toRadio.SerializeToString()
        if b and self.client:  # we silently ignore writes while we are shutting down
            logging.debug(f"TORADIO write: {b.hex()}")
            try:
                self.client.write_gatt_char(
                    TORADIO_UUID, b, response=True
                )  # FIXME: or False?
                # search Bleak src for org.bluez.Error.InProgress
            except Exception as e:
                raise BLEInterface.BLEError(
                    "Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)"
                ) from e
            # Allow to propagate and then make sure we read
            time.sleep(0.01)
            self.should_read = True

    def close(self) -> None:
        try:
            MeshInterface.close(self)
        except Exception as e:
            logging.error(f"Error closing mesh interface: {e}")

        if self._want_receive:
            self._want_receive = False  # Tell the thread we want it to stop
            if self._receiveThread:
                self._receiveThread.join(
                    timeout=2
                )  # If bleak is hung, don't wait for the thread to exit (it is critical we disconnect)
                self._receiveThread = None

        if self.client:
            atexit.unregister(self._exit_handler)
            self.client.disconnect()
            self.client.close()
            self.client = None
        self._disconnected() # send the disconnected indicator up to clients


class BLEClient:
    """Client for managing connection to a BLE device"""

    def __init__(self, address=None, **kwargs) -> None:
        self._eventLoop = asyncio.new_event_loop()
        self._eventThread = Thread(
            target=self._run_event_loop, name="BLEClient", daemon=True
        )
        self._eventThread.start()

        if not address:
            logging.debug("No address provided - only discover method will work.")
            return

        self.bleak_client = BleakClient(address, **kwargs)

    def discover(self, **kwargs):  # pylint: disable=C0116
        return self.async_await(BleakScanner.discover(**kwargs))

    def pair(self, **kwargs):  # pylint: disable=C0116
        return self.async_await(self.bleak_client.pair(**kwargs))

    def connect(self, **kwargs):  # pylint: disable=C0116
        return self.async_await(self.bleak_client.connect(**kwargs))

    def disconnect(self, **kwargs):  # pylint: disable=C0116
        self.async_await(self.bleak_client.disconnect(**kwargs))

    def read_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
        return self.async_await(self.bleak_client.read_gatt_char(*args, **kwargs))

    def write_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
        self.async_await(self.bleak_client.write_gatt_char(*args, **kwargs))

    def has_characteristic(self, specifier):
        """Check if the connected node supports a specified characteristic."""
        return bool(self.bleak_client.services.get_characteristic(specifier))

    def start_notify(self, *args, **kwargs):  # pylint: disable=C0116
        self.async_await(self.bleak_client.start_notify(*args, **kwargs))

    def close(self):  # pylint: disable=C0116
        self.async_run(self._stop_event_loop())
        self._eventThread.join()

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        self.close()

    def async_await(self, coro, timeout=None):  # pylint: disable=C0116
        return self.async_run(coro).result(timeout)

    def async_run(self, coro):  # pylint: disable=C0116
        return asyncio.run_coroutine_threadsafe(coro, self._eventLoop)

    def _run_event_loop(self):
        try:
            self._eventLoop.run_forever()
        finally:
            self._eventLoop.close()

    async def _stop_event_loop(self):
        self._eventLoop.stop()

Classes

class BLEClient (address=None, **kwargs)

Client for managing connection to a BLE device

Expand source code
class BLEClient:
    """Client for managing connection to a BLE device"""

    def __init__(self, address=None, **kwargs) -> None:
        self._eventLoop = asyncio.new_event_loop()
        self._eventThread = Thread(
            target=self._run_event_loop, name="BLEClient", daemon=True
        )
        self._eventThread.start()

        if not address:
            logging.debug("No address provided - only discover method will work.")
            return

        self.bleak_client = BleakClient(address, **kwargs)

    def discover(self, **kwargs):  # pylint: disable=C0116
        return self.async_await(BleakScanner.discover(**kwargs))

    def pair(self, **kwargs):  # pylint: disable=C0116
        return self.async_await(self.bleak_client.pair(**kwargs))

    def connect(self, **kwargs):  # pylint: disable=C0116
        return self.async_await(self.bleak_client.connect(**kwargs))

    def disconnect(self, **kwargs):  # pylint: disable=C0116
        self.async_await(self.bleak_client.disconnect(**kwargs))

    def read_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
        return self.async_await(self.bleak_client.read_gatt_char(*args, **kwargs))

    def write_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
        self.async_await(self.bleak_client.write_gatt_char(*args, **kwargs))

    def has_characteristic(self, specifier):
        """Check if the connected node supports a specified characteristic."""
        return bool(self.bleak_client.services.get_characteristic(specifier))

    def start_notify(self, *args, **kwargs):  # pylint: disable=C0116
        self.async_await(self.bleak_client.start_notify(*args, **kwargs))

    def close(self):  # pylint: disable=C0116
        self.async_run(self._stop_event_loop())
        self._eventThread.join()

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        self.close()

    def async_await(self, coro, timeout=None):  # pylint: disable=C0116
        return self.async_run(coro).result(timeout)

    def async_run(self, coro):  # pylint: disable=C0116
        return asyncio.run_coroutine_threadsafe(coro, self._eventLoop)

    def _run_event_loop(self):
        try:
            self._eventLoop.run_forever()
        finally:
            self._eventLoop.close()

    async def _stop_event_loop(self):
        self._eventLoop.stop()

Methods

def async_await(self, coro, timeout=None)
Expand source code
def async_await(self, coro, timeout=None):  # pylint: disable=C0116
    return self.async_run(coro).result(timeout)
def async_run(self, coro)
Expand source code
def async_run(self, coro):  # pylint: disable=C0116
    return asyncio.run_coroutine_threadsafe(coro, self._eventLoop)
def close(self)
Expand source code
def close(self):  # pylint: disable=C0116
    self.async_run(self._stop_event_loop())
    self._eventThread.join()
def connect(self, **kwargs)
Expand source code
def connect(self, **kwargs):  # pylint: disable=C0116
    return self.async_await(self.bleak_client.connect(**kwargs))
def disconnect(self, **kwargs)
Expand source code
def disconnect(self, **kwargs):  # pylint: disable=C0116
    self.async_await(self.bleak_client.disconnect(**kwargs))
def discover(self, **kwargs)
Expand source code
def discover(self, **kwargs):  # pylint: disable=C0116
    return self.async_await(BleakScanner.discover(**kwargs))
def has_characteristic(self, specifier)

Check if the connected node supports a specified characteristic.

Expand source code
def has_characteristic(self, specifier):
    """Check if the connected node supports a specified characteristic."""
    return bool(self.bleak_client.services.get_characteristic(specifier))
def pair(self, **kwargs)
Expand source code
def pair(self, **kwargs):  # pylint: disable=C0116
    return self.async_await(self.bleak_client.pair(**kwargs))
def read_gatt_char(self, *args, **kwargs)
Expand source code
def read_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
    return self.async_await(self.bleak_client.read_gatt_char(*args, **kwargs))
def start_notify(self, *args, **kwargs)
Expand source code
def start_notify(self, *args, **kwargs):  # pylint: disable=C0116
    self.async_await(self.bleak_client.start_notify(*args, **kwargs))
def write_gatt_char(self, *args, **kwargs)
Expand source code
def write_gatt_char(self, *args, **kwargs):  # pylint: disable=C0116
    self.async_await(self.bleak_client.write_gatt_char(*args, **kwargs))
class BLEInterface (address: Optional[str], noProto: bool = False, debugOut: Optional[_io.TextIOWrapper] = None, noNodes: bool = False)

MeshInterface using BLE to connect to devices.

Constructor

Keyword Arguments: noProto – If True, don't try to run our protocol on the link - just be a dumb serial client. noNodes – If True, instruct the node to not send its nodedb on startup, just other configuration information.

Expand source code
class BLEInterface(MeshInterface):
    """MeshInterface using BLE to connect to devices."""

    class BLEError(Exception):
        """An exception class for BLE errors."""

    def __init__(
        self,
        address: Optional[str],
        noProto: bool = False,
        debugOut: Optional[io.TextIOWrapper]=None,
        noNodes: bool = False,
    ) -> None:
        MeshInterface.__init__(
            self, debugOut=debugOut, noProto=noProto, noNodes=noNodes
        )

        self.should_read = False

        logging.debug("Threads starting")
        self._want_receive = True
        self._receiveThread: Optional[Thread] = Thread(
            target=self._receiveFromRadioImpl, name="BLEReceive", daemon=True
        )
        self._receiveThread.start()
        logging.debug("Threads running")

        self.client: Optional[BLEClient] = None
        try:
            logging.debug(f"BLE connecting to: {address if address else 'any'}")
            self.client = self.connect(address)
            logging.debug("BLE connected")
        except BLEInterface.BLEError as e:
            self.close()
            raise e

        if self.client.has_characteristic(LEGACY_LOGRADIO_UUID):
            self.client.start_notify(
                LEGACY_LOGRADIO_UUID, self.legacy_log_radio_handler
            )

        if self.client.has_characteristic(LOGRADIO_UUID):
            self.client.start_notify(LOGRADIO_UUID, self.log_radio_handler)

        logging.debug("Mesh configure starting")
        self._startConfig()
        if not self.noProto:
            self._waitConnected(timeout=60.0)
            self.waitForConfig()

        logging.debug("Register FROMNUM notify callback")
        self.client.start_notify(FROMNUM_UUID, self.from_num_handler)

        # We MUST run atexit (if we can) because otherwise (at least on linux) the BLE device is not disconnected
        # and future connection attempts will fail.  (BlueZ kinda sucks)
        # Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit
        self._exit_handler = atexit.register(self.client.disconnect)

    def from_num_handler(self, _, b: bytes) -> None:  # pylint: disable=C0116
        """Handle callbacks for fromnum notify.
        Note: this method does not need to be async because it is just setting a bool.
        """
        from_num = struct.unpack("<I", bytes(b))[0]
        logging.debug(f"FROMNUM notify: {from_num}")
        self.should_read = True

    async def log_radio_handler(self, _, b):  # pylint: disable=C0116
        log_record = mesh_pb2.LogRecord()
        try:
            log_record.ParseFromString(bytes(b))

            message = (
                f"[{log_record.source}] {log_record.message}"
                if log_record.source
                else log_record.message
            )
            self._handleLogLine(message)
        except google.protobuf.message.DecodeError:
            logging.warning("Malformed LogRecord received. Skipping.")

    async def legacy_log_radio_handler(self, _, b):  # pylint: disable=C0116
        log_radio = b.decode("utf-8").replace("\n", "")
        self._handleLogLine(log_radio)

    @staticmethod
    def scan() -> List[BLEDevice]:
        """Scan for available BLE devices."""
        with BLEClient() as client:
            logging.info("Scanning for BLE devices (takes 10 seconds)...")
            response = client.discover(
                timeout=10, return_adv=True, service_uuids=[SERVICE_UUID]
            )

            devices = response.values()

            # bleak sometimes returns devices we didn't ask for, so filter the response
            # to only return true meshtastic devices
            # d[0] is the device. d[1] is the advertisement data
            devices = list(
                filter(lambda d: SERVICE_UUID in d[1].service_uuids, devices)
            )
            return list(map(lambda d: d[0], devices))

    def find_device(self, address: Optional[str]) -> BLEDevice:
        """Find a device by address."""

        addressed_devices = BLEInterface.scan()

        if address:
            addressed_devices = list(
                filter(
                    lambda x: address in (x.name, x.address),
                    addressed_devices,
                )
            )

        if len(addressed_devices) == 0:
            raise BLEInterface.BLEError(
                f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it."
            )
        if len(addressed_devices) > 1:
            raise BLEInterface.BLEError(
                f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found."
            )
        return addressed_devices[0]

    def _sanitize_address(self, address: Optional[str]) -> Optional[str]:  # pylint: disable=E0213
        "Standardize BLE address by removing extraneous characters and lowercasing."
        if address is None:
            return None
        else:
            return address.replace("-", "").replace("_", "").replace(":", "").lower()

    def connect(self, address: Optional[str] = None) -> "BLEClient":
        "Connect to a device by address."

        # Bleak docs recommend always doing a scan before connecting (even if we know addr)
        device = self.find_device(address)
        client = BLEClient(device.address, disconnected_callback=lambda _: self.close)
        client.connect()
        client.discover()
        return client

    def _receiveFromRadioImpl(self) -> None:
        while self._want_receive:
            if self.should_read:
                self.should_read = False
                retries: int = 0
                while self._want_receive:
                    if self.client is None:
                        logging.debug(f"BLE client is None, shutting down")
                        self._want_receive = False
                        continue
                    try:
                        b = bytes(self.client.read_gatt_char(FROMRADIO_UUID))
                    except BleakDBusError as e:
                        # Device disconnected probably, so end our read loop immediately
                        logging.debug(f"Device disconnected, shutting down {e}")
                        self._want_receive = False
                    except BleakError as e:
                        # We were definitely disconnected
                        if "Not connected" in str(e):
                            logging.debug(f"Device disconnected, shutting down {e}")
                            self._want_receive = False
                        else:
                            raise BLEInterface.BLEError("Error reading BLE") from e
                    if not b:
                        if retries < 5:
                            time.sleep(0.1)
                            retries += 1
                            continue
                        break
                    logging.debug(f"FROMRADIO read: {b.hex()}")
                    self._handleFromRadio(b)
            else:
                time.sleep(0.01)

    def _sendToRadioImpl(self, toRadio) -> None:
        b: bytes = toRadio.SerializeToString()
        if b and self.client:  # we silently ignore writes while we are shutting down
            logging.debug(f"TORADIO write: {b.hex()}")
            try:
                self.client.write_gatt_char(
                    TORADIO_UUID, b, response=True
                )  # FIXME: or False?
                # search Bleak src for org.bluez.Error.InProgress
            except Exception as e:
                raise BLEInterface.BLEError(
                    "Error writing BLE (are you in the 'bluetooth' user group? did you enter the pairing PIN on your computer?)"
                ) from e
            # Allow to propagate and then make sure we read
            time.sleep(0.01)
            self.should_read = True

    def close(self) -> None:
        try:
            MeshInterface.close(self)
        except Exception as e:
            logging.error(f"Error closing mesh interface: {e}")

        if self._want_receive:
            self._want_receive = False  # Tell the thread we want it to stop
            if self._receiveThread:
                self._receiveThread.join(
                    timeout=2
                )  # If bleak is hung, don't wait for the thread to exit (it is critical we disconnect)
                self._receiveThread = None

        if self.client:
            atexit.unregister(self._exit_handler)
            self.client.disconnect()
            self.client.close()
            self.client = None
        self._disconnected() # send the disconnected indicator up to clients

Ancestors

Class variables

var BLEError

An exception class for BLE errors.

Static methods

def scan() ‑> List[bleak.backends.device.BLEDevice]

Scan for available BLE devices.

Expand source code
@staticmethod
def scan() -> List[BLEDevice]:
    """Scan for available BLE devices."""
    with BLEClient() as client:
        logging.info("Scanning for BLE devices (takes 10 seconds)...")
        response = client.discover(
            timeout=10, return_adv=True, service_uuids=[SERVICE_UUID]
        )

        devices = response.values()

        # bleak sometimes returns devices we didn't ask for, so filter the response
        # to only return true meshtastic devices
        # d[0] is the device. d[1] is the advertisement data
        devices = list(
            filter(lambda d: SERVICE_UUID in d[1].service_uuids, devices)
        )
        return list(map(lambda d: d[0], devices))

Methods

def connect(self, address: Optional[str] = None) ‑> BLEClient

Connect to a device by address.

Expand source code
def connect(self, address: Optional[str] = None) -> "BLEClient":
    "Connect to a device by address."

    # Bleak docs recommend always doing a scan before connecting (even if we know addr)
    device = self.find_device(address)
    client = BLEClient(device.address, disconnected_callback=lambda _: self.close)
    client.connect()
    client.discover()
    return client
def find_device(self, address: Optional[str]) ‑> bleak.backends.device.BLEDevice

Find a device by address.

Expand source code
def find_device(self, address: Optional[str]) -> BLEDevice:
    """Find a device by address."""

    addressed_devices = BLEInterface.scan()

    if address:
        addressed_devices = list(
            filter(
                lambda x: address in (x.name, x.address),
                addressed_devices,
            )
        )

    if len(addressed_devices) == 0:
        raise BLEInterface.BLEError(
            f"No Meshtastic BLE peripheral with identifier or address '{address}' found. Try --ble-scan to find it."
        )
    if len(addressed_devices) > 1:
        raise BLEInterface.BLEError(
            f"More than one Meshtastic BLE peripheral with identifier or address '{address}' found."
        )
    return addressed_devices[0]
def from_num_handler(self, _, b: bytes) ‑> None

Handle callbacks for fromnum notify. Note: this method does not need to be async because it is just setting a bool.

Expand source code
def from_num_handler(self, _, b: bytes) -> None:  # pylint: disable=C0116
    """Handle callbacks for fromnum notify.
    Note: this method does not need to be async because it is just setting a bool.
    """
    from_num = struct.unpack("<I", bytes(b))[0]
    logging.debug(f"FROMNUM notify: {from_num}")
    self.should_read = True
async def legacy_log_radio_handler(self, _, b)
Expand source code
async def legacy_log_radio_handler(self, _, b):  # pylint: disable=C0116
    log_radio = b.decode("utf-8").replace("\n", "")
    self._handleLogLine(log_radio)
async def log_radio_handler(self, _, b)
Expand source code
async def log_radio_handler(self, _, b):  # pylint: disable=C0116
    log_record = mesh_pb2.LogRecord()
    try:
        log_record.ParseFromString(bytes(b))

        message = (
            f"[{log_record.source}] {log_record.message}"
            if log_record.source
            else log_record.message
        )
        self._handleLogLine(message)
    except google.protobuf.message.DecodeError:
        logging.warning("Malformed LogRecord received. Skipping.")

Inherited members