Module meshtastic.tests.test_mesh_interface

Meshtastic unit tests for mesh_interface.py

Expand source code
"""Meshtastic unit tests for mesh_interface.py"""

import logging
import re
from unittest.mock import MagicMock, patch

import pytest
from hypothesis import given, strategies as st

from ..protobuf import mesh_pb2, config_pb2
from .. import BROADCAST_ADDR, LOCAL_ADDR
from ..mesh_interface import MeshInterface, _timeago
from ..node import Node
from ..slog import LogSet
from ..powermon import SimPowerSupply

# TODO
# from ..config import Config
from ..util import Timeout


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_MeshInterface(capsys):
    """Test that we can instantiate a MeshInterface"""
    iface = MeshInterface(noProto=True)

    NODE_ID = "!9388f81c"
    NODE_NUM = 2475227164
    node = {
            "num": NODE_NUM,
            "user": {
                "id": NODE_ID,
                "longName": "Unknown f81c",
                "shortName": "?1C",
                "macaddr": "RBeTiPgc",
                "hwModel": "TBEAM",
            },
            "position": {},
            "lastHeard": 1640204888,
        }


    iface.nodes = {NODE_ID: node}
    iface.nodesByNum = {NODE_NUM: node}

    myInfo = MagicMock()
    iface.myInfo = myInfo

    iface.localNode.localConfig.lora.CopyFrom(config_pb2.Config.LoRaConfig())

    # Also get some coverage of the structured logging/power meter stuff by turning it on as well
    log_set = LogSet(iface, None, SimPowerSupply())

    iface.showInfo()
    iface.localNode.showInfo()
    iface.showNodes()
    iface.sendText("hello")
    iface.close()
    log_set.close()
    out, err = capsys.readouterr()
    assert re.search(r"Owner: None \(None\)", out, re.MULTILINE)
    assert re.search(r"Nodes", out, re.MULTILINE)
    assert re.search(r"Preferences", out, re.MULTILINE)
    assert re.search(r"Channels", out, re.MULTILINE)
    assert re.search(r"Primary channel URL", out, re.MULTILINE)
    assert err == ""


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getMyUser(iface_with_nodes):
    """Test getMyUser()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    myuser = iface.getMyUser()
    assert myuser is not None
    assert myuser["id"] == "!9388f81c"


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getLongName(iface_with_nodes):
    """Test getLongName()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    mylongname = iface.getLongName()
    assert mylongname == "Unknown f81c"


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getShortName(iface_with_nodes):
    """Test getShortName()."""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    myshortname = iface.getShortName()
    assert myshortname == "?1C"


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handlePacketFromRadio_no_from(capsys):
    """Test _handlePacketFromRadio with no 'from' in the mesh packet."""
    iface = MeshInterface(noProto=True)
    meshPacket = mesh_pb2.MeshPacket()
    iface._handlePacketFromRadio(meshPacket)
    out, err = capsys.readouterr()
    assert re.search(r"Device returned a packet we sent, ignoring", out, re.MULTILINE)
    assert err == ""


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handlePacketFromRadio_with_a_portnum(caplog):
    """Test _handlePacketFromRadio with a portnum
    Since we have an attribute called 'from', we cannot simply 'set' it.
    Had to implement a hack just to be able to test some code.
    """
    iface = MeshInterface(noProto=True)
    meshPacket = mesh_pb2.MeshPacket()
    meshPacket.decoded.payload = b""
    meshPacket.decoded.portnum = 1
    with caplog.at_level(logging.WARNING):
        iface._handlePacketFromRadio(meshPacket, hack=True)
    assert re.search(r"Not populating fromId", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handlePacketFromRadio_no_portnum(caplog):
    """Test _handlePacketFromRadio without a portnum"""
    iface = MeshInterface(noProto=True)
    meshPacket = mesh_pb2.MeshPacket()
    meshPacket.decoded.payload = b""
    with caplog.at_level(logging.WARNING):
        iface._handlePacketFromRadio(meshPacket, hack=True)
    assert re.search(r"Not populating fromId", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_with_local():
    """Test getNode"""
    iface = MeshInterface(noProto=True)
    anode = iface.getNode(LOCAL_ADDR)
    assert anode == iface.localNode


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_not_local(caplog):
    """Test getNode not local"""
    iface = MeshInterface(noProto=True)
    anode = MagicMock(autospec=Node)
    with caplog.at_level(logging.DEBUG):
        with patch("meshtastic.node.Node", return_value=anode):
            another_node = iface.getNode("bar2")
            assert another_node != iface.localNode
    assert re.search(r"About to requestChannels", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_not_local_timeout(capsys):
    """Test getNode not local, simulate timeout"""
    iface = MeshInterface(noProto=True)
    anode = MagicMock(autospec=Node)
    anode.waitForConfig.return_value = False
    with patch("meshtastic.node.Node", return_value=anode):
        with pytest.raises(SystemExit) as pytest_wrapped_e:
            iface.getNode("bar2")
        assert pytest_wrapped_e.type == SystemExit
        assert pytest_wrapped_e.value.code == 1
        out, err = capsys.readouterr()
        assert re.match(r"Timed out trying to retrieve channel info, retrying", out)
        assert err == ""

@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_not_local_timeout_attempts(capsys):
    """Test getNode not local, simulate timeout"""
    iface = MeshInterface(noProto=True)
    anode = MagicMock(autospec=Node)
    anode.waitForConfig.return_value = False
    with patch("meshtastic.node.Node", return_value=anode):
        with pytest.raises(SystemExit) as pytest_wrapped_e:
            iface.getNode("bar2", requestChannelAttempts=2)
        assert pytest_wrapped_e.type == SystemExit
        assert pytest_wrapped_e.value.code == 1
        out, err = capsys.readouterr()
        assert out == 'Timed out trying to retrieve channel info, retrying\nError: Timed out waiting for channels, giving up\n'
        assert err == ""


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPosition(caplog):
    """Test sendPosition"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface.sendPosition()
    iface.close()
    # assert re.search(r"p.time:", caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# @pytest.mark.usefixtures("reset_mt_config")
# def test_close_with_heartbeatTimer(caplog):
#    """Test close() with heartbeatTimer"""
#    iface = MeshInterface(noProto=True)
#    anode = Node('foo', 'bar')
#    aconfig = Config()
#    aonfig.preferences.phone_timeout_secs = 10
#    anode.config = aconfig
#    iface.localNode = anode
#    assert iface.heartbeatTimer is None
#    with caplog.at_level(logging.DEBUG):
#        iface._startHeartbeat()
#        assert iface.heartbeatTimer is not None
#        iface.close()


# TODO
# @pytest.mark.unit
# @pytest.mark.usefixtures("reset_mt_config")
# def test_handleFromRadio_empty_payload(caplog):
#    """Test _handleFromRadio"""
#    iface = MeshInterface(noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        iface._handleFromRadio(b'')
#    iface.close()
#    assert re.search(r'Unexpected FromRadio payload', caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_my_info(caplog):
    """Test _handleFromRadio with my_info"""
    # Note: I captured the '--debug --info' for the bytes below.
    # It "translates" to this:
    # my_info {
    #  my_node_num: 682584012
    #  firmware_version: "1.2.49.5354c49"
    #  reboot_count: 13
    #  bitrate: 17.088470458984375
    #  message_timeout_msec: 300000
    #  min_app_version: 20200
    #  max_channels: 8
    #  has_wifi: true
    # }
    from_radio_bytes = b"\x1a,\x08\xcc\xcf\xbd\xc5\x02\x18\r2\x0e1.2.49.5354c49P\r]0\xb5\x88Ah\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01"
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._handleFromRadio(from_radio_bytes)
    iface.close()
    assert re.search(r"Received from radio: my_info {", caplog.text, re.MULTILINE)
    assert re.search(r"my_node_num: 682584012", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_node_info(caplog, capsys):
    """Test _handleFromRadio with node_info"""
    # Note: I captured the '--debug --info' for the bytes below.
    # It "translates" to this:
    # node_info {
    #  num: 682584012
    #  user {
    #    id: "!28af67cc"
    #    long_name: "Unknown 67cc"
    #    short_name: "?CC"
    #    macaddr: "$o(\257g\314"
    #    hw_model: HELTEC_V2_1
    #  }
    #  position {
    #    }
    #  }

    from_radio_bytes = b'"2\x08\xcc\xcf\xbd\xc5\x02\x12(\n\t!28af67cc\x12\x0cUnknown 67cc\x1a\x03?CC"\x06$o(\xafg\xcc0\n\x1a\x00'
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._startConfig()
        iface._handleFromRadio(from_radio_bytes)
        assert re.search(r"Received from radio: node_info {", caplog.text, re.MULTILINE)
        assert re.search(r"682584012", caplog.text, re.MULTILINE)
        # validate some of showNodes() output
        iface.showNodes()
        out, err = capsys.readouterr()
        assert re.search(r" 1 ", out, re.MULTILINE)
        assert re.search(r"│ Unknown 67cc │ ", out, re.MULTILINE)
        assert re.search(r"│\s+!28af67cc\s+│\s+67cc\s+|", out, re.MULTILINE)
        assert err == ""
        iface.close()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_node_info_tbeam1(caplog, capsys):
    """Test _handleFromRadio with node_info"""
    # Note: Captured the '--debug --info' for the bytes below.
    # pylint: disable=C0301
    from_radio_bytes = b'"=\x08\x80\xf8\xc8\xf6\x07\x12"\n\t!7ed23c00\x12\x07TBeam 1\x1a\x02T1"\x06\x94\xb9~\xd2<\x000\x04\x1a\x07 ]MN\x01\xbea%\xad\x01\xbea=\x00\x00,A'
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._startConfig()
        iface._handleFromRadio(from_radio_bytes)
        assert re.search(r"Received nodeinfo", caplog.text, re.MULTILINE)
        assert re.search(r"TBeam 1", caplog.text, re.MULTILINE)
        assert re.search(r"2127707136", caplog.text, re.MULTILINE)
        # validate some of showNodes() output
        iface.showNodes()
        out, err = capsys.readouterr()
        assert re.search(r" 1 ", out, re.MULTILINE)
        assert re.search(r"│ TBeam 1 │ ", out, re.MULTILINE)
        assert re.search(r"│ !7ed23c00 │", out, re.MULTILINE)
        assert err == ""
        iface.close()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(caplog):
    """Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception"""
    # Note: Captured the '--debug --info' for the bytes below.
    from_radio_bytes = b'"\x17\x08\xdc\x8a\x8a\xae\x02\x12\x08"\x06\x00\x00\x00\x00\x00\x00\x1a\x00=\x00\x00\xb8@'
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._startConfig()
        iface._handleFromRadio(from_radio_bytes)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_MeshInterface_sendToRadioImpl(caplog):
    """Test _sendToRadioImp()"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._sendToRadioImpl("foo")
    assert re.search(r"Subclass must provide toradio", caplog.text, re.MULTILINE)
    iface.close()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_MeshInterface_sendToRadio_no_proto(caplog):
    """Test sendToRadio()"""
    iface = MeshInterface()
    with caplog.at_level(logging.DEBUG):
        iface._sendToRadioImpl("foo")
    assert re.search(r"Subclass must provide toradio", caplog.text, re.MULTILINE)
    iface.close()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendData_too_long(caplog):
    """Test when data payload is too big"""
    iface = MeshInterface(noProto=True)
    some_large_text = b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    with caplog.at_level(logging.DEBUG):
        with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
            iface.sendData(some_large_text)
            assert re.search("Data payload too big", caplog.text, re.MULTILINE)
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
    iface.close()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendData_unknown_app(capsys):
    """Test sendData when unknown app"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        iface.sendData(b"hello", portNum=0)
    out, err = capsys.readouterr()
    assert re.search(r"Warning: A non-zero port number", out, re.MULTILINE)
    assert err == ""
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPosition_with_a_position(caplog):
    """Test sendPosition when lat/long/alt"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface.sendPosition(latitude=40.8, longitude=-111.86, altitude=201)
        assert re.search(r"p.latitude_i:408", caplog.text, re.MULTILINE)
        assert re.search(r"p.longitude_i:-11186", caplog.text, re.MULTILINE)
        assert re.search(r"p.altitude:201", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_no_destination(capsys):
    """Test _sendPacket()"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        iface._sendPacket(b"", destinationId=None)
    out, err = capsys.readouterr()
    assert re.search(r"Warning: destinationId must not be None", out, re.MULTILINE)
    assert err == ""
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_int(caplog):
    """Test _sendPacket() with int as a destination"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=123)
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_starting_with_a_bang(caplog):
    """Test _sendPacket() with int as a destination"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId="!1234")
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog):
    """Test _sendPacket() with BROADCAST_ADDR as a destination"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR)
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys):
    """Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
    out, err = capsys.readouterr()
    assert re.search(r"Warning: No myInfo", out, re.MULTILINE)
    assert err == ""
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog):
    """Test _sendPacket() with LOCAL_ADDR as a destination with myInfo"""
    iface = MeshInterface(noProto=True)
    myInfo = MagicMock()
    iface.myInfo = myInfo
    iface.myInfo.my_node_num = 1
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, iface_with_nodes):
    """Test _sendPacket() with '' as a destination with myInfo"""
    iface = iface_with_nodes
    meshPacket = mesh_pb2.MeshPacket()
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        iface._sendPacket(meshPacket, destinationId="")
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.match(r"Warning: NodeId  not found in DB", out, re.MULTILINE)
    assert err == ""


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, iface_with_nodes):
    """Test _sendPacket() with '' as a destination with myInfo"""
    iface = iface_with_nodes
    iface.nodes = None
    meshPacket = mesh_pb2.MeshPacket()
    with caplog.at_level(logging.WARNING):
        iface._sendPacket(meshPacket, destinationId="")
    assert re.search(r"Warning: There were no self.nodes.", caplog.text, re.MULTILINE)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getMyNodeInfo():
    """Test getMyNodeInfo()"""
    iface = MeshInterface(noProto=True)
    anode = iface.getNode(LOCAL_ADDR)
    iface.nodesByNum = {1: anode}
    assert iface.nodesByNum.get(1) == anode
    myInfo = MagicMock()
    iface.myInfo = myInfo
    iface.myInfo.my_node_num = 1
    myinfo = iface.getMyNodeInfo()
    assert myinfo == anode


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_generatePacketId(capsys):
    """Test _generatePacketId() when no currentPacketId (not connected)"""
    iface = MeshInterface(noProto=True)
    # not sure when this condition would ever happen... but we can simulate it
    iface.currentPacketId = None
    assert iface.currentPacketId is None
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface._generatePacketId()
        out, err = capsys.readouterr()
        assert re.search(
            r"Not connected yet, can not generate packet", out, re.MULTILINE
        )
        assert err == ""
    assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_fixupPosition_empty_pos():
    """Test _fixupPosition()"""
    iface = MeshInterface(noProto=True)
    pos = {}
    newpos = iface._fixupPosition(pos)
    assert newpos == pos


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_fixupPosition_no_changes_needed():
    """Test _fixupPosition()"""
    iface = MeshInterface(noProto=True)
    pos = {"latitude": 101, "longitude": 102}
    newpos = iface._fixupPosition(pos)
    assert newpos == pos


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_fixupPosition():
    """Test _fixupPosition()"""
    iface = MeshInterface(noProto=True)
    pos = {"latitudeI": 1010000000, "longitudeI": 1020000000}
    newpos = iface._fixupPosition(pos)
    assert newpos == {
        "latitude": 101.0,
        "latitudeI": 1010000000,
        "longitude": 102.0,
        "longitudeI": 1020000000,
    }


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_nodeNumToId(iface_with_nodes):
    """Test _nodeNumToId()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    someid = iface._nodeNumToId(2475227164)
    assert someid == "!9388f81c"


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_nodeNumToId_not_found(iface_with_nodes):
    """Test _nodeNumToId()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    someid = iface._nodeNumToId(123)
    assert someid is None


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_nodeNumToId_to_all(iface_with_nodes):
    """Test _nodeNumToId()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    someid = iface._nodeNumToId(0xFFFFFFFF)
    assert someid == "^all"


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getOrCreateByNum_minimal(iface_with_nodes):
    """Test _getOrCreateByNum()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    tmp = iface._getOrCreateByNum(123)
    assert tmp == {"num": 123, "user": {"hwModel": "UNSET", "id": "!0000007b", "shortName": "007b", "longName": "Meshtastic 007b"}}


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getOrCreateByNum_not_found(iface_with_nodes):
    """Test _getOrCreateByNum()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface._getOrCreateByNum(0xFFFFFFFF)
    assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getOrCreateByNum(iface_with_nodes):
    """Test _getOrCreateByNum()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    tmp = iface._getOrCreateByNum(2475227164)
    assert tmp["num"] == 2475227164


# TODO
# @pytest.mark.unit
# def test_enter():
#    """Test __enter__()"""
#    iface = MeshInterface(noProto=True)
#    assert iface == iface.__enter__()


@pytest.mark.unit
def test_exit_with_exception(caplog):
    """Test __exit__()"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.ERROR):
        iface.__exit__("foo", "bar", "baz")
        assert re.search(
            r"An exception of type foo with value bar has occurred",
            caplog.text,
            re.MULTILINE,
        )
        assert re.search(r"Traceback: baz", caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_showNodes_exclude_self(capsys, caplog, iface_with_nodes):
    """Test that we hit that continue statement"""
    with caplog.at_level(logging.DEBUG):
        iface = iface_with_nodes
        iface.localNode.nodeNum = 2475227164
        iface.showNodes()
        iface.showNodes(includeSelf=False)
        capsys.readouterr()


@pytest.mark.unitslow
def test_waitForConfig(capsys):
    """Test waitForConfig()"""
    iface = MeshInterface(noProto=True)
    # override how long to wait
    iface._timeout = Timeout(0.01)
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface.waitForConfig()
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
        out, err = capsys.readouterr()
        assert re.search(
            r"Exception: Timed out waiting for interface config", err, re.MULTILINE
        )
        assert out == ""


@pytest.mark.unit
def test_waitConnected_raises_an_exception(capsys):
    """Test waitConnected()"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface.failure = MeshInterface.MeshInterfaceError("warn about something")
        iface._waitConnected(0.01)
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
        out, err = capsys.readouterr()
        assert re.search(r"warn about something", err, re.MULTILINE)
        assert out == ""


@pytest.mark.unit
def test_waitConnected_isConnected_timeout(capsys):
    """Test waitConnected()"""
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface = MeshInterface()
        iface._waitConnected(0.01)
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
        out, err = capsys.readouterr()
        assert re.search(r"warn about something", err, re.MULTILINE)
        assert out == ""


@pytest.mark.unit
def test_timeago():
    """Test that the _timeago function returns sane values"""
    assert _timeago(0) == "now"
    assert _timeago(1) == "1 sec ago"
    assert _timeago(15) == "15 secs ago"
    assert _timeago(333) == "5 mins ago"
    assert _timeago(99999) == "1 day ago"
    assert _timeago(9999999) == "3 months ago"
    assert _timeago(-999) == "now"

@given(seconds=st.integers())
def test_timeago_fuzz(seconds):
    """Fuzz _timeago to ensure it works with any integer"""
    val = _timeago(seconds)
    assert re.match(r"(now|\d+ (secs?|mins?|hours?|days?|months?|years?))", val)

Functions

def test_MeshInterface(capsys)

Test that we can instantiate a MeshInterface

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_MeshInterface(capsys):
    """Test that we can instantiate a MeshInterface"""
    iface = MeshInterface(noProto=True)

    NODE_ID = "!9388f81c"
    NODE_NUM = 2475227164
    node = {
            "num": NODE_NUM,
            "user": {
                "id": NODE_ID,
                "longName": "Unknown f81c",
                "shortName": "?1C",
                "macaddr": "RBeTiPgc",
                "hwModel": "TBEAM",
            },
            "position": {},
            "lastHeard": 1640204888,
        }


    iface.nodes = {NODE_ID: node}
    iface.nodesByNum = {NODE_NUM: node}

    myInfo = MagicMock()
    iface.myInfo = myInfo

    iface.localNode.localConfig.lora.CopyFrom(config_pb2.Config.LoRaConfig())

    # Also get some coverage of the structured logging/power meter stuff by turning it on as well
    log_set = LogSet(iface, None, SimPowerSupply())

    iface.showInfo()
    iface.localNode.showInfo()
    iface.showNodes()
    iface.sendText("hello")
    iface.close()
    log_set.close()
    out, err = capsys.readouterr()
    assert re.search(r"Owner: None \(None\)", out, re.MULTILINE)
    assert re.search(r"Nodes", out, re.MULTILINE)
    assert re.search(r"Preferences", out, re.MULTILINE)
    assert re.search(r"Channels", out, re.MULTILINE)
    assert re.search(r"Primary channel URL", out, re.MULTILINE)
    assert err == ""
def test_MeshInterface_sendToRadioImpl(caplog)

Test _sendToRadioImp()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_MeshInterface_sendToRadioImpl(caplog):
    """Test _sendToRadioImp()"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._sendToRadioImpl("foo")
    assert re.search(r"Subclass must provide toradio", caplog.text, re.MULTILINE)
    iface.close()
def test_MeshInterface_sendToRadio_no_proto(caplog)

Test sendToRadio()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_MeshInterface_sendToRadio_no_proto(caplog):
    """Test sendToRadio()"""
    iface = MeshInterface()
    with caplog.at_level(logging.DEBUG):
        iface._sendToRadioImpl("foo")
    assert re.search(r"Subclass must provide toradio", caplog.text, re.MULTILINE)
    iface.close()
def test_exit_with_exception(caplog)

Test exit()

Expand source code
@pytest.mark.unit
def test_exit_with_exception(caplog):
    """Test __exit__()"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.ERROR):
        iface.__exit__("foo", "bar", "baz")
        assert re.search(
            r"An exception of type foo with value bar has occurred",
            caplog.text,
            re.MULTILINE,
        )
        assert re.search(r"Traceback: baz", caplog.text, re.MULTILINE)
def test_fixupPosition()

Test _fixupPosition()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_fixupPosition():
    """Test _fixupPosition()"""
    iface = MeshInterface(noProto=True)
    pos = {"latitudeI": 1010000000, "longitudeI": 1020000000}
    newpos = iface._fixupPosition(pos)
    assert newpos == {
        "latitude": 101.0,
        "latitudeI": 1010000000,
        "longitude": 102.0,
        "longitudeI": 1020000000,
    }
def test_fixupPosition_empty_pos()

Test _fixupPosition()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_fixupPosition_empty_pos():
    """Test _fixupPosition()"""
    iface = MeshInterface(noProto=True)
    pos = {}
    newpos = iface._fixupPosition(pos)
    assert newpos == pos
def test_fixupPosition_no_changes_needed()

Test _fixupPosition()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_fixupPosition_no_changes_needed():
    """Test _fixupPosition()"""
    iface = MeshInterface(noProto=True)
    pos = {"latitude": 101, "longitude": 102}
    newpos = iface._fixupPosition(pos)
    assert newpos == pos
def test_generatePacketId(capsys)

Test _generatePacketId() when no currentPacketId (not connected)

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_generatePacketId(capsys):
    """Test _generatePacketId() when no currentPacketId (not connected)"""
    iface = MeshInterface(noProto=True)
    # not sure when this condition would ever happen... but we can simulate it
    iface.currentPacketId = None
    assert iface.currentPacketId is None
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface._generatePacketId()
        out, err = capsys.readouterr()
        assert re.search(
            r"Not connected yet, can not generate packet", out, re.MULTILINE
        )
        assert err == ""
    assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
def test_getLongName(iface_with_nodes)

Test getLongName()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getLongName(iface_with_nodes):
    """Test getLongName()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    mylongname = iface.getLongName()
    assert mylongname == "Unknown f81c"
def test_getMyNodeInfo()

Test getMyNodeInfo()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getMyNodeInfo():
    """Test getMyNodeInfo()"""
    iface = MeshInterface(noProto=True)
    anode = iface.getNode(LOCAL_ADDR)
    iface.nodesByNum = {1: anode}
    assert iface.nodesByNum.get(1) == anode
    myInfo = MagicMock()
    iface.myInfo = myInfo
    iface.myInfo.my_node_num = 1
    myinfo = iface.getMyNodeInfo()
    assert myinfo == anode
def test_getMyUser(iface_with_nodes)

Test getMyUser()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getMyUser(iface_with_nodes):
    """Test getMyUser()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    myuser = iface.getMyUser()
    assert myuser is not None
    assert myuser["id"] == "!9388f81c"
def test_getNode_not_local(caplog)

Test getNode not local

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_not_local(caplog):
    """Test getNode not local"""
    iface = MeshInterface(noProto=True)
    anode = MagicMock(autospec=Node)
    with caplog.at_level(logging.DEBUG):
        with patch("meshtastic.node.Node", return_value=anode):
            another_node = iface.getNode("bar2")
            assert another_node != iface.localNode
    assert re.search(r"About to requestChannels", caplog.text, re.MULTILINE)
def test_getNode_not_local_timeout(capsys)

Test getNode not local, simulate timeout

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_not_local_timeout(capsys):
    """Test getNode not local, simulate timeout"""
    iface = MeshInterface(noProto=True)
    anode = MagicMock(autospec=Node)
    anode.waitForConfig.return_value = False
    with patch("meshtastic.node.Node", return_value=anode):
        with pytest.raises(SystemExit) as pytest_wrapped_e:
            iface.getNode("bar2")
        assert pytest_wrapped_e.type == SystemExit
        assert pytest_wrapped_e.value.code == 1
        out, err = capsys.readouterr()
        assert re.match(r"Timed out trying to retrieve channel info, retrying", out)
        assert err == ""
def test_getNode_not_local_timeout_attempts(capsys)

Test getNode not local, simulate timeout

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_not_local_timeout_attempts(capsys):
    """Test getNode not local, simulate timeout"""
    iface = MeshInterface(noProto=True)
    anode = MagicMock(autospec=Node)
    anode.waitForConfig.return_value = False
    with patch("meshtastic.node.Node", return_value=anode):
        with pytest.raises(SystemExit) as pytest_wrapped_e:
            iface.getNode("bar2", requestChannelAttempts=2)
        assert pytest_wrapped_e.type == SystemExit
        assert pytest_wrapped_e.value.code == 1
        out, err = capsys.readouterr()
        assert out == 'Timed out trying to retrieve channel info, retrying\nError: Timed out waiting for channels, giving up\n'
        assert err == ""
def test_getNode_with_local()

Test getNode

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getNode_with_local():
    """Test getNode"""
    iface = MeshInterface(noProto=True)
    anode = iface.getNode(LOCAL_ADDR)
    assert anode == iface.localNode
def test_getOrCreateByNum(iface_with_nodes)

Test _getOrCreateByNum()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getOrCreateByNum(iface_with_nodes):
    """Test _getOrCreateByNum()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    tmp = iface._getOrCreateByNum(2475227164)
    assert tmp["num"] == 2475227164
def test_getOrCreateByNum_minimal(iface_with_nodes)

Test _getOrCreateByNum()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getOrCreateByNum_minimal(iface_with_nodes):
    """Test _getOrCreateByNum()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    tmp = iface._getOrCreateByNum(123)
    assert tmp == {"num": 123, "user": {"hwModel": "UNSET", "id": "!0000007b", "shortName": "007b", "longName": "Meshtastic 007b"}}
def test_getOrCreateByNum_not_found(iface_with_nodes)

Test _getOrCreateByNum()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getOrCreateByNum_not_found(iface_with_nodes):
    """Test _getOrCreateByNum()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface._getOrCreateByNum(0xFFFFFFFF)
    assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
def test_getShortName(iface_with_nodes)

Test getShortName().

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_getShortName(iface_with_nodes):
    """Test getShortName()."""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    myshortname = iface.getShortName()
    assert myshortname == "?1C"
def test_handleFromRadio_with_my_info(caplog)

Test _handleFromRadio with my_info

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_my_info(caplog):
    """Test _handleFromRadio with my_info"""
    # Note: I captured the '--debug --info' for the bytes below.
    # It "translates" to this:
    # my_info {
    #  my_node_num: 682584012
    #  firmware_version: "1.2.49.5354c49"
    #  reboot_count: 13
    #  bitrate: 17.088470458984375
    #  message_timeout_msec: 300000
    #  min_app_version: 20200
    #  max_channels: 8
    #  has_wifi: true
    # }
    from_radio_bytes = b"\x1a,\x08\xcc\xcf\xbd\xc5\x02\x18\r2\x0e1.2.49.5354c49P\r]0\xb5\x88Ah\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01"
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._handleFromRadio(from_radio_bytes)
    iface.close()
    assert re.search(r"Received from radio: my_info {", caplog.text, re.MULTILINE)
    assert re.search(r"my_node_num: 682584012", caplog.text, re.MULTILINE)
def test_handleFromRadio_with_node_info(caplog, capsys)

Test _handleFromRadio with node_info

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_node_info(caplog, capsys):
    """Test _handleFromRadio with node_info"""
    # Note: I captured the '--debug --info' for the bytes below.
    # It "translates" to this:
    # node_info {
    #  num: 682584012
    #  user {
    #    id: "!28af67cc"
    #    long_name: "Unknown 67cc"
    #    short_name: "?CC"
    #    macaddr: "$o(\257g\314"
    #    hw_model: HELTEC_V2_1
    #  }
    #  position {
    #    }
    #  }

    from_radio_bytes = b'"2\x08\xcc\xcf\xbd\xc5\x02\x12(\n\t!28af67cc\x12\x0cUnknown 67cc\x1a\x03?CC"\x06$o(\xafg\xcc0\n\x1a\x00'
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._startConfig()
        iface._handleFromRadio(from_radio_bytes)
        assert re.search(r"Received from radio: node_info {", caplog.text, re.MULTILINE)
        assert re.search(r"682584012", caplog.text, re.MULTILINE)
        # validate some of showNodes() output
        iface.showNodes()
        out, err = capsys.readouterr()
        assert re.search(r" 1 ", out, re.MULTILINE)
        assert re.search(r"│ Unknown 67cc │ ", out, re.MULTILINE)
        assert re.search(r"│\s+!28af67cc\s+│\s+67cc\s+|", out, re.MULTILINE)
        assert err == ""
        iface.close()
def test_handleFromRadio_with_node_info_tbeam1(caplog, capsys)

Test _handleFromRadio with node_info

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_node_info_tbeam1(caplog, capsys):
    """Test _handleFromRadio with node_info"""
    # Note: Captured the '--debug --info' for the bytes below.
    # pylint: disable=C0301
    from_radio_bytes = b'"=\x08\x80\xf8\xc8\xf6\x07\x12"\n\t!7ed23c00\x12\x07TBeam 1\x1a\x02T1"\x06\x94\xb9~\xd2<\x000\x04\x1a\x07 ]MN\x01\xbea%\xad\x01\xbea=\x00\x00,A'
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._startConfig()
        iface._handleFromRadio(from_radio_bytes)
        assert re.search(r"Received nodeinfo", caplog.text, re.MULTILINE)
        assert re.search(r"TBeam 1", caplog.text, re.MULTILINE)
        assert re.search(r"2127707136", caplog.text, re.MULTILINE)
        # validate some of showNodes() output
        iface.showNodes()
        out, err = capsys.readouterr()
        assert re.search(r" 1 ", out, re.MULTILINE)
        assert re.search(r"│ TBeam 1 │ ", out, re.MULTILINE)
        assert re.search(r"│ !7ed23c00 │", out, re.MULTILINE)
        assert err == ""
        iface.close()
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(caplog)

Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(caplog):
    """Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception"""
    # Note: Captured the '--debug --info' for the bytes below.
    from_radio_bytes = b'"\x17\x08\xdc\x8a\x8a\xae\x02\x12\x08"\x06\x00\x00\x00\x00\x00\x00\x1a\x00=\x00\x00\xb8@'
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface._startConfig()
        iface._handleFromRadio(from_radio_bytes)
def test_handlePacketFromRadio_no_from(capsys)

Test _handlePacketFromRadio with no 'from' in the mesh packet.

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handlePacketFromRadio_no_from(capsys):
    """Test _handlePacketFromRadio with no 'from' in the mesh packet."""
    iface = MeshInterface(noProto=True)
    meshPacket = mesh_pb2.MeshPacket()
    iface._handlePacketFromRadio(meshPacket)
    out, err = capsys.readouterr()
    assert re.search(r"Device returned a packet we sent, ignoring", out, re.MULTILINE)
    assert err == ""
def test_handlePacketFromRadio_no_portnum(caplog)

Test _handlePacketFromRadio without a portnum

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handlePacketFromRadio_no_portnum(caplog):
    """Test _handlePacketFromRadio without a portnum"""
    iface = MeshInterface(noProto=True)
    meshPacket = mesh_pb2.MeshPacket()
    meshPacket.decoded.payload = b""
    with caplog.at_level(logging.WARNING):
        iface._handlePacketFromRadio(meshPacket, hack=True)
    assert re.search(r"Not populating fromId", caplog.text, re.MULTILINE)
def test_handlePacketFromRadio_with_a_portnum(caplog)

Test _handlePacketFromRadio with a portnum Since we have an attribute called 'from', we cannot simply 'set' it. Had to implement a hack just to be able to test some code.

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handlePacketFromRadio_with_a_portnum(caplog):
    """Test _handlePacketFromRadio with a portnum
    Since we have an attribute called 'from', we cannot simply 'set' it.
    Had to implement a hack just to be able to test some code.
    """
    iface = MeshInterface(noProto=True)
    meshPacket = mesh_pb2.MeshPacket()
    meshPacket.decoded.payload = b""
    meshPacket.decoded.portnum = 1
    with caplog.at_level(logging.WARNING):
        iface._handlePacketFromRadio(meshPacket, hack=True)
    assert re.search(r"Not populating fromId", caplog.text, re.MULTILINE)
def test_nodeNumToId(iface_with_nodes)

Test _nodeNumToId()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_nodeNumToId(iface_with_nodes):
    """Test _nodeNumToId()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    someid = iface._nodeNumToId(2475227164)
    assert someid == "!9388f81c"
def test_nodeNumToId_not_found(iface_with_nodes)

Test _nodeNumToId()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_nodeNumToId_not_found(iface_with_nodes):
    """Test _nodeNumToId()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    someid = iface._nodeNumToId(123)
    assert someid is None
def test_nodeNumToId_to_all(iface_with_nodes)

Test _nodeNumToId()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_nodeNumToId_to_all(iface_with_nodes):
    """Test _nodeNumToId()"""
    iface = iface_with_nodes
    iface.myInfo.my_node_num = 2475227164
    someid = iface._nodeNumToId(0xFFFFFFFF)
    assert someid == "^all"
def test_sendData_too_long(caplog)

Test when data payload is too big

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendData_too_long(caplog):
    """Test when data payload is too big"""
    iface = MeshInterface(noProto=True)
    some_large_text = b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    some_large_text += b"This is a long text that will be too long for send text."
    with caplog.at_level(logging.DEBUG):
        with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
            iface.sendData(some_large_text)
            assert re.search("Data payload too big", caplog.text, re.MULTILINE)
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
    iface.close()
def test_sendData_unknown_app(capsys)

Test sendData when unknown app

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendData_unknown_app(capsys):
    """Test sendData when unknown app"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        iface.sendData(b"hello", portNum=0)
    out, err = capsys.readouterr()
    assert re.search(r"Warning: A non-zero port number", out, re.MULTILINE)
    assert err == ""
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog)

Test _sendPacket() with BROADCAST_ADDR as a destination

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog):
    """Test _sendPacket() with BROADCAST_ADDR as a destination"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR)
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys)

Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys):
    """Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
    out, err = capsys.readouterr()
    assert re.search(r"Warning: No myInfo", out, re.MULTILINE)
    assert err == ""
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog)

Test _sendPacket() with LOCAL_ADDR as a destination with myInfo

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog):
    """Test _sendPacket() with LOCAL_ADDR as a destination with myInfo"""
    iface = MeshInterface(noProto=True)
    myInfo = MagicMock()
    iface.myInfo = myInfo
    iface.myInfo.my_node_num = 1
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_as_int(caplog)

Test _sendPacket() with int as a destination

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_as_int(caplog):
    """Test _sendPacket() with int as a destination"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId=123)
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, iface_with_nodes)

Test _sendPacket() with '' as a destination with myInfo

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, iface_with_nodes):
    """Test _sendPacket() with '' as a destination with myInfo"""
    iface = iface_with_nodes
    meshPacket = mesh_pb2.MeshPacket()
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        iface._sendPacket(meshPacket, destinationId="")
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.match(r"Warning: NodeId  not found in DB", out, re.MULTILINE)
    assert err == ""
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, iface_with_nodes)

Test _sendPacket() with '' as a destination with myInfo

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, iface_with_nodes):
    """Test _sendPacket() with '' as a destination with myInfo"""
    iface = iface_with_nodes
    iface.nodes = None
    meshPacket = mesh_pb2.MeshPacket()
    with caplog.at_level(logging.WARNING):
        iface._sendPacket(meshPacket, destinationId="")
    assert re.search(r"Warning: There were no self.nodes.", caplog.text, re.MULTILINE)
def test_sendPacket_with_destination_starting_with_a_bang(caplog)

Test _sendPacket() with int as a destination

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_destination_starting_with_a_bang(caplog):
    """Test _sendPacket() with int as a destination"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        meshPacket = mesh_pb2.MeshPacket()
        iface._sendPacket(meshPacket, destinationId="!1234")
        assert re.search(r"Not sending packet", caplog.text, re.MULTILINE)
def test_sendPacket_with_no_destination(capsys)

Test _sendPacket()

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPacket_with_no_destination(capsys):
    """Test _sendPacket()"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        iface._sendPacket(b"", destinationId=None)
    out, err = capsys.readouterr()
    assert re.search(r"Warning: destinationId must not be None", out, re.MULTILINE)
    assert err == ""
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
def test_sendPosition(caplog)

Test sendPosition

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPosition(caplog):
    """Test sendPosition"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface.sendPosition()
    iface.close()
    # assert re.search(r"p.time:", caplog.text, re.MULTILINE)
def test_sendPosition_with_a_position(caplog)

Test sendPosition when lat/long/alt

Expand source code
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_sendPosition_with_a_position(caplog):
    """Test sendPosition when lat/long/alt"""
    iface = MeshInterface(noProto=True)
    with caplog.at_level(logging.DEBUG):
        iface.sendPosition(latitude=40.8, longitude=-111.86, altitude=201)
        assert re.search(r"p.latitude_i:408", caplog.text, re.MULTILINE)
        assert re.search(r"p.longitude_i:-11186", caplog.text, re.MULTILINE)
        assert re.search(r"p.altitude:201", caplog.text, re.MULTILINE)
def test_showNodes_exclude_self(capsys, caplog, iface_with_nodes)

Test that we hit that continue statement

Expand source code
@pytest.mark.unit
def test_showNodes_exclude_self(capsys, caplog, iface_with_nodes):
    """Test that we hit that continue statement"""
    with caplog.at_level(logging.DEBUG):
        iface = iface_with_nodes
        iface.localNode.nodeNum = 2475227164
        iface.showNodes()
        iface.showNodes(includeSelf=False)
        capsys.readouterr()
def test_timeago()

Test that the _timeago function returns sane values

Expand source code
@pytest.mark.unit
def test_timeago():
    """Test that the _timeago function returns sane values"""
    assert _timeago(0) == "now"
    assert _timeago(1) == "1 sec ago"
    assert _timeago(15) == "15 secs ago"
    assert _timeago(333) == "5 mins ago"
    assert _timeago(99999) == "1 day ago"
    assert _timeago(9999999) == "3 months ago"
    assert _timeago(-999) == "now"
def test_timeago_fuzz() ‑> None

Fuzz _timeago to ensure it works with any integer

Expand source code
@given(seconds=st.integers())
def test_timeago_fuzz(seconds):
    """Fuzz _timeago to ensure it works with any integer"""
    val = _timeago(seconds)
    assert re.match(r"(now|\d+ (secs?|mins?|hours?|days?|months?|years?))", val)
def test_waitConnected_isConnected_timeout(capsys)

Test waitConnected()

Expand source code
@pytest.mark.unit
def test_waitConnected_isConnected_timeout(capsys):
    """Test waitConnected()"""
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface = MeshInterface()
        iface._waitConnected(0.01)
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
        out, err = capsys.readouterr()
        assert re.search(r"warn about something", err, re.MULTILINE)
        assert out == ""
def test_waitConnected_raises_an_exception(capsys)

Test waitConnected()

Expand source code
@pytest.mark.unit
def test_waitConnected_raises_an_exception(capsys):
    """Test waitConnected()"""
    iface = MeshInterface(noProto=True)
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface.failure = MeshInterface.MeshInterfaceError("warn about something")
        iface._waitConnected(0.01)
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
        out, err = capsys.readouterr()
        assert re.search(r"warn about something", err, re.MULTILINE)
        assert out == ""
def test_waitForConfig(capsys)

Test waitForConfig()

Expand source code
@pytest.mark.unitslow
def test_waitForConfig(capsys):
    """Test waitForConfig()"""
    iface = MeshInterface(noProto=True)
    # override how long to wait
    iface._timeout = Timeout(0.01)
    with pytest.raises(MeshInterface.MeshInterfaceError) as pytest_wrapped_e:
        iface.waitForConfig()
        assert pytest_wrapped_e.type == MeshInterface.MeshInterfaceError
        out, err = capsys.readouterr()
        assert re.search(
            r"Exception: Timed out waiting for interface config", err, re.MULTILINE
        )
        assert out == ""