From fe0d7de12d3d5cd1524d0c5c3d0079ecad868ed6 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 5 Nov 2025 19:03:20 -0500 Subject: [PATCH] Implement zigpy `set_tx_power` API --- README.md | 6 ----- tests/application/test_startup.py | 37 +++++++++++--------------- zigpy_znp/commands/sys.py | 2 +- zigpy_znp/config.py | 15 +++++++++-- zigpy_znp/zigbee/application.py | 44 +++++++++++++++---------------- 5 files changed, 52 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index 984f6477..37732921 100644 --- a/README.md +++ b/README.md @@ -51,12 +51,6 @@ Below are the defaults with the top-level Home Assistant `zha:` key. zha: zigpy_config: znp_config: - # Only if your stick has a built-in power amplifier (i.e. CC1352P and CC2592) - # If set, must be between: - # * CC1352/2652: -22 and 19 - # * CC253x: -22 and 22 - tx_power: - # Only if your stick has a controllable LED (the CC2531) # If set, must be one of: off, on, blink, flash, toggle led_mode: off diff --git a/tests/application/test_startup.py b/tests/application/test_startup.py index 54b551b7..1c8393b8 100644 --- a/tests/application/test_startup.py +++ b/tests/application/test_startup.py @@ -136,20 +136,18 @@ async def test_reset(device, make_application, mocker): @pytest.mark.parametrize("device", FORMED_DEVICES) @pytest.mark.parametrize("succeed", [True, False]) async def test_tx_power(device, succeed, make_application): - app, znp_server = make_application( - server_cls=device, - client_config={conf.CONF_ZNP_CONFIG: {conf.CONF_TX_POWER: 19}}, - ) + app, znp_server = make_application(server_cls=device) + await app.startup(auto_form=False) if device.version == 3.30: if succeed: set_tx_power = znp_server.reply_once_to( - request=c.SYS.SetTxPower.Req(TXPower=19), + request=c.SYS.SetTxPower.Req(TXPower=10), responses=[c.SYS.SetTxPower.Rsp(StatusOrPower=t.Status.SUCCESS)], ) else: set_tx_power = znp_server.reply_once_to( - request=c.SYS.SetTxPower.Req(TXPower=19), + request=c.SYS.SetTxPower.Req(TXPower=10), responses=[ c.SYS.SetTxPower.Rsp( StatusOrPower=t.Status.MAC_INVALID_PARAMETER - 0xFF - 1 @@ -157,26 +155,23 @@ async def test_tx_power(device, succeed, make_application): ], ) else: - if succeed: - set_tx_power = znp_server.reply_once_to( - request=c.SYS.SetTxPower.Req(TXPower=19), - responses=[c.SYS.SetTxPower.Rsp(StatusOrPower=19)], - ) - else: - set_tx_power = znp_server.reply_once_to( - request=c.SYS.SetTxPower.Req(TXPower=19), - responses=[c.SYS.SetTxPower.Rsp(StatusOrPower=-1)], # adjusted - ) + succeed = True + set_tx_power = znp_server.reply_once_to( + request=c.SYS.SetTxPower.Req(TXPower=10), + responses=[c.SYS.SetTxPower.Rsp(StatusOrPower=10)], + ) if device.version == 3.30 and not succeed: with pytest.raises(InvalidCommandResponse): - await app.startup(auto_form=False) - - await set_tx_power + await app.set_tx_power(10) + elif device.version != 3.30 and succeed: + result = await app.set_tx_power(10) + assert result == 10 else: - await app.startup(auto_form=False) - await set_tx_power + result = await app.set_tx_power(10) + assert result is None + await set_tx_power await app.shutdown() diff --git a/zigpy_znp/commands/sys.py b/zigpy_znp/commands/sys.py index 3a8968d5..3c62363e 100644 --- a/zigpy_znp/commands/sys.py +++ b/zigpy_znp/commands/sys.py @@ -419,7 +419,7 @@ class SYS(t.CommandsBase, subsystem=t.Subsystem.SYS): 0x14, req_schema=(t.Param("TXPower", t.int8s, "Requested TX power setting, in dBm"),), # XXX: Z-Stack 3.30+ returns SUCCESS or INVALID_PARAMETER. - # Z-Stack 1.2 and 3.0 return the cloest TX power setting. + # Z-Stack 1.2 and 3.0 return the closest TX power setting. rsp_schema=( t.Param("StatusOrPower", t.int8s, "Status code or applied power setting"), ), diff --git a/zigpy_znp/config.py b/zigpy_znp/config.py index 09183ac4..db2dc79a 100644 --- a/zigpy_znp/config.py +++ b/zigpy_znp/config.py @@ -19,6 +19,7 @@ CONF_MAX_CONCURRENT_REQUESTS, cv_boolean, ) +from zigpy.config.validators import cv_deprecated as cv_deprecated_zigpy from zigpy_znp.commands.util import LEDMode @@ -92,8 +93,18 @@ def validator(value: typing.Any) -> None: vol.Optional(CONF_ZNP_CONFIG, default={}): vol.Schema( vol.All( { - vol.Optional(CONF_TX_POWER, default=None): vol.Any( - None, vol.All(int, vol.Range(min=-22, max=22)) + vol.Optional(CONF_TX_POWER, default=None): vol.All( + vol.Any( + None, + vol.All( + cv_deprecated_zigpy( + "`zigpy_config: znp_config: tx_power` has been" + " renamed to `zigpy_config: tx_power`." + ), + int, + vol.Range(min=-22, max=22), + ), + ), ), vol.Optional(CONF_SREQ_TIMEOUT, default=15): VolPositiveNumber, vol.Optional(CONF_ARSP_TIMEOUT, default=30): VolPositiveNumber, diff --git a/zigpy_znp/zigbee/application.py b/zigpy_znp/zigbee/application.py index a9e2ce1c..63912d25 100644 --- a/zigpy_znp/zigbee/application.py +++ b/zigpy_znp/zigbee/application.py @@ -56,6 +56,14 @@ class ControllerApplication(zigpy.application.ControllerApplication): SCHEMA = conf.CONFIG_SCHEMA def __init__(self, config: conf.ConfigType): + # Migrate ZNP-specific config to zigpy config + temp_config = self.SCHEMA(config) + + if temp_config[conf.CONF_ZNP_CONFIG][conf.CONF_TX_POWER] is not None: + config[zigpy.config.CONF_NWK][zigpy.config.CONF_NWK_TX_POWER] = temp_config[ + conf.CONF_ZNP_CONFIG + ][conf.CONF_TX_POWER] + super().__init__(config=config) self._znp: ZNP | None = None @@ -152,9 +160,6 @@ async def start_network(self, *, read_only=False): await self._znp.reset() - if self.znp_config[conf.CONF_TX_POWER] is not None: - await self.set_tx_power(dbm=self.znp_config[conf.CONF_TX_POWER]) - await self._znp.start_network() self._version_rsp = await self._znp.request(c.SYS.Version.Req()) @@ -199,25 +204,6 @@ async def start_network(self, *, read_only=False): "Your network is using the insecure Zigbee2MQTT network key!" ) - async def set_tx_power(self, dbm: int) -> None: - """ - Sets the radio TX power. - """ - - rsp = await self._znp.request(c.SYS.SetTxPower.Req(TXPower=dbm)) - - if self._znp.version >= 3.30 and rsp.StatusOrPower != t.Status.SUCCESS: - # Z-Stack 3's response indicates success or failure - raise InvalidCommandResponse( - f"Failed to set TX power: {t.Status(rsp.StatusOrPower & 0xFF)!r}", rsp - ) - elif self._znp.version < 3.30 and rsp.StatusOrPower != dbm: - # Old Z-Stack releases used the response status field to indicate the power - # setting that was actually applied - LOGGER.warning( - "Requested TX power %d was adjusted to %d", dbm, rsp.StatusOrPower - ) - def get_dst_address(self, cluster: zigpy.zcl.Cluster) -> zdo_t.MultiAddress: """ Helper to get a dst address for bind/unbind operations. @@ -326,6 +312,20 @@ async def permit_with_link_key( RspStatus=t.Status.SUCCESS, ) + async def _set_tx_power(self, tx_power: float) -> float | None: + """Set TX power (if supported by the radio), returning the actual TX power.""" + rsp = await self._znp.request(c.SYS.SetTxPower.Req(TXPower=tx_power)) + + if self._znp.version >= 3.30 and rsp.StatusOrPower != t.Status.SUCCESS: + # Z-Stack 3's response indicates success or failure + raise InvalidCommandResponse( + f"Failed to set TX power: {t.Status(rsp.StatusOrPower & 0xFF)!r}", rsp + ) + elif self._znp.version < 3.30: + return rsp.StatusOrPower + else: + return None + async def _move_network_to_channel( self, new_channel: int, new_nwk_update_id: int ) -> None: