@@ -194,14 +194,15 @@ def test_rx_unknown_device_ieee(app):
194194
195195@pytest .fixture
196196def device (app ):
197- """Sample zigpee .Device fixture."""
197+ """Sample zigpy.device .Device fixture."""
198198
199- nwk = t . uint16_t ( 0x1234 )
200-
201- def _device ( new = False , zdo_init = False , nwk = nwk ):
199+ def _device (
200+ new = False , zdo_init = False , nwk = 0x1234 , ieee = b" \x08 \x07 \x06 \x05 \x04 \x03 \x02 \x01 "
201+ ):
202202 from zigpy .device import Device , Status as DeviceStatus
203203
204- ieee , _ = t .EUI64 .deserialize (b"\x08 \x07 \x06 \x05 \x04 \x03 \x02 \x01 " )
204+ nwk = t .uint16_t (nwk )
205+ ieee , _ = t .EUI64 .deserialize (ieee )
205206 dev = Device (app , ieee , nwk )
206207 if new :
207208 dev .status = DeviceStatus .NEW
@@ -732,3 +733,139 @@ async def test_energy_scan(app):
732733 25 : 7.264 ,
733734 26 : 3.844 ,
734735 }
736+
737+
738+ def test_neighbors_updated (app , device ):
739+ """Test LQI from neighbour scan."""
740+ router = device (ieee = b"\x01 \x02 \x03 \x04 \x05 \x06 \x07 \x08 " )
741+ router .radio_details = mock .MagicMock ()
742+ end_device = device (ieee = b"\x08 \x07 \x06 \x05 \x04 \x03 \x02 \x01 " )
743+ end_device .radio_details = mock .MagicMock ()
744+
745+ app .devices [router .ieee ] = router
746+ app .devices [end_device .ieee ] = end_device
747+
748+ pan_id = t .ExtendedPanId (b"\x07 \x07 \x07 \x07 \x07 \x07 \x07 \x07 " )
749+ # The router has two neighbors: the coordinator and the end device
750+ neighbors = [
751+ zdo_t .Neighbor (
752+ extended_pan_id = pan_id ,
753+ ieee = app .state .node_info .ieee ,
754+ nwk = app .state .node_info .nwk ,
755+ device_type = 0x0 ,
756+ rx_on_when_idle = 0x1 ,
757+ relationship = 0x00 ,
758+ reserved1 = 0x0 ,
759+ permit_joining = 0x0 ,
760+ reserved2 = 0x0 ,
761+ depth = 0 ,
762+ lqi = 128 ,
763+ ),
764+ zdo_t .Neighbor (
765+ extended_pan_id = pan_id ,
766+ ieee = end_device .ieee ,
767+ nwk = end_device .nwk ,
768+ device_type = 0x2 ,
769+ rx_on_when_idle = 0x0 ,
770+ relationship = 0x01 ,
771+ reserved1 = 0x0 ,
772+ permit_joining = 0x0 ,
773+ reserved2 = 0x0 ,
774+ depth = 2 ,
775+ lqi = 100 ,
776+ ),
777+ # Let's also include an unknown device
778+ zdo_t .Neighbor (
779+ extended_pan_id = pan_id ,
780+ ieee = t .EUI64 (b"\x00 \x0F \x0E \x0D \x0C \x0B \x0A \x09 " ),
781+ nwk = t .NWK (0x9999 ),
782+ device_type = 0x2 ,
783+ rx_on_when_idle = 0x0 ,
784+ relationship = 0x01 ,
785+ reserved1 = 0x0 ,
786+ permit_joining = 0x0 ,
787+ reserved2 = 0x0 ,
788+ depth = 2 ,
789+ lqi = 99 ,
790+ ),
791+ ]
792+
793+ app .neighbors_updated (router .ieee , neighbors )
794+
795+ router .radio_details .assert_called_once_with (lqi = 128 )
796+ end_device .radio_details .assert_called_once_with (lqi = 100 )
797+
798+
799+ def test_routes_updated_schedule (app ):
800+ """Test scheduling the sync routes_updated function."""
801+ app .create_task = mock .MagicMock ()
802+ app ._routes_updated = mock .MagicMock ()
803+
804+ ieee = t .EUI64 (b"\x01 \x02 \x03 \x04 \x05 \x06 \x07 \x08 " )
805+ routes = []
806+ app .routes_updated (ieee , routes )
807+
808+ assert app .create_task .call_count == 1
809+ app ._routes_updated .assert_called_once_with (ieee , routes )
810+
811+
812+ async def test_routes_updated (app , device ):
813+ """Test RSSI on routes scan update."""
814+ rssi = 0x50
815+ app ._api ._at_command = mock .AsyncMock (return_value = rssi )
816+
817+ router1 = device (ieee = b"\x01 \x02 \x03 \x04 \x05 \x06 \x07 \x08 " )
818+ router1 .radio_details = mock .MagicMock ()
819+ router2 = device (ieee = b"\x08 \x07 \x06 \x05 \x04 \x03 \x02 \x01 " )
820+ router2 .radio_details = mock .MagicMock ()
821+
822+ app .devices [router1 .ieee ] = router1
823+ app .devices [router2 .ieee ] = router2
824+
825+ # Let router1 be immediate child and route2 be child of the router1.
826+ # Then the routes of router1 would be:
827+ routes = [
828+ zdo_t .Route (
829+ DstNWK = app .state .node_info .nwk ,
830+ RouteStatus = 0x00 ,
831+ MemoryConstrained = 0x0 ,
832+ ManyToOne = 0x1 ,
833+ RouteRecordRequired = 0x0 ,
834+ Reserved = 0x0 ,
835+ NextHop = app .state .node_info .nwk ,
836+ ),
837+ zdo_t .Route (
838+ DstNWK = router2 .nwk ,
839+ RouteStatus = 0x00 ,
840+ MemoryConstrained = 0x0 ,
841+ ManyToOne = 0x0 ,
842+ RouteRecordRequired = 0x0 ,
843+ Reserved = 0x0 ,
844+ NextHop = router2 .nwk ,
845+ ),
846+ ]
847+
848+ await app ._routes_updated (router1 .ieee , routes )
849+
850+ router1 .radio_details .assert_called_once_with (rssi = - 80 )
851+ assert router2 .radio_details .call_count == 0
852+
853+ router1 .radio_details .reset_mock ()
854+
855+ routes = [
856+ zdo_t .Route (
857+ DstNWK = router1 .nwk ,
858+ RouteStatus = 0x00 ,
859+ MemoryConstrained = 0x0 ,
860+ ManyToOne = 0x0 ,
861+ RouteRecordRequired = 0x0 ,
862+ Reserved = 0x0 ,
863+ NextHop = router1 .nwk ,
864+ )
865+ ]
866+ await app ._routes_updated (router2 .ieee , routes )
867+
868+ assert router1 .radio_details .call_count == 0
869+ assert router2 .radio_details .call_count == 0
870+
871+ app ._api ._at_command .assert_awaited_once_with ("DB" )
0 commit comments