Module meshtastic.tests.test_mesh_interface
Meshtastic unit tests for
Expand source code
"""Meshtastic unit tests for"""
import logging
import re
from unittest.mock import MagicMock, patch
import pytest
from hypothesis import given, strategies as st
from ..protobuf import mesh_pb2, config_pb2
from ..mesh_interface import MeshInterface, _timeago
from ..node import Node
from ..slog import LogSet
from ..powermon import SimPowerSupply
# from ..config import Config
from ..util import Timeout
def test_MeshInterface(capsys):
"""Test that we can instantiate a MeshInterface"""
iface = MeshInterface(noProto=True)
NODE_ID = "!9388f81c"
NODE_NUM = 2475227164
node = {
"num": NODE_NUM,
"user": {
"id": NODE_ID,
"longName": "Unknown f81c",
"shortName": "?1C",
"macaddr": "RBeTiPgc",
"hwModel": "TBEAM",
"position": {},
"lastHeard": 1640204888,
iface.nodes = {NODE_ID: node}
iface.nodesByNum = {NODE_NUM: node}
myInfo = MagicMock()
iface.myInfo = myInfo
# Also get some coverage of the structured logging/power meter stuff by turning it on as well
log_set = LogSet(iface, None, SimPowerSupply())
out, err = capsys.readouterr()
assert"Owner: None \(None\)", out, re.MULTILINE)
assert"Nodes", out, re.MULTILINE)
assert"Preferences", out, re.MULTILINE)
assert"Channels", out, re.MULTILINE)
assert"Primary channel URL", out, re.MULTILINE)
assert err == ""
def test_getMyUser(iface_with_nodes):
"""Test getMyUser()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
myuser = iface.getMyUser()
assert myuser is not None
assert myuser["id"] == "!9388f81c"
def test_getLongName(iface_with_nodes):
"""Test getLongName()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
mylongname = iface.getLongName()
assert mylongname == "Unknown f81c"
def test_getShortName(iface_with_nodes):
"""Test getShortName()."""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
myshortname = iface.getShortName()
assert myshortname == "?1C"
def test_handlePacketFromRadio_no_from(capsys):
"""Test _handlePacketFromRadio with no 'from' in the mesh packet."""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
out, err = capsys.readouterr()
assert"Device returned a packet we sent, ignoring", out, re.MULTILINE)
assert err == ""
def test_handlePacketFromRadio_with_a_portnum(caplog):
"""Test _handlePacketFromRadio with a portnum
Since we have an attribute called 'from', we cannot simply 'set' it.
Had to implement a hack just to be able to test some code.
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = b""
meshPacket.decoded.portnum = 1
with caplog.at_level(logging.WARNING):
iface._handlePacketFromRadio(meshPacket, hack=True)
assert"Not populating fromId", caplog.text, re.MULTILINE)
def test_handlePacketFromRadio_no_portnum(caplog):
"""Test _handlePacketFromRadio without a portnum"""
iface = MeshInterface(noProto=True)
meshPacket = mesh_pb2.MeshPacket()
meshPacket.decoded.payload = b""
with caplog.at_level(logging.WARNING):
iface._handlePacketFromRadio(meshPacket, hack=True)
assert"Not populating fromId", caplog.text, re.MULTILINE)
def test_getNode_with_local():
"""Test getNode"""
iface = MeshInterface(noProto=True)
anode = iface.getNode(LOCAL_ADDR)
assert anode == iface.localNode
def test_getNode_not_local(caplog):
"""Test getNode not local"""
iface = MeshInterface(noProto=True)
anode = MagicMock(autospec=Node)
with caplog.at_level(logging.DEBUG):
with patch("meshtastic.node.Node", return_value=anode):
another_node = iface.getNode("bar2")
assert another_node != iface.localNode
assert"About to requestChannels", caplog.text, re.MULTILINE)
def test_getNode_not_local_timeout(capsys):
"""Test getNode not local, simulate timeout"""
iface = MeshInterface(noProto=True)
anode = MagicMock(autospec=Node)
anode.waitForConfig.return_value = False
with patch("meshtastic.node.Node", return_value=anode):
with pytest.raises(SystemExit) as pytest_wrapped_e:
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.match(r"Timed out trying to retrieve channel info, retrying", out)
assert err == ""
def test_getNode_not_local_timeout_attempts(capsys):
"""Test getNode not local, simulate timeout"""
iface = MeshInterface(noProto=True)
anode = MagicMock(autospec=Node)
anode.waitForConfig.return_value = False
with patch("meshtastic.node.Node", return_value=anode):
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface.getNode("bar2", requestChannelAttempts=2)
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert out == 'Timed out trying to retrieve channel info, retrying\nError: Timed out waiting for channels, giving up\n'
assert err == ""
def test_sendPosition(caplog):
"""Test sendPosition"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
# assert"p.time:", caplog.text, re.MULTILINE)
# @pytest.mark.unit
# @pytest.mark.usefixtures("reset_mt_config")
# def test_close_with_heartbeatTimer(caplog):
# """Test close() with heartbeatTimer"""
# iface = MeshInterface(noProto=True)
# anode = Node('foo', 'bar')
# aconfig = Config()
# aonfig.preferences.phone_timeout_secs = 10
# anode.config = aconfig
# iface.localNode = anode
# assert iface.heartbeatTimer is None
# with caplog.at_level(logging.DEBUG):
# iface._startHeartbeat()
# assert iface.heartbeatTimer is not None
# iface.close()
# @pytest.mark.unit
# @pytest.mark.usefixtures("reset_mt_config")
# def test_handleFromRadio_empty_payload(caplog):
# """Test _handleFromRadio"""
# iface = MeshInterface(noProto=True)
# with caplog.at_level(logging.DEBUG):
# iface._handleFromRadio(b'')
# iface.close()
# assert'Unexpected FromRadio payload', caplog.text, re.MULTILINE)
def test_handleFromRadio_with_my_info(caplog):
"""Test _handleFromRadio with my_info"""
# Note: I captured the '--debug --info' for the bytes below.
# It "translates" to this:
# my_info {
# my_node_num: 682584012
# firmware_version: ""
# reboot_count: 13
# bitrate: 17.088470458984375
# message_timeout_msec: 300000
# min_app_version: 20200
# max_channels: 8
# has_wifi: true
# }
from_radio_bytes = b"\x1a,\x08\xcc\xcf\xbd\xc5\x02\x18\r2\x0e1.2.49.5354c49P\r]0\xb5\x88Ah\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01"
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
assert"Received from radio: my_info {", caplog.text, re.MULTILINE)
assert"my_node_num: 682584012", caplog.text, re.MULTILINE)
def test_handleFromRadio_with_node_info(caplog, capsys):
"""Test _handleFromRadio with node_info"""
# Note: I captured the '--debug --info' for the bytes below.
# It "translates" to this:
# node_info {
# num: 682584012
# user {
# id: "!28af67cc"
# long_name: "Unknown 67cc"
# short_name: "?CC"
# macaddr: "$o(\257g\314"
# hw_model: HELTEC_V2_1
# }
# position {
# }
# }
from_radio_bytes = b'"2\x08\xcc\xcf\xbd\xc5\x02\x12(\n\t!28af67cc\x12\x0cUnknown 67cc\x1a\x03?CC"\x06$o(\xafg\xcc0\n\x1a\x00'
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
assert"Received from radio: node_info {", caplog.text, re.MULTILINE)
assert"682584012", caplog.text, re.MULTILINE)
# validate some of showNodes() output
out, err = capsys.readouterr()
assert" 1 ", out, re.MULTILINE)
assert"│ Unknown 67cc │ ", out, re.MULTILINE)
assert"│\s+!28af67cc\s+│\s+67cc\s+|", out, re.MULTILINE)
assert err == ""
def test_handleFromRadio_with_node_info_tbeam1(caplog, capsys):
"""Test _handleFromRadio with node_info"""
# Note: Captured the '--debug --info' for the bytes below.
# pylint: disable=C0301
from_radio_bytes = b'"=\x08\x80\xf8\xc8\xf6\x07\x12"\n\t!7ed23c00\x12\x07TBeam 1\x1a\x02T1"\x06\x94\xb9~\xd2<\x000\x04\x1a\x07 ]MN\x01\xbea%\xad\x01\xbea=\x00\x00,A'
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
assert"Received nodeinfo", caplog.text, re.MULTILINE)
assert"TBeam 1", caplog.text, re.MULTILINE)
assert"2127707136", caplog.text, re.MULTILINE)
# validate some of showNodes() output
out, err = capsys.readouterr()
assert" 1 ", out, re.MULTILINE)
assert"│ TBeam 1 │ ", out, re.MULTILINE)
assert"│ !7ed23c00 │", out, re.MULTILINE)
assert err == ""
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(caplog):
"""Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception"""
# Note: Captured the '--debug --info' for the bytes below.
from_radio_bytes = b'"\x17\x08\xdc\x8a\x8a\xae\x02\x12\x08"\x06\x00\x00\x00\x00\x00\x00\x1a\x00=\x00\x00\xb8@'
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
def test_MeshInterface_sendToRadioImpl(caplog):
"""Test _sendToRadioImp()"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
assert"Subclass must provide toradio", caplog.text, re.MULTILINE)
def test_MeshInterface_sendToRadio_no_proto(caplog):
"""Test sendToRadio()"""
iface = MeshInterface()
with caplog.at_level(logging.DEBUG):
assert"Subclass must provide toradio", caplog.text, re.MULTILINE)
def test_sendData_too_long(caplog):
"""Test when data payload is too big"""
iface = MeshInterface(noProto=True)
some_large_text = b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
some_large_text += b"This is a long text that will be too long for send text."
with caplog.at_level(logging.DEBUG):
with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
assert"Data payload too big", caplog.text, re.MULTILINE)
assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
def test_sendData_unknown_app(capsys):
"""Test sendData when unknown app"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface.sendData(b"hello", portNum=0)
out, err = capsys.readouterr()
assert"Warning: A non-zero port number", out, re.MULTILINE)
assert err == ""
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
def test_sendPosition_with_a_position(caplog):
"""Test sendPosition when lat/long/alt"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface.sendPosition(latitude=40.8, longitude=-111.86, altitude=201)
assert"p.latitude_i:408", caplog.text, re.MULTILINE)
assert"p.longitude_i:-11186", caplog.text, re.MULTILINE)
assert"p.altitude:201", caplog.text, re.MULTILINE)
def test_sendPacket_with_no_destination(capsys):
"""Test _sendPacket()"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface._sendPacket(b"", destinationId=None)
out, err = capsys.readouterr()
assert"Warning: destinationId must not be None", out, re.MULTILINE)
assert err == ""
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
def test_sendPacket_with_destination_as_int(caplog):
"""Test _sendPacket() with int as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=123)
assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_starting_with_a_bang(caplog):
"""Test _sendPacket() with int as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId="!1234")
assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog):
"""Test _sendPacket() with BROADCAST_ADDR as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR)
assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys):
"""Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
out, err = capsys.readouterr()
assert"Warning: No myInfo", out, re.MULTILINE)
assert err == ""
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog):
"""Test _sendPacket() with LOCAL_ADDR as a destination with myInfo"""
iface = MeshInterface(noProto=True)
myInfo = MagicMock()
iface.myInfo = myInfo
iface.myInfo.my_node_num = 1
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, iface_with_nodes):
"""Test _sendPacket() with '' as a destination with myInfo"""
iface = iface_with_nodes
meshPacket = mesh_pb2.MeshPacket()
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface._sendPacket(meshPacket, destinationId="")
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.match(r"Warning: NodeId not found in DB", out, re.MULTILINE)
assert err == ""
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, iface_with_nodes):
"""Test _sendPacket() with '' as a destination with myInfo"""
iface = iface_with_nodes
iface.nodes = None
meshPacket = mesh_pb2.MeshPacket()
with caplog.at_level(logging.WARNING):
iface._sendPacket(meshPacket, destinationId="")
assert"Warning: There were no self.nodes.", caplog.text, re.MULTILINE)
def test_getMyNodeInfo():
"""Test getMyNodeInfo()"""
iface = MeshInterface(noProto=True)
anode = iface.getNode(LOCAL_ADDR)
iface.nodesByNum = {1: anode}
assert iface.nodesByNum.get(1) == anode
myInfo = MagicMock()
iface.myInfo = myInfo
iface.myInfo.my_node_num = 1
myinfo = iface.getMyNodeInfo()
assert myinfo == anode
def test_generatePacketId(capsys):
"""Test _generatePacketId() when no currentPacketId (not connected)"""
iface = MeshInterface(noProto=True)
# not sure when this condition would ever happen... but we can simulate it
iface.currentPacketId = None
assert iface.currentPacketId is None
with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
out, err = capsys.readouterr()
r"Not connected yet, can not generate packet", out, re.MULTILINE
assert err == ""
assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
def test_fixupPosition_empty_pos():
"""Test _fixupPosition()"""
iface = MeshInterface(noProto=True)
pos = {}
newpos = iface._fixupPosition(pos)
assert newpos == pos
def test_fixupPosition_no_changes_needed():
"""Test _fixupPosition()"""
iface = MeshInterface(noProto=True)
pos = {"latitude": 101, "longitude": 102}
newpos = iface._fixupPosition(pos)
assert newpos == pos
def test_fixupPosition():
"""Test _fixupPosition()"""
iface = MeshInterface(noProto=True)
pos = {"latitudeI": 1010000000, "longitudeI": 1020000000}
newpos = iface._fixupPosition(pos)
assert newpos == {
"latitude": 101.0,
"latitudeI": 1010000000,
"longitude": 102.0,
"longitudeI": 1020000000,
def test_nodeNumToId(iface_with_nodes):
"""Test _nodeNumToId()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
someid = iface._nodeNumToId(2475227164)
assert someid == "!9388f81c"
def test_nodeNumToId_not_found(iface_with_nodes):
"""Test _nodeNumToId()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
someid = iface._nodeNumToId(123)
assert someid is None
def test_nodeNumToId_to_all(iface_with_nodes):
"""Test _nodeNumToId()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
someid = iface._nodeNumToId(0xFFFFFFFF)
assert someid == "^all"
def test_getOrCreateByNum_minimal(iface_with_nodes):
"""Test _getOrCreateByNum()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
tmp = iface._getOrCreateByNum(123)
assert tmp == {"num": 123, "user": {"hwModel": "UNSET", "id": "!0000007b", "shortName": "007b", "longName": "Meshtastic 007b"}}
def test_getOrCreateByNum_not_found(iface_with_nodes):
"""Test _getOrCreateByNum()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
def test_getOrCreateByNum(iface_with_nodes):
"""Test _getOrCreateByNum()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
tmp = iface._getOrCreateByNum(2475227164)
assert tmp["num"] == 2475227164
# @pytest.mark.unit
# def test_enter():
# """Test __enter__()"""
# iface = MeshInterface(noProto=True)
# assert iface == iface.__enter__()
def test_exit_with_exception(caplog):
"""Test __exit__()"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.ERROR):
iface.__exit__("foo", "bar", "baz")
r"An exception of type foo with value bar has occurred",
assert"Traceback: baz", caplog.text, re.MULTILINE)
def test_showNodes_exclude_self(capsys, caplog, iface_with_nodes):
"""Test that we hit that continue statement"""
with caplog.at_level(logging.DEBUG):
iface = iface_with_nodes
iface.localNode.nodeNum = 2475227164
def test_waitForConfig(capsys):
"""Test waitForConfig()"""
iface = MeshInterface(noProto=True)
# override how long to wait
iface._timeout = Timeout(0.01)
with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
out, err = capsys.readouterr()
r"Exception: Timed out waiting for interface config", err, re.MULTILINE
assert out == ""
def test_waitConnected_raises_an_exception(capsys):
"""Test waitConnected()"""
iface = MeshInterface(noProto=True)
with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
iface.failure = MeshInterface.MeshInterfaceError("warn about something")
assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
out, err = capsys.readouterr()
assert"warn about something", err, re.MULTILINE)
assert out == ""
def test_waitConnected_isConnected_timeout(capsys):
"""Test waitConnected()"""
with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
iface = MeshInterface()
assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
out, err = capsys.readouterr()
assert"warn about something", err, re.MULTILINE)
assert out == ""
def test_timeago():
"""Test that the _timeago function returns sane values"""
assert _timeago(0) == "now"
assert _timeago(1) == "1 sec ago"
assert _timeago(15) == "15 secs ago"
assert _timeago(333) == "5 mins ago"
assert _timeago(99999) == "1 day ago"
assert _timeago(9999999) == "3 months ago"
assert _timeago(-999) == "now"
def test_timeago_fuzz(seconds):
"""Fuzz _timeago to ensure it works with any integer"""
val = _timeago(seconds)
assert re.match(r"(now|\d+ (secs?|mins?|hours?|days?|months?|years?))", val)
def test_MeshInterface(capsys)
Test that we can instantiate a MeshInterface
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_MeshInterface(capsys): """Test that we can instantiate a MeshInterface""" iface = MeshInterface(noProto=True) NODE_ID = "!9388f81c" NODE_NUM = 2475227164 node = { "num": NODE_NUM, "user": { "id": NODE_ID, "longName": "Unknown f81c", "shortName": "?1C", "macaddr": "RBeTiPgc", "hwModel": "TBEAM", }, "position": {}, "lastHeard": 1640204888, } iface.nodes = {NODE_ID: node} iface.nodesByNum = {NODE_NUM: node} myInfo = MagicMock() iface.myInfo = myInfo iface.localNode.localConfig.lora.CopyFrom(config_pb2.Config.LoRaConfig()) # Also get some coverage of the structured logging/power meter stuff by turning it on as well log_set = LogSet(iface, None, SimPowerSupply()) iface.showInfo() iface.localNode.showInfo() iface.showNodes() iface.sendText("hello") iface.close() log_set.close() out, err = capsys.readouterr() assert"Owner: None \(None\)", out, re.MULTILINE) assert"Nodes", out, re.MULTILINE) assert"Preferences", out, re.MULTILINE) assert"Channels", out, re.MULTILINE) assert"Primary channel URL", out, re.MULTILINE) assert err == ""
def test_MeshInterface_sendToRadioImpl(caplog)
Test _sendToRadioImp()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_MeshInterface_sendToRadioImpl(caplog): """Test _sendToRadioImp()""" iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): iface._sendToRadioImpl("foo") assert"Subclass must provide toradio", caplog.text, re.MULTILINE) iface.close()
def test_MeshInterface_sendToRadio_no_proto(caplog)
Test sendToRadio()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_MeshInterface_sendToRadio_no_proto(caplog): """Test sendToRadio()""" iface = MeshInterface() with caplog.at_level(logging.DEBUG): iface._sendToRadioImpl("foo") assert"Subclass must provide toradio", caplog.text, re.MULTILINE) iface.close()
def test_exit_with_exception(caplog)
Test exit()
Expand source code
@pytest.mark.unit def test_exit_with_exception(caplog): """Test __exit__()""" iface = MeshInterface(noProto=True) with caplog.at_level(logging.ERROR): iface.__exit__("foo", "bar", "baz") assert r"An exception of type foo with value bar has occurred", caplog.text, re.MULTILINE, ) assert"Traceback: baz", caplog.text, re.MULTILINE)
def test_fixupPosition()
Test _fixupPosition()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_fixupPosition(): """Test _fixupPosition()""" iface = MeshInterface(noProto=True) pos = {"latitudeI": 1010000000, "longitudeI": 1020000000} newpos = iface._fixupPosition(pos) assert newpos == { "latitude": 101.0, "latitudeI": 1010000000, "longitude": 102.0, "longitudeI": 1020000000, }
def test_fixupPosition_empty_pos()
Test _fixupPosition()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_fixupPosition_empty_pos(): """Test _fixupPosition()""" iface = MeshInterface(noProto=True) pos = {} newpos = iface._fixupPosition(pos) assert newpos == pos
def test_fixupPosition_no_changes_needed()
Test _fixupPosition()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_fixupPosition_no_changes_needed(): """Test _fixupPosition()""" iface = MeshInterface(noProto=True) pos = {"latitude": 101, "longitude": 102} newpos = iface._fixupPosition(pos) assert newpos == pos
def test_generatePacketId(capsys)
Test _generatePacketId() when no currentPacketId (not connected)
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_generatePacketId(capsys): """Test _generatePacketId() when no currentPacketId (not connected)""" iface = MeshInterface(noProto=True) # not sure when this condition would ever happen... but we can simulate it iface.currentPacketId = None assert iface.currentPacketId is None with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e: iface._generatePacketId() out, err = capsys.readouterr() assert r"Not connected yet, can not generate packet", out, re.MULTILINE ) assert err == "" assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
def test_getLongName(iface_with_nodes)
Test getLongName()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getLongName(iface_with_nodes): """Test getLongName()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 mylongname = iface.getLongName() assert mylongname == "Unknown f81c"
def test_getMyNodeInfo()
Test getMyNodeInfo()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getMyNodeInfo(): """Test getMyNodeInfo()""" iface = MeshInterface(noProto=True) anode = iface.getNode(LOCAL_ADDR) iface.nodesByNum = {1: anode} assert iface.nodesByNum.get(1) == anode myInfo = MagicMock() iface.myInfo = myInfo iface.myInfo.my_node_num = 1 myinfo = iface.getMyNodeInfo() assert myinfo == anode
def test_getMyUser(iface_with_nodes)
Test getMyUser()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getMyUser(iface_with_nodes): """Test getMyUser()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 myuser = iface.getMyUser() assert myuser is not None assert myuser["id"] == "!9388f81c"
def test_getNode_not_local(caplog)
Test getNode not local
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getNode_not_local(caplog): """Test getNode not local""" iface = MeshInterface(noProto=True) anode = MagicMock(autospec=Node) with caplog.at_level(logging.DEBUG): with patch("meshtastic.node.Node", return_value=anode): another_node = iface.getNode("bar2") assert another_node != iface.localNode assert"About to requestChannels", caplog.text, re.MULTILINE)
def test_getNode_not_local_timeout(capsys)
Test getNode not local, simulate timeout
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getNode_not_local_timeout(capsys): """Test getNode not local, simulate timeout""" iface = MeshInterface(noProto=True) anode = MagicMock(autospec=Node) anode.waitForConfig.return_value = False with patch("meshtastic.node.Node", return_value=anode): with pytest.raises(SystemExit) as pytest_wrapped_e: iface.getNode("bar2") assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 out, err = capsys.readouterr() assert re.match(r"Timed out trying to retrieve channel info, retrying", out) assert err == ""
def test_getNode_not_local_timeout_attempts(capsys)
Test getNode not local, simulate timeout
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getNode_not_local_timeout_attempts(capsys): """Test getNode not local, simulate timeout""" iface = MeshInterface(noProto=True) anode = MagicMock(autospec=Node) anode.waitForConfig.return_value = False with patch("meshtastic.node.Node", return_value=anode): with pytest.raises(SystemExit) as pytest_wrapped_e: iface.getNode("bar2", requestChannelAttempts=2) assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 out, err = capsys.readouterr() assert out == 'Timed out trying to retrieve channel info, retrying\nError: Timed out waiting for channels, giving up\n' assert err == ""
def test_getNode_with_local()
Test getNode
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getNode_with_local(): """Test getNode""" iface = MeshInterface(noProto=True) anode = iface.getNode(LOCAL_ADDR) assert anode == iface.localNode
def test_getOrCreateByNum(iface_with_nodes)
Test _getOrCreateByNum()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getOrCreateByNum(iface_with_nodes): """Test _getOrCreateByNum()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 tmp = iface._getOrCreateByNum(2475227164) assert tmp["num"] == 2475227164
def test_getOrCreateByNum_minimal(iface_with_nodes)
Test _getOrCreateByNum()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getOrCreateByNum_minimal(iface_with_nodes): """Test _getOrCreateByNum()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 tmp = iface._getOrCreateByNum(123) assert tmp == {"num": 123, "user": {"hwModel": "UNSET", "id": "!0000007b", "shortName": "007b", "longName": "Meshtastic 007b"}}
def test_getOrCreateByNum_not_found(iface_with_nodes)
Test _getOrCreateByNum()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getOrCreateByNum_not_found(iface_with_nodes): """Test _getOrCreateByNum()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e: iface._getOrCreateByNum(0xFFFFFFFF) assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
def test_getShortName(iface_with_nodes)
Test getShortName().
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_getShortName(iface_with_nodes): """Test getShortName().""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 myshortname = iface.getShortName() assert myshortname == "?1C"
def test_handleFromRadio_with_my_info(caplog)
Test _handleFromRadio with my_info
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_handleFromRadio_with_my_info(caplog): """Test _handleFromRadio with my_info""" # Note: I captured the '--debug --info' for the bytes below. # It "translates" to this: # my_info { # my_node_num: 682584012 # firmware_version: "" # reboot_count: 13 # bitrate: 17.088470458984375 # message_timeout_msec: 300000 # min_app_version: 20200 # max_channels: 8 # has_wifi: true # } from_radio_bytes = b"\x1a,\x08\xcc\xcf\xbd\xc5\x02\x18\r2\x0e1.2.49.5354c49P\r]0\xb5\x88Ah\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01" iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): iface._handleFromRadio(from_radio_bytes) iface.close() assert"Received from radio: my_info {", caplog.text, re.MULTILINE) assert"my_node_num: 682584012", caplog.text, re.MULTILINE)
def test_handleFromRadio_with_node_info(caplog, capsys)
Test _handleFromRadio with node_info
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_handleFromRadio_with_node_info(caplog, capsys): """Test _handleFromRadio with node_info""" # Note: I captured the '--debug --info' for the bytes below. # It "translates" to this: # node_info { # num: 682584012 # user { # id: "!28af67cc" # long_name: "Unknown 67cc" # short_name: "?CC" # macaddr: "$o(\257g\314" # hw_model: HELTEC_V2_1 # } # position { # } # } from_radio_bytes = b'"2\x08\xcc\xcf\xbd\xc5\x02\x12(\n\t!28af67cc\x12\x0cUnknown 67cc\x1a\x03?CC"\x06$o(\xafg\xcc0\n\x1a\x00' iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): iface._startConfig() iface._handleFromRadio(from_radio_bytes) assert"Received from radio: node_info {", caplog.text, re.MULTILINE) assert"682584012", caplog.text, re.MULTILINE) # validate some of showNodes() output iface.showNodes() out, err = capsys.readouterr() assert" 1 ", out, re.MULTILINE) assert"│ Unknown 67cc │ ", out, re.MULTILINE) assert"│\s+!28af67cc\s+│\s+67cc\s+|", out, re.MULTILINE) assert err == "" iface.close()
def test_handleFromRadio_with_node_info_tbeam1(caplog, capsys)
Test _handleFromRadio with node_info
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_handleFromRadio_with_node_info_tbeam1(caplog, capsys): """Test _handleFromRadio with node_info""" # Note: Captured the '--debug --info' for the bytes below. # pylint: disable=C0301 from_radio_bytes = b'"=\x08\x80\xf8\xc8\xf6\x07\x12"\n\t!7ed23c00\x12\x07TBeam 1\x1a\x02T1"\x06\x94\xb9~\xd2<\x000\x04\x1a\x07 ]MN\x01\xbea%\xad\x01\xbea=\x00\x00,A' iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): iface._startConfig() iface._handleFromRadio(from_radio_bytes) assert"Received nodeinfo", caplog.text, re.MULTILINE) assert"TBeam 1", caplog.text, re.MULTILINE) assert"2127707136", caplog.text, re.MULTILINE) # validate some of showNodes() output iface.showNodes() out, err = capsys.readouterr() assert" 1 ", out, re.MULTILINE) assert"│ TBeam 1 │ ", out, re.MULTILINE) assert"│ !7ed23c00 │", out, re.MULTILINE) assert err == "" iface.close()
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(caplog)
Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_handleFromRadio_with_node_info_tbeam_with_bad_data(caplog): """Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception""" # Note: Captured the '--debug --info' for the bytes below. from_radio_bytes = b'"\x17\x08\xdc\x8a\x8a\xae\x02\x12\x08"\x06\x00\x00\x00\x00\x00\x00\x1a\x00=\x00\x00\xb8@' iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): iface._startConfig() iface._handleFromRadio(from_radio_bytes)
def test_handlePacketFromRadio_no_from(capsys)
Test _handlePacketFromRadio with no 'from' in the mesh packet.
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_handlePacketFromRadio_no_from(capsys): """Test _handlePacketFromRadio with no 'from' in the mesh packet.""" iface = MeshInterface(noProto=True) meshPacket = mesh_pb2.MeshPacket() iface._handlePacketFromRadio(meshPacket) out, err = capsys.readouterr() assert"Device returned a packet we sent, ignoring", out, re.MULTILINE) assert err == ""
def test_handlePacketFromRadio_no_portnum(caplog)
Test _handlePacketFromRadio without a portnum
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_handlePacketFromRadio_no_portnum(caplog): """Test _handlePacketFromRadio without a portnum""" iface = MeshInterface(noProto=True) meshPacket = mesh_pb2.MeshPacket() meshPacket.decoded.payload = b"" with caplog.at_level(logging.WARNING): iface._handlePacketFromRadio(meshPacket, hack=True) assert"Not populating fromId", caplog.text, re.MULTILINE)
def test_handlePacketFromRadio_with_a_portnum(caplog)
Test _handlePacketFromRadio with a portnum Since we have an attribute called 'from', we cannot simply 'set' it. Had to implement a hack just to be able to test some code.
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_handlePacketFromRadio_with_a_portnum(caplog): """Test _handlePacketFromRadio with a portnum Since we have an attribute called 'from', we cannot simply 'set' it. Had to implement a hack just to be able to test some code. """ iface = MeshInterface(noProto=True) meshPacket = mesh_pb2.MeshPacket() meshPacket.decoded.payload = b"" meshPacket.decoded.portnum = 1 with caplog.at_level(logging.WARNING): iface._handlePacketFromRadio(meshPacket, hack=True) assert"Not populating fromId", caplog.text, re.MULTILINE)
def test_nodeNumToId(iface_with_nodes)
Test _nodeNumToId()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_nodeNumToId(iface_with_nodes): """Test _nodeNumToId()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 someid = iface._nodeNumToId(2475227164) assert someid == "!9388f81c"
def test_nodeNumToId_not_found(iface_with_nodes)
Test _nodeNumToId()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_nodeNumToId_not_found(iface_with_nodes): """Test _nodeNumToId()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 someid = iface._nodeNumToId(123) assert someid is None
def test_nodeNumToId_to_all(iface_with_nodes)
Test _nodeNumToId()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_nodeNumToId_to_all(iface_with_nodes): """Test _nodeNumToId()""" iface = iface_with_nodes iface.myInfo.my_node_num = 2475227164 someid = iface._nodeNumToId(0xFFFFFFFF) assert someid == "^all"
def test_sendData_too_long(caplog)
Test when data payload is too big
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendData_too_long(caplog): """Test when data payload is too big""" iface = MeshInterface(noProto=True) some_large_text = b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." some_large_text += b"This is a long text that will be too long for send text." with caplog.at_level(logging.DEBUG): with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e: iface.sendData(some_large_text) assert"Data payload too big", caplog.text, re.MULTILINE) assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError iface.close()
def test_sendData_unknown_app(capsys)
Test sendData when unknown app
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendData_unknown_app(capsys): """Test sendData when unknown app""" iface = MeshInterface(noProto=True) with pytest.raises(SystemExit) as pytest_wrapped_e: iface.sendData(b"hello", portNum=0) out, err = capsys.readouterr() assert"Warning: A non-zero port number", out, re.MULTILINE) assert err == "" assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog)
Test _sendPacket() with BROADCAST_ADDR as a destination
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog): """Test _sendPacket() with BROADCAST_ADDR as a destination""" iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): meshPacket = mesh_pb2.MeshPacket() iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR) assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys)
Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys): """Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo""" iface = MeshInterface(noProto=True) with pytest.raises(SystemExit) as pytest_wrapped_e: meshPacket = mesh_pb2.MeshPacket() iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) out, err = capsys.readouterr() assert"Warning: No myInfo", out, re.MULTILINE) assert err == "" assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog)
Test _sendPacket() with LOCAL_ADDR as a destination with myInfo
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog): """Test _sendPacket() with LOCAL_ADDR as a destination with myInfo""" iface = MeshInterface(noProto=True) myInfo = MagicMock() iface.myInfo = myInfo iface.myInfo.my_node_num = 1 with caplog.at_level(logging.DEBUG): meshPacket = mesh_pb2.MeshPacket() iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR) assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_as_int(caplog)
Test _sendPacket() with int as a destination
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_destination_as_int(caplog): """Test _sendPacket() with int as a destination""" iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): meshPacket = mesh_pb2.MeshPacket() iface._sendPacket(meshPacket, destinationId=123) assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, iface_with_nodes)
Test _sendPacket() with '' as a destination with myInfo
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_destination_is_blank_with_nodes(capsys, iface_with_nodes): """Test _sendPacket() with '' as a destination with myInfo""" iface = iface_with_nodes meshPacket = mesh_pb2.MeshPacket() with pytest.raises(SystemExit) as pytest_wrapped_e: iface._sendPacket(meshPacket, destinationId="") assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1 out, err = capsys.readouterr() assert re.match(r"Warning: NodeId not found in DB", out, re.MULTILINE) assert err == ""
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, iface_with_nodes)
Test _sendPacket() with '' as a destination with myInfo
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_destination_is_blank_without_nodes(caplog, iface_with_nodes): """Test _sendPacket() with '' as a destination with myInfo""" iface = iface_with_nodes iface.nodes = None meshPacket = mesh_pb2.MeshPacket() with caplog.at_level(logging.WARNING): iface._sendPacket(meshPacket, destinationId="") assert"Warning: There were no self.nodes.", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_starting_with_a_bang(caplog)
Test _sendPacket() with int as a destination
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_destination_starting_with_a_bang(caplog): """Test _sendPacket() with int as a destination""" iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): meshPacket = mesh_pb2.MeshPacket() iface._sendPacket(meshPacket, destinationId="!1234") assert"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_no_destination(capsys)
Test _sendPacket()
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPacket_with_no_destination(capsys): """Test _sendPacket()""" iface = MeshInterface(noProto=True) with pytest.raises(SystemExit) as pytest_wrapped_e: iface._sendPacket(b"", destinationId=None) out, err = capsys.readouterr() assert"Warning: destinationId must not be None", out, re.MULTILINE) assert err == "" assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.value.code == 1
def test_sendPosition(caplog)
Test sendPosition
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPosition(caplog): """Test sendPosition""" iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): iface.sendPosition() iface.close() # assert"p.time:", caplog.text, re.MULTILINE)
def test_sendPosition_with_a_position(caplog)
Test sendPosition when lat/long/alt
Expand source code
@pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_sendPosition_with_a_position(caplog): """Test sendPosition when lat/long/alt""" iface = MeshInterface(noProto=True) with caplog.at_level(logging.DEBUG): iface.sendPosition(latitude=40.8, longitude=-111.86, altitude=201) assert"p.latitude_i:408", caplog.text, re.MULTILINE) assert"p.longitude_i:-11186", caplog.text, re.MULTILINE) assert"p.altitude:201", caplog.text, re.MULTILINE)
def test_showNodes_exclude_self(capsys, caplog, iface_with_nodes)
Test that we hit that continue statement
Expand source code
@pytest.mark.unit def test_showNodes_exclude_self(capsys, caplog, iface_with_nodes): """Test that we hit that continue statement""" with caplog.at_level(logging.DEBUG): iface = iface_with_nodes iface.localNode.nodeNum = 2475227164 iface.showNodes() iface.showNodes(includeSelf=False) capsys.readouterr()
def test_timeago()
Test that the _timeago function returns sane values
Expand source code
@pytest.mark.unit def test_timeago(): """Test that the _timeago function returns sane values""" assert _timeago(0) == "now" assert _timeago(1) == "1 sec ago" assert _timeago(15) == "15 secs ago" assert _timeago(333) == "5 mins ago" assert _timeago(99999) == "1 day ago" assert _timeago(9999999) == "3 months ago" assert _timeago(-999) == "now"
def test_timeago_fuzz() ‑> None
Fuzz _timeago to ensure it works with any integer
Expand source code
@given(seconds=st.integers()) def test_timeago_fuzz(seconds): """Fuzz _timeago to ensure it works with any integer""" val = _timeago(seconds) assert re.match(r"(now|\d+ (secs?|mins?|hours?|days?|months?|years?))", val)
def test_waitConnected_isConnected_timeout(capsys)
Test waitConnected()
Expand source code
@pytest.mark.unit def test_waitConnected_isConnected_timeout(capsys): """Test waitConnected()""" with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e: iface = MeshInterface() iface._waitConnected(0.01) assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError out, err = capsys.readouterr() assert"warn about something", err, re.MULTILINE) assert out == ""
def test_waitConnected_raises_an_exception(capsys)
Test waitConnected()
Expand source code
@pytest.mark.unit def test_waitConnected_raises_an_exception(capsys): """Test waitConnected()""" iface = MeshInterface(noProto=True) with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e: iface.failure = MeshInterface.MeshInterfaceError("warn about something") iface._waitConnected(0.01) assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError out, err = capsys.readouterr() assert"warn about something", err, re.MULTILINE) assert out == ""
def test_waitForConfig(capsys)
Test waitForConfig()
Expand source code
@pytest.mark.unitslow def test_waitForConfig(capsys): """Test waitForConfig()""" iface = MeshInterface(noProto=True) # override how long to wait iface._timeout = Timeout(0.01) with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e: iface.waitForConfig() assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError out, err = capsys.readouterr() assert r"Exception: Timed out waiting for interface config", err, re.MULTILINE ) assert out == ""