|
1 | 1 | import asyncio |
2 | 2 |
|
3 | 3 | import pytest |
4 | | -from zigpy import types as t |
5 | 4 | import zigpy.exceptions |
6 | | -from zigpy.zdo.types import ZDOCmd |
| 5 | +import zigpy.state |
| 6 | +import zigpy.types as t |
| 7 | +import zigpy.zdo.types as zdo_t |
7 | 8 |
|
8 | 9 | from zigpy_xbee.api import ModemStatus, XBee |
9 | 10 | import zigpy_xbee.config as config |
|
21 | 22 | } |
22 | 23 |
|
23 | 24 |
|
| 25 | +@pytest.fixture |
| 26 | +def node_info(): |
| 27 | + return zigpy.state.NodeInfo( |
| 28 | + nwk=t.NWK(0x0000), |
| 29 | + ieee=t.EUI64.convert("00:12:4b:00:1c:a1:b8:46"), |
| 30 | + logical_type=zdo_t.LogicalType.Coordinator, |
| 31 | + ) |
| 32 | + |
| 33 | + |
| 34 | +@pytest.fixture |
| 35 | +def network_info(node_info): |
| 36 | + return zigpy.state.NetworkInfo( |
| 37 | + extended_pan_id=t.ExtendedPanId.convert("bd:27:0b:38:37:95:dc:87"), |
| 38 | + pan_id=t.PanId(0x9BB0), |
| 39 | + nwk_update_id=18, |
| 40 | + nwk_manager_id=t.NWK(0x0000), |
| 41 | + channel=t.uint8_t(15), |
| 42 | + channel_mask=t.Channels.ALL_CHANNELS, |
| 43 | + security_level=t.uint8_t(5), |
| 44 | + network_key=zigpy.state.Key( |
| 45 | + key=t.KeyData.convert("2ccade06b3090c310315b3d574d3c85a"), |
| 46 | + seq=108, |
| 47 | + tx_counter=118785, |
| 48 | + ), |
| 49 | + tc_link_key=zigpy.state.Key( |
| 50 | + key=t.KeyData(b"ZigBeeAlliance09"), |
| 51 | + partner_ieee=node_info.ieee, |
| 52 | + tx_counter=8712428, |
| 53 | + ), |
| 54 | + key_table=[], |
| 55 | + children=[], |
| 56 | + nwk_addresses={}, |
| 57 | + source="zigpy-xbee@0.0.0", |
| 58 | + ) |
| 59 | + |
| 60 | + |
24 | 61 | @pytest.fixture |
25 | 62 | def app(monkeypatch): |
26 | 63 | monkeypatch.setattr(application, "TIMEOUT_TX_STATUS", 0.1) |
@@ -242,59 +279,22 @@ async def test_get_association_state(app): |
242 | 279 | assert ai is mock.sentinel.ai |
243 | 280 |
|
244 | 281 |
|
245 | | -async def test_form_network(app): |
246 | | - legacy_module = False |
247 | | - |
248 | | - async def mock_at_command(cmd, *args): |
249 | | - if cmd == "MY": |
250 | | - return 0x0000 |
251 | | - if cmd == "OI": |
252 | | - return 0x1234 |
253 | | - elif cmd == "ID": |
254 | | - return 0x1234567812345678 |
255 | | - elif cmd == "SL": |
256 | | - return 0x11223344 |
257 | | - elif cmd == "SH": |
258 | | - return 0x55667788 |
259 | | - elif cmd == "WR": |
260 | | - app._api.coordinator_started_event.set() |
261 | | - elif cmd == "CE" and legacy_module: |
262 | | - raise RuntimeError |
263 | | - return None |
| 282 | +async def test_write_network_info(app, node_info, network_info): |
| 283 | + app._api._queued_at = mock.AsyncMock(spec=XBee._queued_at) |
| 284 | + app._api._at_command = mock.AsyncMock(spec=XBee._at_command) |
| 285 | + app._api._running = mock.AsyncMock(spec=app._api._running) |
264 | 286 |
|
265 | | - app._api._at_command = mock.MagicMock( |
266 | | - spec=XBee._at_command, side_effect=mock_at_command |
267 | | - ) |
268 | | - app._api._queued_at = mock.MagicMock( |
269 | | - spec=XBee._at_command, side_effect=mock_at_command |
270 | | - ) |
271 | 287 | app._get_association_state = mock.AsyncMock( |
272 | 288 | spec=application.ControllerApplication._get_association_state, |
273 | 289 | return_value=0x00, |
274 | 290 | ) |
275 | 291 |
|
276 | | - app.write_network_info = mock.MagicMock(wraps=app.write_network_info) |
277 | | - |
278 | | - await app.form_network() |
279 | | - assert app._api._at_command.call_count >= 1 |
280 | | - assert app._api._queued_at.call_count >= 7 |
281 | | - |
282 | | - network_info = app.write_network_info.mock_calls[0][2]["network_info"] |
283 | | - |
284 | | - app._api._queued_at.assert_any_call("SC", 1 << (network_info.channel - 11)) |
285 | | - app._api._queued_at.assert_any_call("KY", b"ZigBeeAlliance09") |
286 | | - |
287 | | - app._api._at_command.reset_mock() |
288 | | - app._api._queued_at.reset_mock() |
289 | | - legacy_module = True |
290 | | - await app.form_network() |
291 | | - assert app._api._at_command.call_count >= 1 |
292 | | - assert app._api._queued_at.call_count >= 7 |
293 | | - |
294 | | - network_info = app.write_network_info.mock_calls[0][2]["network_info"] |
| 292 | + await app.write_network_info(network_info=network_info, node_info=node_info) |
295 | 293 |
|
296 | 294 | app._api._queued_at.assert_any_call("SC", 1 << (network_info.channel - 11)) |
297 | 295 | app._api._queued_at.assert_any_call("KY", b"ZigBeeAlliance09") |
| 296 | + app._api._queued_at.assert_any_call("NK", network_info.network_key.key.serialize()) |
| 297 | + app._api._queued_at.assert_any_call("ID", 0xBD270B383795DC87) |
298 | 298 |
|
299 | 299 |
|
300 | 300 | async def _test_start_network( |
@@ -435,7 +435,7 @@ def _mock_command( |
435 | 435 | seq, |
436 | 436 | b"\xaa\x55\xbe\xef", |
437 | 437 | expect_reply=expect_reply, |
438 | | - **kwargs |
| 438 | + **kwargs, |
439 | 439 | ) |
440 | 440 |
|
441 | 441 |
|
@@ -509,7 +509,7 @@ def nwk(): |
509 | 509 |
|
510 | 510 | def test_rx_device_annce(app, ieee, nwk): |
511 | 511 | dst_ep = 0 |
512 | | - cluster_id = ZDOCmd.Device_annce |
| 512 | + cluster_id = zdo_t.ZDOCmd.Device_annce |
513 | 513 | device = mock.MagicMock() |
514 | 514 | device.status = device.Status.NEW |
515 | 515 | app.get_device = mock.MagicMock(return_value=device) |
|
0 commit comments