Module meshtastic.remote_hardware
Remote hardware
Expand source code
"""Remote hardware
"""
import logging
from pubsub import pub # type: ignore[import-untyped]
from meshtastic.protobuf import portnums_pb2, remote_hardware_pb2
from meshtastic.util import our_exit
def onGPIOreceive(packet, interface) -> None:
    """Callback for received GPIO responses.

    Prints the reported GPIO value together with that value restricted to the
    mask stored on ``interface.mask``, then sets ``interface.gotResponse``.

    Args:
        packet: Received packet; the hardware message is read from
            ``packet["decoded"]["remotehw"]``.
        interface: Object carrying the ``mask`` of the GPIOs of interest. Its
            ``gotResponse`` attribute is set to True once the reply is handled.
    """
    logging.debug(f"packet:{packet} interface:{interface}")
    hw = packet["decoded"]["remotehw"]

    # we did get a reply, but due to protobufs, 0 for numeric value is not sent
    # see https://developers.google.com/protocol-buffers/docs/proto3#default
    # so a missing "gpioValue" is treated as 0
    gpioValue = hw.get("gpioValue", 0)

    # The mask may never have been recorded on the interface (missing or None).
    # Report the unmasked value in that case instead of raising a TypeError
    # from int(None) inside the receive callback.
    mask = getattr(interface, "mask", None)
    value = int(gpioValue)
    if mask is not None:
        value &= int(mask)

    # "type" can be omitted from the decoded dict for the same protobuf
    # default-value reason, so look it up without raising KeyError.
    print(
        f'Received RemoteHardware type={hw.get("type")}, gpio_value={gpioValue} value={value}'
    )
    interface.gotResponse = True
class RemoteHardwareClient:
    """
    This is the client code to control/monitor simple hardware built into the
    meshtastic devices.  It is intended to be both a useful API/service and example
    code for how you can connect to your own custom meshtastic services
    """

    def __init__(self, iface) -> None:
        """
        Constructor

        iface is the already open MeshInterface instance
        """
        self.iface = iface
        ch = iface.localNode.getChannelByName("gpio")
        if not ch:
            # NOTE(review): our_exit is assumed to terminate the program; if it
            # ever returned, the ch.index access below would fail - confirm.
            our_exit(
                "Warning: No channel named 'gpio' was found.\n"
                "On the sending and receive nodes create a channel named 'gpio'.\n"
                "For example, run '--ch-add gpio' on one device, then '--seturl' on\n"
                "the other devices using the url from the device where the channel was added."
            )
        # All remote hardware requests are sent on the 'gpio' channel.
        self.channelIndex = ch.index

        pub.subscribe(onGPIOreceive, "meshtastic.receive.remotehw")

    def _sendHardware(self, nodeid, r, wantResponse=False, onResponse=None):
        """
        Send the hardware message r to nodeid on the 'gpio' channel.

        Exits via our_exit when no destination node id is given.  Returns
        whatever iface.sendData returns.
        """
        if not nodeid:
            our_exit(
                r"Warning: Must use a destination node ID for this operation (use --dest \!xxxxxxxxx)"
            )
        return self.iface.sendData(
            r,
            nodeid,
            portnums_pb2.REMOTE_HARDWARE_APP,
            wantAck=True,
            channelIndex=self.channelIndex,
            wantResponse=wantResponse,
            onResponse=onResponse,
        )

    def writeGPIOs(self, nodeid, mask, vals):
        """
        Write the specified vals bits to the device GPIOs.  Only bits in mask that
        are 1 will be changed
        """
        logging.debug(f"writeGPIOs nodeid:{nodeid} mask:{mask} vals:{vals}")
        r = remote_hardware_pb2.HardwareMessage()
        r.type = remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS
        r.gpio_mask = mask
        r.gpio_value = vals
        return self._sendHardware(nodeid, r)

    def readGPIOs(self, nodeid, mask, onResponse=None):
        """Read the specified bits from GPIO inputs on the device"""
        logging.debug(f"readGPIOs nodeid:{nodeid} mask:{mask}")
        r = remote_hardware_pb2.HardwareMessage()
        r.type = remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS
        r.gpio_mask = mask
        # Record the mask on the interface, as watchGPIOs does: the
        # onGPIOreceive callback reads interface.mask to filter the reply.
        self.iface.mask = mask
        return self._sendHardware(nodeid, r, wantResponse=True, onResponse=onResponse)

    def watchGPIOs(self, nodeid, mask):
        """Watch the specified bits from GPIO inputs on the device for changes"""
        logging.debug(f"watchGPIOs nodeid:{nodeid} mask:{mask}")
        r = remote_hardware_pb2.HardwareMessage()
        r.type = remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS
        r.gpio_mask = mask
        # The onGPIOreceive callback reads interface.mask to filter replies.
        self.iface.mask = mask
        return self._sendHardware(nodeid, r)
Functions
def onGPIOreceive(packet, interface) ‑> None
-
Callback for received GPIO responses
Expand source code
def onGPIOreceive(packet, interface) -> None:
    """Callback for received GPIO responses.

    Prints the reported GPIO value together with that value restricted to the
    mask stored on ``interface.mask``, then sets ``interface.gotResponse``.

    Args:
        packet: Received packet; the hardware message is read from
            ``packet["decoded"]["remotehw"]``.
        interface: Object carrying the ``mask`` of the GPIOs of interest. Its
            ``gotResponse`` attribute is set to True once the reply is handled.
    """
    logging.debug(f"packet:{packet} interface:{interface}")
    hw = packet["decoded"]["remotehw"]

    # we did get a reply, but due to protobufs, 0 for numeric value is not sent
    # see https://developers.google.com/protocol-buffers/docs/proto3#default
    # so a missing "gpioValue" is treated as 0
    gpioValue = hw.get("gpioValue", 0)

    # The mask may never have been recorded on the interface (missing or None).
    # Report the unmasked value in that case instead of raising a TypeError
    # from int(None) inside the receive callback.
    mask = getattr(interface, "mask", None)
    value = int(gpioValue)
    if mask is not None:
        value &= int(mask)

    # "type" can be omitted from the decoded dict for the same protobuf
    # default-value reason, so look it up without raising KeyError.
    print(
        f'Received RemoteHardware type={hw.get("type")}, gpio_value={gpioValue} value={value}'
    )
    interface.gotResponse = True
Classes
class RemoteHardwareClient (iface)
-
This is the client code to control/monitor simple hardware built into the meshtastic devices. It is intended to be both a useful API/service and example code for how you can connect to your own custom meshtastic services
Constructor
iface is the already open MeshInterface instance
Expand source code
class RemoteHardwareClient:
    """
    This is the client code to control/monitor simple hardware built into the
    meshtastic devices.  It is intended to be both a useful API/service and example
    code for how you can connect to your own custom meshtastic services
    """

    def __init__(self, iface) -> None:
        """
        Constructor

        iface is the already open MeshInterface instance
        """
        self.iface = iface
        ch = iface.localNode.getChannelByName("gpio")
        if not ch:
            # NOTE(review): our_exit is assumed to terminate the program; if it
            # ever returned, the ch.index access below would fail - confirm.
            our_exit(
                "Warning: No channel named 'gpio' was found.\n"
                "On the sending and receive nodes create a channel named 'gpio'.\n"
                "For example, run '--ch-add gpio' on one device, then '--seturl' on\n"
                "the other devices using the url from the device where the channel was added."
            )
        # All remote hardware requests are sent on the 'gpio' channel.
        self.channelIndex = ch.index

        pub.subscribe(onGPIOreceive, "meshtastic.receive.remotehw")

    def _sendHardware(self, nodeid, r, wantResponse=False, onResponse=None):
        """
        Send the hardware message r to nodeid on the 'gpio' channel.

        Exits via our_exit when no destination node id is given.  Returns
        whatever iface.sendData returns.
        """
        if not nodeid:
            our_exit(
                r"Warning: Must use a destination node ID for this operation (use --dest \!xxxxxxxxx)"
            )
        return self.iface.sendData(
            r,
            nodeid,
            portnums_pb2.REMOTE_HARDWARE_APP,
            wantAck=True,
            channelIndex=self.channelIndex,
            wantResponse=wantResponse,
            onResponse=onResponse,
        )

    def writeGPIOs(self, nodeid, mask, vals):
        """
        Write the specified vals bits to the device GPIOs.  Only bits in mask that
        are 1 will be changed
        """
        logging.debug(f"writeGPIOs nodeid:{nodeid} mask:{mask} vals:{vals}")
        r = remote_hardware_pb2.HardwareMessage()
        r.type = remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS
        r.gpio_mask = mask
        r.gpio_value = vals
        return self._sendHardware(nodeid, r)

    def readGPIOs(self, nodeid, mask, onResponse=None):
        """Read the specified bits from GPIO inputs on the device"""
        logging.debug(f"readGPIOs nodeid:{nodeid} mask:{mask}")
        r = remote_hardware_pb2.HardwareMessage()
        r.type = remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS
        r.gpio_mask = mask
        # Record the mask on the interface, as watchGPIOs does: the
        # onGPIOreceive callback reads interface.mask to filter the reply.
        self.iface.mask = mask
        return self._sendHardware(nodeid, r, wantResponse=True, onResponse=onResponse)

    def watchGPIOs(self, nodeid, mask):
        """Watch the specified bits from GPIO inputs on the device for changes"""
        logging.debug(f"watchGPIOs nodeid:{nodeid} mask:{mask}")
        r = remote_hardware_pb2.HardwareMessage()
        r.type = remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS
        r.gpio_mask = mask
        # The onGPIOreceive callback reads interface.mask to filter replies.
        self.iface.mask = mask
        return self._sendHardware(nodeid, r)
Methods
def readGPIOs(self, nodeid, mask, onResponse=None)
-
Read the specified bits from GPIO inputs on the device
Expand source code
def readGPIOs(self, nodeid, mask, onResponse=None):
    """Read the specified bits from GPIO inputs on the device

    Sends a READ_GPIOS hardware message for mask to nodeid, requesting a
    response; onResponse is passed through to _sendHardware.  Returns
    whatever _sendHardware returns.
    """
    logging.debug(f"readGPIOs nodeid:{nodeid} mask:{mask}")
    r = remote_hardware_pb2.HardwareMessage()
    r.type = remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS
    r.gpio_mask = mask
    # Record the mask on the interface, as watchGPIOs does: the
    # onGPIOreceive callback reads interface.mask to filter the reply.
    self.iface.mask = mask
    return self._sendHardware(nodeid, r, wantResponse=True, onResponse=onResponse)
def watchGPIOs(self, nodeid, mask)
-
Watch the specified bits from GPIO inputs on the device for changes
Expand source code
def watchGPIOs(self, nodeid, mask):
    """Ask the device to report changes on the GPIO inputs selected by mask.

    The mask is also stored on the interface before the request is sent.
    Returns whatever _sendHardware returns.
    """
    logging.debug(f"watchGPIOs nodeid:{nodeid} mask:{mask}")
    # Build the request in one step via the message constructor's keyword fields.
    request = remote_hardware_pb2.HardwareMessage(
        type=remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS,
        gpio_mask=mask,
    )
    self.iface.mask = mask
    return self._sendHardware(nodeid, request)
def writeGPIOs(self, nodeid, mask, vals)
-
Write the specified vals bits to the device GPIOs. Only bits in mask that are 1 will be changed
Expand source code
def writeGPIOs(self, nodeid, mask, vals):
    """Set device GPIOs to the bits given in vals.

    Only the GPIOs whose bit is 1 in mask are changed.  Returns whatever
    _sendHardware returns.
    """
    logging.debug(f"writeGPIOs nodeid:{nodeid} mask:{mask} vals:{vals}")
    # Build the request in one step via the message constructor's keyword fields.
    request = remote_hardware_pb2.HardwareMessage(
        type=remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS,
        gpio_mask=mask,
        gpio_value=vals,
    )
    return self._sendHardware(nodeid, request)