Module meshtastic.tests.test_node

Meshtastic unit tests for node.py

Expand source code
"""Meshtastic unit tests for node.py"""

import logging
import re
from unittest.mock import MagicMock, patch

import pytest

from ..protobuf import admin_pb2, localonly_pb2, config_pb2
from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611
from ..node import Node
from ..serial_interface import SerialInterface
from ..mesh_interface import MeshInterface

# from ..config_pb2 import Config
# from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
#                                  CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4,
#                                  CannedMessagePluginMessagePart5)
# from ..util import Timeout


@pytest.mark.unit
def test_node(capsys):
    """Build a Node on a mocked interface and verify the sections printed by showInfo()."""
    mocked_iface = MagicMock(autospec=SerialInterface)
    with patch(
        "meshtastic.serial_interface.SerialInterface", return_value=mocked_iface
    ) as patched:
        patched.localNode.getChannelByName.return_value = None
        patched.myInfo.max_channels = 8
        anode = Node(patched, "bar", noProto=True)

        # Attach a LocalConfig carrying a default LoRa section and an empty
        # LocalModuleConfig before asking the node to print its info.
        local_config = localonly_pb2.LocalConfig()
        anode.localConfig = local_config
        local_config.lora.CopyFrom(config_pb2.Config.LoRaConfig())
        anode.moduleConfig = localonly_pb2.LocalModuleConfig()

        anode.showInfo()
        out, err = capsys.readouterr()

        # Each of these headings has to show up somewhere on stdout.
        expected_headings = (
            r'Preferences',
            r'Module preferences',
            r'Channels',
            r'Primary channel URL',
        )
        for heading in expected_headings:
            assert re.search(heading, out)

        # Nothing about a remote node may be printed, and stderr stays empty.
        assert not re.search(r'remote node', out)
        assert err == ''

# TODO
# @pytest.mark.unit
# def test_node_requestConfig(capsys):
#    """Test run requestConfig"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            anode = Node(mo, 'bar')
#            anode.requestConfig()
#    out, err = capsys.readouterr()
#    assert re.search(r'Requesting preferences from remote node', out, re.MULTILINE)
#    assert err == ''


# @pytest.mark.unit
# def test_node_get_canned_message_with_all_parts(capsys):
#    """Test run get_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            # we have a sleep in this method, so override it so it goes fast
#            with patch('time.sleep'):
#                anode = Node(mo, 'bar')
#                anode.cannedPluginMessagePart1 = 'a'
#                anode.cannedPluginMessagePart2 = 'b'
#                anode.cannedPluginMessagePart3 = 'c'
#                anode.cannedPluginMessagePart4 = 'd'
#                anode.cannedPluginMessagePart5 = 'e'
#                anode.get_canned_message()
#    out, err = capsys.readouterr()
#    assert re.search(r'canned_plugin_message:abcde', out, re.MULTILINE)
#    assert err == ''
#
#
# @pytest.mark.unit
# def test_node_get_canned_message_with_some_parts(capsys):
#    """Test run get_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            # we have a sleep in this method, so override it so it goes fast
#            with patch('time.sleep'):
#                anode = Node(mo, 'bar')
#                anode.cannedPluginMessagePart1 = 'a'
#                anode.get_canned_message()
#    out, err = capsys.readouterr()
#    assert re.search(r'canned_plugin_message:a', out, re.MULTILINE)
#    assert err == ''
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_one_part(caplog):
#    """Test run set_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                anode.set_canned_message('foo')
#    assert re.search(r"Setting canned message 'foo' part 1", caplog.text, re.MULTILINE)
#    assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_200(caplog):
#    """Test run set_canned_message() 200 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_200_chars_long = 'a' * 200
#                anode.set_canned_message(message_200_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_201(caplog):
#    """Test run set_canned_message() 201 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_201_chars_long = 'a' * 201
#                anode.set_canned_message(message_201_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert re.search(r"Setting canned message 'a' part 2", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_1000(caplog):
#    """Test run set_canned_message() 1000 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_1000_chars_long = 'a' * 1000
#                anode.set_canned_message(message_1000_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert re.search(r" part 2", caplog.text, re.MULTILINE)
#    assert re.search(r" part 3", caplog.text, re.MULTILINE)
#    assert re.search(r" part 4", caplog.text, re.MULTILINE)
#    assert re.search(r" part 5", caplog.text, re.MULTILINE)
#
#
# @pytest.mark.unit
# def test_node_set_canned_message_1001(capsys):
#    """Test run set_canned_message() 1001 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    with pytest.raises(SystemExit) as pytest_wrapped_e:
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            anode = Node(mo, 'bar')
#            message_1001_chars_long = 'a' * 1001
#            anode.set_canned_message(message_1001_chars_long)
#    assert pytest_wrapped_e.type == SystemExit
#    assert pytest_wrapped_e.value.code == 1
#    out, err = capsys.readouterr()
#    assert re.search(r'Warning: The canned message', out, re.MULTILINE)
#    assert err == ''


# TODO
# @pytest.mark.unit
# def test_setOwnerShort(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name=None, short_name='123')
#    assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_no_short_name(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='Test123')
#    assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:Tst:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_no_short_name_and_long_name_is_short(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='Tnt')
#    assert re.search(r'p.set_owner.long_name:Tnt:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:Tnt:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_no_short_name_and_long_name_has_words(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='A B C', is_licensed=True)
#    assert re.search(r'p.set_owner.long_name:A B C:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:ABC:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.is_licensed:True', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_setOwner_long_name_no_short(caplog):
#    """Test setOwner"""
#    anode = Node('foo', 'bar', noProto=True)
#    with caplog.at_level(logging.DEBUG):
#        anode.setOwner(long_name ='Aabo', is_licensed=True)
#    assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
#    assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_exitSimulator(caplog):
    """exitSimulator() must produce a log record containing 'in exitSimulator'."""
    mesh = MeshInterface()
    mesh.nodesByNum = {}
    node = Node(mesh, "!ba400000", noProto=True)

    # Capture everything from DEBUG upwards while the call runs.
    with caplog.at_level(logging.DEBUG):
        node.exitSimulator()

    captured_log = caplog.text
    assert re.search(r"in exitSimulator", captured_log, re.MULTILINE)


@pytest.mark.unit
def test_reboot(caplog):
    """reboot() must produce a log record containing 'Telling node to reboot'."""
    mesh = MeshInterface()
    mesh.nodesByNum = {}
    node = Node(mesh, 1234567890, noProto=True)

    # Capture everything from DEBUG upwards while the call runs.
    with caplog.at_level(logging.DEBUG):
        node.reboot()

    captured_log = caplog.text
    assert re.search(r"Telling node to reboot", captured_log, re.MULTILINE)


@pytest.mark.unit
def test_shutdown(caplog):
    """shutdown() must produce a log record containing 'Telling node to shutdown'."""
    mesh = MeshInterface()
    mesh.nodesByNum = {}
    node = Node(mesh, 1234567890, noProto=True)

    # Capture everything from DEBUG upwards while the call runs.
    with caplog.at_level(logging.DEBUG):
        node.shutdown()

    captured_log = caplog.text
    assert re.search(r"Telling node to shutdown", captured_log, re.MULTILINE)


@pytest.mark.unit
def test_setURL_empty_url(capsys):
    """setURL('') on a bare Node exits with code 1 and warns that nothing is loaded.

    The node is built with placeholder arguments and never gets a config or
    channels assigned, so the call is expected to bail out via SystemExit
    after printing the 'config or channels not loaded' warning on stdout.
    """
    anode = Node("foo", "bar", noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL("")
    # Exception classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""


# TODO
# @pytest.mark.unit
# def test_setURL_valid_URL(caplog):
#    """Test setURL"""
#    iface = MagicMock(autospec=SerialInterface)
#    url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
#    with caplog.at_level(logging.DEBUG):
#        anode = Node(iface, 'bar', noProto=True)
#        anode.radioConfig = 'baz'
#        channels = ['zoo']
#        anode.channels = channels
#        anode.setURL(url)
#    assert re.search(r'Channel i:0', caplog.text, re.MULTILINE)
#    assert re.search(r'modem_config: MidSlow', caplog.text, re.MULTILINE)
#    assert re.search(r'psk: "\\001"', caplog.text, re.MULTILINE)
#    assert re.search(r'role: PRIMARY', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(capsys):
    """setURL() with a URL that has nothing after '#' exits with code 1.

    The node sits on a mocked interface and only has a dummy ``radioConfig``
    attribute set; the call is expected to print the
    'config or channels not loaded' warning on stdout and raise SystemExit.
    """
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"

    # Keep the setup outside pytest.raises so that only setURL() itself can
    # satisfy the SystemExit expectation.
    anode = Node(iface, "bar", noProto=True)
    anode.radioConfig = "baz"

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL(url)
    # Exception classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""


# TODO
# @pytest.mark.unit
# def test_showChannels(capsys):
#    """Test showChannels"""
#    anode = Node('foo', 'bar')
#
#    # primary channel
#    # role: 0=Disabled, 1=Primary, 2=Secondary
#    # modem_config: 0-5
#    # role: 0=Disabled, 1=Primary, 2=Secondary
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testing'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    anode.showChannels()
#    out, err = capsys.readouterr()
#    assert re.search(r'Channels:', out, re.MULTILINE)
#    # primary channel
#    assert re.search(r'Primary channel URL', out, re.MULTILINE)
#    assert re.search(r'PRIMARY psk=default ', out, re.MULTILINE)
#    assert re.search(r'"modemConfig": "MidSlow"', out, re.MULTILINE)
#    assert re.search(r'"psk": "AQ=="', out, re.MULTILINE)
#    # secondary channel
#    assert re.search(r'SECONDARY psk=secret ', out, re.MULTILINE)
#    assert re.search(r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="', out, re.MULTILINE)
#    assert err == ''


@pytest.mark.unit
def test_getChannelByChannelIndex():
    """In-range lookups return a channel; out-of-range lookups return None."""
    anode = Node("foo", "bar")

    # One role per channel, in list order: a primary (1), a secondary (2) and
    # six disabled (0) channels. The Channel ``index`` fields run from 1 to 8.
    roles = (1, 2, 0, 0, 0, 0, 0, 0)
    anode.channels = [
        Channel(index=channel_index, role=role)
        for channel_index, role in enumerate(roles, start=1)
    ]

    # 0 -> primary, 1 -> secondary, 2 -> first disabled channel
    for valid_index in (0, 1, 2):
        assert anode.getChannelByChannelIndex(valid_index) is not None

    # invalid values
    for invalid_index in (-1, 9):
        assert anode.getChannelByChannelIndex(invalid_index) is None


# TODO
# @pytest.mark.unit
# def test_deleteChannel_try_to_delete_primary_channel(capsys):
#    """Try to delete primary channel."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    # no secondary channels
#    channel2 = Channel(index=2, role=0)
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    with pytest.raises(SystemExit) as pytest_wrapped_e:
#        anode.deleteChannel(0)
#    assert pytest_wrapped_e.type == SystemExit
#    assert pytest_wrapped_e.value.code == 1
#    out, err = capsys.readouterr()
#    assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE)
#    assert err == ''


# TODO
# @pytest.mark.unit
# def test_deleteChannel_secondary():
#    """Try to delete a secondary channel."""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testing'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        anode.channels = channels
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'testing'
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#        anode.deleteChannel(1)
#
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == ''
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''


# TODO
# @pytest.mark.unit
# def test_deleteChannel_secondary_with_admin_channel_after_testing():
#    """Try to delete a secondary channel where there is an admin channel."""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testing'
#
#    channel3 = Channel(index=3, role=2)
#    channel3.settings.name = 'admin'
#
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        assert mo.localNode == anode
#
#        anode.channels = channels
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'testing'
#        assert channels[2].settings.name == 'admin'
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#        anode.deleteChannel(1)
#
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'admin'
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''


# TODO
# @pytest.mark.unit
# def test_deleteChannel_secondary_with_admin_channel_before_testing():
#    """Try to delete a secondary channel where there is an admin channel."""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=2)
#    channel3.settings.name = 'testing'
#
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        anode.channels = channels
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'admin'
#        assert channels[2].settings.name == 'testing'
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#        anode.deleteChannel(2)
#
#        assert len(anode.channels) == 8
#        assert channels[0].settings.modem_config == 3
#        assert channels[1].settings.name == 'admin'
#        assert channels[2].settings.name == ''
#        assert channels[3].settings.name == ''
#        assert channels[4].settings.name == ''
#        assert channels[5].settings.name == ''
#        assert channels[6].settings.name == ''
#        assert channels[7].settings.name == ''
#
#
# @pytest.mark.unit
# def test_getChannelByName():
#    """Get a channel by the name."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getChannelByName('admin')
#    assert ch.index == 2


# TODO
# @pytest.mark.unit
# def test_getChannelByName_invalid_name():
#    """Get a channel by the name but one that is not present."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getChannelByName('testing')
#    assert ch is None
#
#
# @pytest.mark.unit
# def test_getDisabledChannel():
#    """Get the first disabled channel."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'testingA'
#
#    channel3 = Channel(index=3, role=2)
#    channel3.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel3.settings.name = 'testingB'
#
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getDisabledChannel()
#    assert ch.index == 4


# TODO
# @pytest.mark.unit
# def test_getDisabledChannel_where_all_channels_are_used():
#    """Get the first disabled channel."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel3 = Channel(index=3, role=2)
#    channel4 = Channel(index=4, role=2)
#    channel5 = Channel(index=5, role=2)
#    channel6 = Channel(index=6, role=2)
#    channel7 = Channel(index=7, role=2)
#    channel8 = Channel(index=8, role=2)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    ch = anode.getDisabledChannel()
#    assert ch is None


# TODO
# @pytest.mark.unit
# def test_getAdminChannelIndex():
#    """Get the 'admin' channel index."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=2)
#    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
#    channel2.settings.name = 'admin'
#
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    i = anode._getAdminChannelIndex()
#    assert i == 2


# TODO
# @pytest.mark.unit
# def test_getAdminChannelIndex_when_no_admin_named_channel():
#    """Get the 'admin' channel when there is not one."""
#    anode = Node('foo', 'bar')
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    channel2 = Channel(index=2, role=0)
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    i = anode._getAdminChannelIndex()
#    assert i == 0


# TODO
# TODO: should we check if we need to turn it off?
# @pytest.mark.unit
# def test_turnOffEncryptionOnPrimaryChannel(capsys):
#    """Turn off encryption when there is a psk."""
#    anode = Node('foo', 'bar', noProto=True)
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b "
#    channel1.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++'
#
#    channel2 = Channel(index=2, role=0)
#    channel3 = Channel(index=3, role=0)
#    channel4 = Channel(index=4, role=0)
#    channel5 = Channel(index=5, role=0)
#    channel6 = Channel(index=6, role=0)
#    channel7 = Channel(index=7, role=0)
#    channel8 = Channel(index=8, role=0)
#
#    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
#
#    anode.channels = channels
#    anode.turnOffEncryptionOnPrimaryChannel()
#    out, err = capsys.readouterr()
#    assert re.search(r'Writing modified channels to device', out)
#    assert err == ''


@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """writeConfig('foo') on a bare Node exits with code 1 and reports the bad name.

    The expected stdout message is 'Error: No valid config with name foo';
    nothing may be written to stderr.
    """
    anode = Node("foo", "bar", noProto=True)

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.writeConfig('foo')
    # Exception classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Error: No valid config with name foo", out)
    assert err == ""


# TODO
# @pytest.mark.unit
# def test_writeConfig(caplog):
#    """Test writeConfig"""
#    anode = Node('foo', 'bar', noProto=True)
#    radioConfig = RadioConfig()
#    anode.radioConfig = radioConfig
#
#    with caplog.at_level(logging.DEBUG):
#        anode.writeConfig()
#    assert re.search(r'Wrote config', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_requestChannel_not_localNode(caplog, capsys):
    """_requestChannel(0) on a node that is not the interface's localNode.

    The request must be logged as going to the remote node and must also be
    announced on stdout, with nothing written to stderr.
    """
    mocked_iface = MagicMock(autospec=SerialInterface)
    with patch(
        "meshtastic.serial_interface.SerialInterface", return_value=mocked_iface
    ) as patched:
        patched.localNode.getChannelByName.return_value = None
        patched.myInfo.max_channels = 8
        anode = Node(patched, "bar", noProto=True)

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            logged = caplog.text
            assert re.search(
                r"Requesting channel 0 info from remote node", logged, re.MULTILINE
            )

        out, err = capsys.readouterr()
        assert re.search(r"Requesting channel 0 info", out, re.MULTILINE)
        assert err == ""


@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """_requestChannel(0) on the interface's own localNode.

    The request must be logged, but without the 'from remote node' wording.
    """
    mocked_iface = MagicMock(autospec=SerialInterface)
    with patch(
        "meshtastic.serial_interface.SerialInterface", return_value=mocked_iface
    ) as patched:
        patched.localNode.getChannelByName.return_value = None
        patched.myInfo.max_channels = 8
        anode = Node(patched, "bar", noProto=True)

        # Point the interface's localNode at this very node; until then
        # `localNode` is only an auto-created mock attribute, not `anode`.
        patched.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            logged = caplog.text
            assert re.search(r"Requesting channel 0", logged, re.MULTILINE)
            assert not re.search(r"from remote node", logged, re.MULTILINE)

@pytest.mark.unit
def test_requestChannels_non_localNode(caplog):
    """Test requestChannels() with a starting index of 0.

    Starting from index 0 must log the request for channel 0 to the remote
    node and leave ``partialChannels`` as an empty list, discarding the
    entry that was seeded beforehand.
    """
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        # Seed a stale entry so the reset is observable.
        anode.partialChannels = ['0']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(0)
            # Raw string rather than a placeholder-less f-string for the pattern.
            assert re.search(
                r"Requesting channel 0 info from remote node", caplog.text, re.MULTILINE
            )
            assert anode.partialChannels == []

@pytest.mark.unit
def test_requestChannels_non_localNode_starting_index(caplog):
    """Test requestChannels() with a starting index of non-0.

    ``partialChannels`` is pre-seeded with one entry. Requesting from index 3
    must log the request for channel 3 as going to a remote node and must
    leave the pre-seeded list untouched.
    """
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        anode.partialChannels = ['1']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(3)
            # Raw-string regex: the pattern is a constant with nothing to interpolate.
            assert re.search(
                r"Requesting channel 3 info from remote node", caplog.text, re.MULTILINE
            )
            # make sure it hasn't been re-initialized
            assert anode.partialChannels == ['1']

# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart1(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart1()"""
#
#    part1 = CannedMessagePluginMessagePart1()
#    part1.text = 'foo1'
#
#    msg1 = MagicMock(autospec=AdminMessage)
#    msg1.get_canned_message_plugin_part1_response = part1
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart1Response': {'text': 'foo1'},
#                    'raw': msg1
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart1 == 'foo1'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart2(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart2()"""
#
#    part2 = CannedMessagePluginMessagePart2()
#    part2.text = 'foo2'
#
#    msg2 = MagicMock(autospec=AdminMessage)
#    msg2.get_canned_message_plugin_part2_response = part2
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart2Response': {'text': 'foo2'},
#                    'raw': msg2
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart2 == 'foo2'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart3(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart3()"""
#
#    part3 = CannedMessagePluginMessagePart3()
#    part3.text = 'foo3'
#
#    msg3 = MagicMock(autospec=AdminMessage)
#    msg3.get_canned_message_plugin_part3_response = part3
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart3Response': {'text': 'foo3'},
#                    'raw': msg3
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart3 == 'foo3'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart4(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart4()"""
#
#    part4 = CannedMessagePluginMessagePart4()
#    part4.text = 'foo4'
#
#    msg4 = MagicMock(autospec=AdminMessage)
#    msg4.get_canned_message_plugin_part4_response = part4
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart4Response': {'text': 'foo4'},
#                    'raw': msg4
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart4 == 'foo4'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart5(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart5()"""
#
#    part5 = CannedMessagePluginMessagePart5()
#    part5.text = 'foo5'
#
#    msg5 = MagicMock(autospec=AdminMessage)
#    msg5.get_canned_message_plugin_part5_response = part5
#
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart5Response': {'text': 'foo5'},
#                    'raw': msg5
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart5 == 'foo5'


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart1_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart1() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart2_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart2() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart3_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart3() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''
#
#
# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart4_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart4() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''
#
#
# @pytest.mark.unit
# def test_onResponseRequestCannedMessagePluginMesagePart5_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart5() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


# TODO
# @pytest.mark.unit
# def test_onResponseRequestChannel(caplog):
#    """Test onResponseRequestChannel()"""
#
#    channel1 = Channel(index=1, role=1)
#    channel1.settings.modem_config = 3
#    channel1.settings.psk = b'\x01'
#
#    msg1 = MagicMock(autospec=AdminMessage)
#    msg1.get_channel_response = channel1
#
#    msg2 = MagicMock(autospec=AdminMessage)
#    channel2 = Channel(index=2, role=0) # disabled
#    msg2.get_channel_response = channel2
#
#    # default primary channel
#    packet1 = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
#            'requestId': 2615094405,
#            'admin': {
#                'getChannelResponse': {
#                    'settings': {
#                        'modemConfig': 'Bw125Cr48Sf4096',
#                        'psk': 'AQ=='
#                    },
#                    'role': 'PRIMARY'
#                },
#                'raw': msg1,
#            }
#        },
#        'id': 1692918436,
#        'hopLimit': 3,
#        'priority':
#        'RELIABLE',
#        'raw': 'fake',
#        'fromId': '!9388f81c',
#        'toId': '!9388f81c'
#        }
#
#    # no other channels
#    packet2 = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b':\x04\x08\x02\x12\x00',
#            'requestId': 743049663,
#            'admin': {
#                'getChannelResponse': {
#                    'index': 2,
#                    'settings': {}
#                },
#                'raw': msg2,
#            }
#        },
#        'id': 1692918456,
#        'rxTime': 1640202239,
#        'hopLimit': 3,
#        'priority': 'RELIABLE',
#        'raw': 'faked',
#        'fromId': '!9388f81c',
#        'toId': '!9388f81c'
#    }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        radioConfig = RadioConfig()
#        anode.radioConfig = radioConfig
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.requestConfig()
#            anode.onResponseRequestChannel(packet1)
#            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
#            anode.onResponseRequestChannel(packet2)
#            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
#            assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
#            assert len(anode.channels) == 8
#            assert anode.channels[0].settings.modem_config == 3
#            assert anode.channels[1].settings.name == ''
#            assert anode.channels[2].settings.name == ''
#            assert anode.channels[3].settings.name == ''
#            assert anode.channels[4].settings.name == ''
#            assert anode.channels[5].settings.name == ''
#            assert anode.channels[6].settings.name == ''
#            assert anode.channels[7].settings.name == ''


# TODO
# @pytest.mark.unit
# def test_onResponseRequestSetting(caplog):
#    """Test onResponseRequestSetting()"""
#    # Note: Split out the get_radio_response to a MagicMock
#    # so it could be "returned" (not really sure how to do that
#    # in a python dict).
#    amsg = MagicMock(autospec=AdminMessage)
#    amsg.get_radio_response = """{
#  preferences {
#    phone_timeout_secs: 900
#    ls_secs: 300
#    position_broadcast_smart: true
#    position_flags: 35
#  }
# }"""
#    packet = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
#            'requestId': 3145147848,
#            'admin': {
#                'getRadioResponse': {
#                    'preferences': {
#                        'phoneTimeoutSecs': 900,
#                        'lsSecs': 300,
#                        'positionBroadcastSmart': True,
#                        'positionFlags': 35
#                     }
#                },
#                'raw': amsg
#            },
#            'id': 365963704,
#            'rxTime': 1640195197,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!9388f81c',
#            'toId': '!9388f81c'
#        }
#    }
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        radioConfig = RadioConfig()
#        anode.radioConfig = radioConfig
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestSettings(packet)
#            assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)


# TODO
# @pytest.mark.unit
# def test_onResponseRequestSetting_with_error(capsys):
#    """Test onResponseRequestSetting() with an error"""
#    packet = {
#        'from': 2475227164,
#        'to': 2475227164,
#        'decoded': {
#            'portnum': 'ADMIN_APP',
#            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
#            'requestId': 3145147848,
#            'routing': {
#                'errorReason': 'some made up error',
#            },
#            'admin': {
#                'getRadioResponse': {
#                    'preferences': {
#                        'phoneTimeoutSecs': 900,
#                        'lsSecs': 300,
#                        'positionBroadcastSmart': True,
#                        'positionFlags': 35
#                     }
#                },
#            },
#            'id': 365963704,
#            'rxTime': 1640195197,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'fromId': '!9388f81c',
#            'toId': '!9388f81c'
#        }
#    }
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        mo.localNode.getChannelByName.return_value = None
#        mo.myInfo.max_channels = 8
#        anode = Node(mo, 'bar', noProto=True)
#
#        radioConfig = RadioConfig()
#        anode.radioConfig = radioConfig
#
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        anode.onResponseRequestSettings(packet)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_set_favorite(favorite):
    """setFavorite() accepts a "!"-prefixed hex id string or the equivalent integer.

    Either parametrized form must end up as 502009325 in the admin message's
    ``set_favorite_node`` field, with ``sendData`` called exactly once.
    """
    fake_iface = MagicMock(autospec=SerialInterface)
    under_test = Node(fake_iface, 12345678)
    admin_msg = admin_pb2.AdminMessage()

    # Have AdminMessage() return our instance so its fields can be inspected.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=admin_msg):
        under_test.setFavorite(favorite)

    assert admin_msg.set_favorite_node == 502009325
    fake_iface.sendData.assert_called_once()


@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_remove_favorite(favorite):
    """Test removeFavorite.

    Either parametrized form of the node id (a "!"-prefixed hex string or the
    equivalent integer) must end up as 502009325 in the admin message's
    ``remove_favorite_node`` field, with ``sendData`` called exactly once.
    """
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    # Have AdminMessage() return our instance so its fields can be inspected.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeFavorite(favorite)

    assert amesg.remove_favorite_node == 502009325
    iface.sendData.assert_called_once()


@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_set_ignored(ignored):
    """Test setIgnored.

    Either parametrized form of the node id (a "!"-prefixed hex string or the
    equivalent integer) must end up as 502009325 in the admin message's
    ``set_ignored_node`` field, with ``sendData`` called exactly once.
    """
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    # Have AdminMessage() return our instance so its fields can be inspected.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.setIgnored(ignored)
    assert amesg.set_ignored_node == 502009325
    iface.sendData.assert_called_once()


@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_remove_ignored(ignored):
    """Test removeIgnored.

    Either parametrized form of the node id (a "!"-prefixed hex string or the
    equivalent integer) must end up as 502009325 in the admin message's
    ``remove_ignored_node`` field, with ``sendData`` called exactly once.
    """
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    # Have AdminMessage() return our instance so its fields can be inspected.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeIgnored(ignored)

    assert amesg.remove_ignored_node == 502009325
    iface.sendData.assert_called_once()


# TODO
# @pytest.mark.unitslow
# def test_waitForConfig():
#    """Test waitForConfig()"""
#    anode = Node('foo', 'bar')
#    radioConfig = RadioConfig()
#    anode.radioConfig = radioConfig
#    anode._timeout = Timeout(0.01)
#    result = anode.waitForConfig()
#    assert not result

Functions

def test_exitSimulator(caplog)

Test exitSimulator

Expand source code
@pytest.mark.unit
def test_exitSimulator(caplog):
    """exitSimulator() should emit its "in exitSimulator" debug log line."""
    mesh_iface = MeshInterface()
    mesh_iface.nodesByNum = {}
    sim_node = Node(mesh_iface, "!ba400000", noProto=True)

    with caplog.at_level(logging.DEBUG):
        sim_node.exitSimulator()

    found = re.search(r"in exitSimulator", caplog.text, re.MULTILINE)
    assert found
def test_getChannelByChannelIndex()

Test getChannelByChannelIndex()

Expand source code
@pytest.mark.unit
def test_getChannelByChannelIndex():
    """getChannelByChannelIndex() returns a channel for valid indices, else None.

    The node is given eight channels: one with role 1 (primary), one with
    role 2 (secondary) and six with role 0.  Indices 0-2 must resolve to a
    channel; -1 and 9 must not.
    """
    node = Node("foo", "bar")

    # Role per channel, in list order; Channel.index values run from 1 to 8.
    roles = (1, 2, 0, 0, 0, 0, 0, 0)
    node.channels = [
        Channel(index=position, role=role)
        for position, role in enumerate(roles, start=1)
    ]

    # In-range lookups: primary, secondary and a disabled channel.
    for valid_index in (0, 1, 2):
        assert node.getChannelByChannelIndex(valid_index) is not None

    # Out-of-range lookups.
    for invalid_index in (-1, 9):
        assert node.getChannelByChannelIndex(invalid_index) is None
def test_node(capsys)

Test that we can instantiate a Node

Expand source code
@pytest.mark.unit
def test_node(capsys):
    """A Node can be built on a mocked interface and showInfo() prints its sections.

    stdout must contain the preferences, module preferences, channels and
    primary channel URL headings, must not mention "remote node", and stderr
    must stay empty.
    """
    fake_iface = MagicMock(autospec=SerialInterface)
    target = "meshtastic.serial_interface.SerialInterface"
    with patch(target, return_value=fake_iface) as mocked:
        mocked.myInfo.max_channels = 8
        mocked.localNode.getChannelByName.return_value = None
        node = Node(mocked, "bar", noProto=True)

        # Give the node empty local and module configs with a default LoRa section.
        local_config = localonly_pb2.LocalConfig()
        node.localConfig = local_config
        local_config.lora.CopyFrom(config_pb2.Config.LoRaConfig())
        node.moduleConfig = localonly_pb2.LocalModuleConfig()

        node.showInfo()
        captured = capsys.readouterr()

        for heading in (
            r'Preferences',
            r'Module preferences',
            r'Channels',
            r'Primary channel URL',
        ):
            assert re.search(heading, captured.out)
        assert re.search(r'remote node', captured.out) is None
        assert captured.err == ''
def test_reboot(caplog)

Test reboot

Expand source code
@pytest.mark.unit
def test_reboot(caplog):
    """reboot() should emit its "Telling node to reboot" debug log line."""
    mesh_iface = MeshInterface()
    mesh_iface.nodesByNum = {}
    target_node = Node(mesh_iface, 1234567890, noProto=True)

    with caplog.at_level(logging.DEBUG):
        target_node.reboot()

    found = re.search(r"Telling node to reboot", caplog.text, re.MULTILINE)
    assert found
def test_remove_favorite(favorite)

Test removeFavorite

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_remove_favorite(favorite):
    """Test removeFavorite.

    Either parametrized form of the node id (a "!"-prefixed hex string or the
    equivalent integer) must end up as 502009325 in the admin message's
    ``remove_favorite_node`` field, with ``sendData`` called exactly once.
    """
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    # Have AdminMessage() return our instance so its fields can be inspected.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeFavorite(favorite)

    assert amesg.remove_favorite_node == 502009325
    iface.sendData.assert_called_once()
def test_remove_ignored(ignored)

Test removeIgnored

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_remove_ignored(ignored):
    """Test removeIgnored.

    Either parametrized form of the node id (a "!"-prefixed hex string or the
    equivalent integer) must end up as 502009325 in the admin message's
    ``remove_ignored_node`` field, with ``sendData`` called exactly once.
    """
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    # Have AdminMessage() return our instance so its fields can be inspected.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.removeIgnored(ignored)

    assert amesg.remove_ignored_node == 502009325
    iface.sendData.assert_called_once()
def test_requestChannel_localNode(caplog)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """_requestChannel() on the interface's own localNode.

    Once the node is installed as ``localNode`` on the mocked interface, the
    debug log must mention the channel 0 request without the
    "from remote node" wording.
    """
    fake_iface = MagicMock(autospec=SerialInterface)
    target = "meshtastic.serial_interface.SerialInterface"
    with patch(target, return_value=fake_iface) as mocked:
        mocked.myInfo.max_channels = 8
        mocked.localNode.getChannelByName.return_value = None
        local = Node(mocked, "bar", noProto=True)

        # Pin localNode to the node under test; otherwise the mock would hand
        # back an auto-created MagicMock attribute instead of this node.
        mocked.localNode = local

        with caplog.at_level(logging.DEBUG):
            local._requestChannel(0)
            log_text = caplog.text
            assert re.search(r"Requesting channel 0", log_text, re.MULTILINE)
            assert re.search(r"from remote node", log_text, re.MULTILINE) is None
def test_requestChannel_not_localNode(caplog, capsys)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_not_localNode(caplog, capsys):
    """_requestChannel() on a node that is not the interface's localNode.

    The node under test is never assigned to ``localNode`` on the mocked
    interface, so the request for channel 0 is expected to be reported as
    going to a remote node in the debug log, and to be announced on stdout.
    """
    fake_iface = MagicMock(autospec=SerialInterface)
    target = "meshtastic.serial_interface.SerialInterface"
    with patch(target, return_value=fake_iface) as mocked:
        mocked.myInfo.max_channels = 8
        mocked.localNode.getChannelByName.return_value = None
        remote = Node(mocked, "bar", noProto=True)

        with caplog.at_level(logging.DEBUG):
            remote._requestChannel(0)
            expected_log = r"Requesting channel 0 info from remote node"
            assert re.search(expected_log, caplog.text, re.MULTILINE)

        captured = capsys.readouterr()
        assert re.search(r"Requesting channel 0 info", captured.out, re.MULTILINE)
        assert captured.err == ""
def test_requestChannels_non_localNode(caplog)

Test requestChannels() with a starting index of 0

Expand source code
@pytest.mark.unit
def test_requestChannels_non_localNode(caplog):
    """Test requestChannels() with a starting index of 0.

    ``partialChannels`` is pre-seeded with one entry. After requesting from
    index 0 the list must be empty again, and the request for channel 0 must
    be logged as going to a remote node.
    """
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        anode.partialChannels = ['0']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(0)
            # Raw-string regex: the pattern is a constant with nothing to interpolate.
            assert re.search(
                r"Requesting channel 0 info from remote node", caplog.text, re.MULTILINE
            )
            assert anode.partialChannels == []
def test_requestChannels_non_localNode_starting_index(caplog)

Test requestChannels() with a starting index of non-0

Expand source code
@pytest.mark.unit
def test_requestChannels_non_localNode_starting_index(caplog):
    """Test requestChannels() with a starting index of non-0.

    ``partialChannels`` is pre-seeded with one entry. Requesting from index 3
    must log the request for channel 3 as going to a remote node and must
    leave the pre-seeded list untouched.
    """
    iface = MagicMock(autospec=SerialInterface)
    with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, "bar", noProto=True)
        anode.partialChannels = ['1']
        with caplog.at_level(logging.DEBUG):
            anode.requestChannels(3)
            # Raw-string regex: the pattern is a constant with nothing to interpolate.
            assert re.search(
                r"Requesting channel 3 info from remote node", caplog.text, re.MULTILINE
            )
            # make sure it hasn't been re-initialized
            assert anode.partialChannels == ['1']
def test_setURL_empty_url(capsys)

Test setURL with an empty URL

Expand source code
@pytest.mark.unit
def test_setURL_empty_url(capsys):
    """Test setURL() with an empty URL.

    The call must raise SystemExit with code 1 and print the
    "config or channels not loaded" warning on stdout, with nothing on stderr.
    """
    anode = Node("foo", "bar", noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL("")
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""
def test_setURL_valid_URL_but_no_settings(capsys)

Test setURL

Expand source code
@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(capsys):
    """Test setURL() with a URL whose fragment (the part after '#') is empty.

    The call must raise SystemExit with code 1 and print the
    "config or channels not loaded" warning on stdout, with nothing on stderr.
    """
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"

    # Build the node outside the raises-block so that only setURL() itself
    # can satisfy the SystemExit expectation.
    anode = Node(iface, "bar", noProto=True)
    anode.radioConfig = "baz"

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL(url)
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Warning: config or channels not loaded", out, re.MULTILINE)
    assert err == ""
def test_set_favorite(favorite)

Test setFavorite

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("favorite", ["!1dec0ded", 502009325])
def test_set_favorite(favorite):
    """Verify setFavorite() for both a ``!hex`` string id and a numeric id.

    Both parametrized inputs name the same node (0x1dec0ded == 502009325), so
    the admin message must carry that number and be sent exactly once.
    """
    mock_iface = MagicMock(autospec=SerialInterface)
    admin_msg = admin_pb2.AdminMessage()
    target = Node(mock_iface, 12345678)

    # Make the code under test build its message on our inspectable instance.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=admin_msg):
        target.setFavorite(favorite)

    assert admin_msg.set_favorite_node == 502009325
    mock_iface.sendData.assert_called_once()
def test_set_ignored(ignored)

Test setIgnored

Expand source code
@pytest.mark.unit
@pytest.mark.parametrize("ignored", ["!1dec0ded", 502009325])
def test_set_ignored(ignored):
    """Test setIgnored.

    Both parametrized ids refer to the same node (0x1dec0ded == 502009325);
    the admin message must carry that number and be sent exactly once.
    """
    iface = MagicMock(autospec=SerialInterface)
    node = Node(iface, 12345678)
    amesg = admin_pb2.AdminMessage()
    # Patch the message factory so the populated message can be inspected.
    with patch("meshtastic.admin_pb2.AdminMessage", return_value=amesg):
        node.setIgnored(ignored)
    assert amesg.set_ignored_node == 502009325
    iface.sendData.assert_called_once()
def test_shutdown(caplog)

Test shutdown

Expand source code
@pytest.mark.unit
def test_shutdown(caplog):
    """Check that Node.shutdown() emits the "Telling node to shutdown" log line."""
    mesh = MeshInterface()
    mesh.nodesByNum = {}
    target = Node(mesh, 1234567890, noProto=True)

    # Capture from DEBUG upward so the message is recorded whatever its level.
    with caplog.at_level(logging.DEBUG):
        target.shutdown()

    shutdown_logged = re.search(r"Telling node to shutdown", caplog.text, re.MULTILINE)
    assert shutdown_logged
def test_writeConfig_with_no_radioConfig(capsys)

Test writeConfig with no radioConfig.

Expand source code
@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """Test writeConfig() with a config name that is not valid.

    The call is expected to exit with status 1 after printing
    "Error: No valid config with name foo" to stdout, with nothing on stderr.
    """
    anode = Node("foo", "bar", noProto=True)

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.writeConfig('foo')
    # Classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r"Error: No valid config with name foo", out)
    assert err == ""