From 4216cf3b73a7daa1d8d3429450cf555898a70b19 Mon Sep 17 00:00:00 2001 From: Elyasin Shaladi Date: Mon, 3 Nov 2025 20:07:01 +0100 Subject: [PATCH 1/3] Adds support for Aqara E1 TRV external temp sensor --- tests/test_aqara_trv.py | 185 +++ zhaquirks/xiaomi/aqara/thermostat_agl001.py | 1117 +++++++++++-------- 2 files changed, 822 insertions(+), 480 deletions(-) create mode 100755 tests/test_aqara_trv.py mode change 100644 => 100755 zhaquirks/xiaomi/aqara/thermostat_agl001.py diff --git a/tests/test_aqara_trv.py b/tests/test_aqara_trv.py new file mode 100755 index 0000000000..6d7df1be62 --- /dev/null +++ b/tests/test_aqara_trv.py @@ -0,0 +1,185 @@ +"""Tests for Aqara E1 thermostat.""" + +from unittest import mock + +import pytest +from zigpy.zcl import foundation + +from zhaquirks.xiaomi.aqara.thermostat_agl001 import ( + AGL001, + AqaraThermostatSpecificCluster, +) + +Defs = AqaraThermostatSpecificCluster.AttributeDefs + + +@pytest.mark.parametrize("quirk", (AGL001,)) +async def test_external_sensor_mode(zigpy_device_from_quirk, quirk): + """Test Aqara E1 thermostat external sensor mode setting.""" + + # Create virtual device from the quirk + thermostat_dev = zigpy_device_from_quirk(quirk) + + # Access the Aqara specific cluster + aqara_cluster = thermostat_dev.endpoints[1].opple_cluster + + # Simulate a successful response for multiple calls + async def async_success(*args, **kwargs): + return [foundation.Status.SUCCESS] + + # Test changing to external sensor mode (1) + with mock.patch.object(aqara_cluster, "request", side_effect=async_success) as m1: + await aqara_cluster.write_attributes({Defs.sensor.id: 1}) + + # Verify that the request was called twice (once for each write_attributes call) + assert m1.call_count == 2 + + # Verify that the SENSOR_ATTR attribute was used in both calls + first_call_args = m1.call_args_list[0][0] + second_call_args = m1.call_args_list[1][0] + + assert first_call_args[1] == foundation.GeneralCommand.Write_Attributes + assert second_call_args[1] == foundation.GeneralCommand.Write_Attributes + + # Verify that the SENSOR_ATTR is present in the attributes list + assert any(attr.attrid == Defs.sensor_attr.id for attr in first_call_args[3]) + assert any(attr.attrid == Defs.sensor_attr.id for attr in second_call_args[3]) + + # Get the attribute values + first_attr = next( + attr for attr in first_call_args[3] if attr.attrid == Defs.sensor_attr.id + ) + second_attr = next( + attr for attr in second_call_args[3] if attr.attrid == Defs.sensor_attr.id + ) + + first_attr_value = first_attr.value.value + second_attr_value = second_attr.value.value + + assert first_attr_value.startswith(b"\xaa\x71") + assert b"\x02" in first_attr_value # Action code for external sensor + + assert second_attr_value.startswith(b"\xaa\x71") + assert b"\x02" in second_attr_value # Action code for external sensor + + +@pytest.mark.parametrize("quirk", (AGL001,)) +async def test_internal_sensor_mode(zigpy_device_from_quirk, quirk): + """Test Aqara E1 thermostat internal sensor mode setting.""" + + # Create virtual device from the quirk + thermostat_dev = zigpy_device_from_quirk(quirk) + + # Access the Aqara specific cluster + aqara_cluster = thermostat_dev.endpoints[1].opple_cluster + + # Simulate a successful response for multiple calls + async def async_success(*args, **kwargs): + return [foundation.Status.SUCCESS] + + # Test changing to internal sensor mode (0) + with mock.patch.object(aqara_cluster, "request", side_effect=async_success) as m1: + await aqara_cluster.write_attributes({Defs.sensor.id: 0}) + + # Verify that the request was called twice (once for each write_attributes call) + assert m1.call_count == 2 + + # Verify that the SENSOR_ATTR attribute was used in both calls + first_call_args = m1.call_args_list[0][0] + second_call_args = m1.call_args_list[1][0] + + assert first_call_args[1] == foundation.GeneralCommand.Write_Attributes + assert second_call_args[1] == foundation.GeneralCommand.Write_Attributes + + # Verify that the SENSOR_ATTR is present in the attributes list + assert any(attr.attrid == Defs.sensor_attr.id for attr in first_call_args[3]) + assert any(attr.attrid == Defs.sensor_attr.id for attr in second_call_args[3]) + + # Get the attribute values + first_attr = next( + attr for attr in first_call_args[3] if attr.attrid == Defs.sensor_attr.id + ) + second_attr = next( + attr for attr in second_call_args[3] if attr.attrid == Defs.sensor_attr.id + ) + + first_attr_value = first_attr.value.value + second_attr_value = second_attr.value.value + + assert first_attr_value.startswith(b"\xaa\x71") + assert b"\x04" in first_attr_value # Action code for internal sensor + + assert second_attr_value.startswith(b"\xaa\x71") + assert b"\x04" in second_attr_value # Action code for internal sensor + + +@pytest.mark.parametrize("quirk", (AGL001,)) +async def test_external_sensor_temperature(zigpy_device_from_quirk, quirk): + """Test Aqara E1 thermostat external temperature setting.""" + + # Create virtual device from the quirk + thermostat_dev = zigpy_device_from_quirk(quirk) + + # Access the Aqara specific cluster + aqara_cluster = thermostat_dev.endpoints[1].opple_cluster + + # Simulate a successful response + async def async_success(*args, **kwargs): + return [foundation.Status.SUCCESS] + + # Test sending an external temperature (2500 = 25.00°C) + with mock.patch.object(aqara_cluster, "request", side_effect=async_success) as m1: + await aqara_cluster.write_attributes({Defs.sensor_temp.id: 2500}) + + # Verify that the request was called + assert m1.call_count == 1 + + # Verify that the SENSOR_ATTR attribute was used + args = m1.call_args[0] + assert args[1] == foundation.GeneralCommand.Write_Attributes + attr = next(attr for attr in args[3] if attr.attrid == Defs.sensor_attr.id) + + # Verify that the Aqara header is present + attr_value = attr.value.value + assert attr_value.startswith(b"\xaa\x71") + assert b"\x05" in attr_value # Action code for setting temperature + + # Verify that the temperature value is present + sensor_id = b"\x00\x15\x8d\x00\x01\x9d\x1b\x98" + assert sensor_id in attr_value + + +@pytest.mark.parametrize("quirk", (AGL001,)) +async def test_external_sensor_temp_message_length_regression( + zigpy_device_from_quirk, quirk +): + """Test that repeated external sensor temperature writes do not mutate SENSOR_ID.""" + + thermostat_dev = zigpy_device_from_quirk(quirk) + aqara_cluster = thermostat_dev.endpoints[1].opple_cluster + + async def async_success(*args, **kwargs): + return [foundation.Status.SUCCESS] + + with mock.patch.object(aqara_cluster, "request", side_effect=async_success) as m1: + # First call + await aqara_cluster.write_attributes({Defs.sensor_temp.id: 2500}) + first_args = m1.call_args[0] + first_attr = next( + attr for attr in first_args[3] if attr.attrid == Defs.sensor_attr.id + ) + first_value = first_attr.value.value + + # Second call + await aqara_cluster.write_attributes({Defs.sensor_temp.id: 2500}) + second_args = m1.call_args[0] + second_attr = next( + attr for attr in second_args[3] if attr.attrid == Defs.sensor_attr.id + ) + second_value = second_attr.value.value + + # Assert that the message length is the same and content is identical + assert len(first_value) == len(second_value), ( + f"Message length changed: {len(first_value)} vs {len(second_value)}" + ) + assert first_value == second_value, "Message content changed between calls" diff --git a/zhaquirks/xiaomi/aqara/thermostat_agl001.py b/zhaquirks/xiaomi/aqara/thermostat_agl001.py old mode 100644 new mode 100755 index 57ef0a3a4a..601c2aa2c7 --- a/zhaquirks/xiaomi/aqara/thermostat_agl001.py +++ b/zhaquirks/xiaomi/aqara/thermostat_agl001.py @@ -1,480 +1,637 @@ -"""Aqara E1 Radiator Thermostat Quirk.""" - -from __future__ import annotations - -from functools import reduce -import math -import struct -from typing import Any, Final - -from zigpy.profiles import zha -from zigpy.quirks import CustomCluster -import zigpy.types as t -from zigpy.zcl.clusters.general import Basic, Identify, Ota, Time -from zigpy.zcl.clusters.hvac import Thermostat -from zigpy.zcl.foundation import ZCLAttributeDef - -from zhaquirks.const import ( - DEVICE_TYPE, - ENDPOINTS, - INPUT_CLUSTERS, - MODELS_INFO, - OUTPUT_CLUSTERS, - PROFILE_ID, -) -from zhaquirks.xiaomi import ( - LUMI, - XiaomiAqaraE1Cluster, - XiaomiCustomDevice, - XiaomiPowerConfiguration, -) - -ZCL_SYSTEM_MODE = Thermostat.attributes_by_name["system_mode"].id - -XIAOMI_SYSTEM_MODE_MAP = { - 0: Thermostat.SystemMode.Off, - 1: Thermostat.SystemMode.Heat, -} - -SYSTEM_MODE = 0x0271 -PRESET = 0x0272 -WINDOW_DETECTION = 0x0273 -VALVE_DETECTION = 0x0274 -VALVE_ALARM = 0x0275 -CHILD_LOCK = 0x0277 -AWAY_PRESET_TEMPERATURE = 0x0279 -WINDOW_OPEN = 0x027A -CALIBRATED = 0x027B -SCHEDULE = 0x027D -SCHEDULE_SETTINGS = 0x0276 -SENSOR = 0x027E -BATTERY_PERCENTAGE = 0x040A - -XIAOMI_CLUSTER_ID = 0xFCC0 - -DAYS_MAP = { - "mon": 0x02, - "tue": 0x04, - "wed": 0x08, - "thu": 0x10, - "fri": 0x20, - "sat": 0x40, - "sun": 0x80, -} -NEXT_DAY_FLAG = 1 << 15 - - -class ThermostatCluster(CustomCluster, Thermostat): - """Thermostat cluster.""" - - # remove cooling mode - _CONSTANT_ATTRIBUTES = { - Thermostat.attributes_by_name[ - "ctrl_sequence_of_oper" - ].id: Thermostat.ControlSequenceOfOperation.Heating_Only - } - - async def read_attributes( - self, - attributes: list[int | str], - allow_cache: bool = False, - only_cache: bool = False, - manufacturer: int | t.uint16_t | None = None, - ): - """Pass reading attributes to Xiaomi cluster if applicable.""" - successful_r, failed_r = {}, {} - remaining_attributes = attributes.copy() - - # read system_mode from Xiaomi cluster (can be numeric or string) - if ZCL_SYSTEM_MODE in attributes or "system_mode" in attributes: - self.debug("Passing 'system_mode' read to Xiaomi cluster") - - if ZCL_SYSTEM_MODE in attributes: - remaining_attributes.remove(ZCL_SYSTEM_MODE) - if "system_mode" in attributes: - remaining_attributes.remove("system_mode") - - successful_r, failed_r = await self.endpoint.opple_cluster.read_attributes( - [SYSTEM_MODE], allow_cache, only_cache, manufacturer - ) - # convert Xiaomi system_mode to ZCL attribute - if SYSTEM_MODE in successful_r: - successful_r[ZCL_SYSTEM_MODE] = XIAOMI_SYSTEM_MODE_MAP[ - successful_r.pop(SYSTEM_MODE) - ] - # read remaining attributes from thermostat cluster - if remaining_attributes: - remaining_result = await super().read_attributes( - remaining_attributes, allow_cache, only_cache, manufacturer - ) - successful_r.update(remaining_result[0]) - failed_r.update(remaining_result[1]) - return successful_r, failed_r - - async def write_attributes( - self, attributes: dict[str | int, Any], manufacturer: int | None = None - ) -> list: - """Pass writing attributes to Xiaomi cluster if applicable.""" - result = [] - remaining_attributes = attributes.copy() - system_mode_value = None - - # check if system_mode is being written (can be numeric or string) - if ZCL_SYSTEM_MODE in attributes: - remaining_attributes.pop(ZCL_SYSTEM_MODE) - system_mode_value = attributes.get(ZCL_SYSTEM_MODE) - if "system_mode" in attributes: - remaining_attributes.pop("system_mode") - system_mode_value = attributes.get("system_mode") - - # write system_mode to Xiaomi cluster if applicable - if system_mode_value is not None: - self.debug("Passing 'system_mode' write to Xiaomi cluster") - result += await self.endpoint.opple_cluster.write_attributes( - {SYSTEM_MODE: min(int(system_mode_value), 1)} - ) - - # write remaining attributes to thermostat cluster - if remaining_attributes: - result += await super().write_attributes(remaining_attributes, manufacturer) - return result - - -class ScheduleEvent: - """Schedule event object.""" - - _is_next_day = False - - def __init__(self, value, is_next_day=False): - """Create ScheduleEvent object from bytes or string.""" - if isinstance(value, bytes): - self._verify_buffer_len(value) - self._time = self._read_time_from_buf(value) - self._temp = self._read_temp_from_buf(value) - self._validate_time(self._time) - self._validate_temp(self._temp) - elif isinstance(value, str): - groups = value.split(",") - if len(groups) != 2: - raise ValueError("Time and temperature must contain ',' separator") - self._time = self._parse_time(groups[0]) - self._temp = self._parse_temp(groups[1]) - self._validate_time(self._time) - self._validate_temp(self._temp) - else: - raise TypeError( - f"Cannot create ScheduleEvent object from type: {type(value)}" - ) - self._is_next_day = is_next_day - - @staticmethod - def _verify_buffer_len(buf): - if len(buf) != 6: - raise ValueError("Buffer size must equal 6") - - @staticmethod - def _read_time_from_buf(buf): - time = struct.unpack_from(">H", buf, offset=0)[0] - time &= ~NEXT_DAY_FLAG - return time - - @staticmethod - def _parse_time(string): - parts = string.split(":") - if len(parts) != 2: - raise ValueError("Time must contain ':' separator") - - hours = int(parts[0]) - minutes = int(parts[1]) - - return hours * 60 + minutes - - @staticmethod - def _read_temp_from_buf(buf): - return struct.unpack_from(">H", buf, offset=4)[0] / 100 - - @staticmethod - def _parse_temp(string): - return float(string) - - @staticmethod - def _validate_time(time): - if time <= 0: - raise ValueError("Time must be between 00:00 and 23:59") - if time > 24 * 60: - raise ValueError("Time must be between 00:00 and 23:59") - - @staticmethod - def _validate_temp(temp): - if temp < 5: - raise ValueError("Temperature must be between 5 and 30 °C") - if temp > 30: - raise ValueError("Temperature must be between 5 and 30 °C") - if (temp * 10) % 5 != 0: - raise ValueError("Temperature must be whole or half degrees") - - def _write_time_to_buf(self, buf): - time = self._time - if self._is_next_day: - time |= NEXT_DAY_FLAG - struct.pack_into(">H", buf, 0, time) - - def _write_temp_to_buf(self, buf): - struct.pack_into(">H", buf, 4, int(self._temp * 100)) - - def is_next_day(self): - """Return if event is on the next day.""" - return self._is_next_day - - def set_next_day(self, is_next_day): - """Set if event is on the next day.""" - self._is_next_day = is_next_day - - def get_time(self): - """Return event time.""" - return self._time - - def __str__(self): - """Return event as string.""" - return f"{math.floor(self._time / 60)}:{f'{self._time % 60:0>2}'},{f'{self._temp:.1f}'}" - - def serialize(self): - """Serialize event to bytes.""" - result = bytearray(6) - self._write_time_to_buf(result) - self._write_temp_to_buf(result) - return result - - -class ScheduleSettings(t.LVBytes): - """Schedule settings object.""" - - def __new__(cls, value): - """Create ScheduleSettings object from bytes or string.""" - day_selection = None - events = [None] * 4 - if isinstance(value, bytes): - ScheduleSettings._verify_buffer_len(value) - ScheduleSettings._verify_magic_byte(value) - day_selection = ScheduleSettings._read_day_selection(value) - for i in range(4): - events[i] = ScheduleSettings._read_event(value, i) - elif isinstance(value, str): - groups = value.split("|") - ScheduleSettings._verify_string(groups) - day_selection = ScheduleSettings._read_day_selection(groups[0]) - for i in range(4): - events[i] = ScheduleSettings._read_event(groups[i + 1], i) - else: - raise TypeError( - f"Cannot create ScheduleSettings object from type: {type(value)}" - ) - - for i in range(1, 4): - if events[i].get_time() < events[i - 1].get_time(): - events[i].set_next_day(True) - ScheduleSettings._verify_event_durations(events) - - result = bytearray(b"\x04") - result.append(ScheduleSettings._get_day_selection_byte(day_selection)) - for e in events: - result.extend(e.serialize()) - return super().__new__(cls, bytes(result)) - - @staticmethod - def _verify_buffer_len(buf): - if len(buf) != 26: - raise ValueError("Buffer size must equal 26") - - @staticmethod - def _verify_magic_byte(buf): - if struct.unpack_from("c", buf, offset=0)[0][0] != 0x04: - raise ValueError("Magic byte must be equal to 0x04") - - @staticmethod - def _verify_string(groups): - if len(groups) != 5: - raise ValueError("There must be 5 groups in a string") - days = groups[0].split(",") - ScheduleSettings._verify_day_selection_in_str(days) - - @staticmethod - def _verify_day_selection_in_str(days): - if len(days) == 0 or len(days) > 7: - raise ValueError("Number of days selected must be between 1 and 7") - if len(days) != len(set(days)): - raise ValueError("Duplicate day names present") - for d in days: - if d not in DAYS_MAP: - raise ValueError( - f"String: {d} is not a valid day name, valid names: mon, tue, wed, thu, fri, sat, sun" - ) - - @staticmethod - def _read_day_selection(value): - day_selection = [] - if isinstance(value, bytes): - byte = struct.unpack_from("c", value, offset=1)[0][0] - if byte & 0x01: - raise ValueError("Incorrect day selected") - for i, v in DAYS_MAP.items(): - if byte & v: - day_selection.append(i) - ScheduleSettings._verify_day_selection_in_str(day_selection) - elif isinstance(value, str): - day_selection = value.split(",") - ScheduleSettings._verify_day_selection_in_str(day_selection) - return day_selection - - @staticmethod - def _read_event(value, index): - if isinstance(value, bytes): - event_buf = value[2 + index * 6 : 8 + index * 6] - return ScheduleEvent(event_buf) - elif isinstance(value, str): - return ScheduleEvent(value) - - @staticmethod - def _verify_event_durations(events): - full_day = 24 * 60 - prev_time = events[0].get_time() - durations = [] - for i in range(1, 4): - event = events[i] - if event.is_next_day(): - durations.append(full_day - prev_time + event.get_time()) - else: - durations.append(event.get_time() - prev_time) - prev_time = event.get_time() - if any(d < 60 for d in durations): - raise ValueError("The individual times must be at least 1 hour apart") - if reduce((lambda x, y: x + y), durations) > full_day: - raise ValueError("The start and end times must be at most 24 hours apart") - - @staticmethod - def _get_day_selection_byte(day_selection): - byte = 0x00 - for d in day_selection: - byte |= DAYS_MAP[d] - return byte - - def __str__(self): - """Return ScheduleSettings as string.""" - day_selection = ScheduleSettings._read_day_selection(self) - events = [None] * 4 - for i in range(4): - events[i] = ScheduleSettings._read_event(self, i) - result = ",".join(day_selection) - for e in events: - result += f"|{e}" - return result - - -class AqaraThermostatSpecificCluster(XiaomiAqaraE1Cluster): - """Aqara manufacturer specific settings.""" - - class AttributeDefs(XiaomiAqaraE1Cluster.AttributeDefs): - """Attribute definitions.""" - - system_mode: Final = ZCLAttributeDef( - id=SYSTEM_MODE, type=t.uint8_t, is_manufacturer_specific=True - ) - preset: Final = ZCLAttributeDef( - id=PRESET, type=t.uint8_t, is_manufacturer_specific=True - ) - window_detection: Final = ZCLAttributeDef( - id=WINDOW_DETECTION, type=t.uint8_t, is_manufacturer_specific=True - ) - valve_detection: Final = ZCLAttributeDef( - id=VALVE_DETECTION, type=t.uint8_t, is_manufacturer_specific=True - ) - valve_alarm: Final = ZCLAttributeDef( - id=VALVE_ALARM, type=t.uint8_t, is_manufacturer_specific=True - ) - child_lock: Final = ZCLAttributeDef( - id=CHILD_LOCK, type=t.uint8_t, is_manufacturer_specific=True - ) - away_preset_temperature: Final = ZCLAttributeDef( - id=AWAY_PRESET_TEMPERATURE, type=t.uint32_t, is_manufacturer_specific=True - ) - window_open: Final = ZCLAttributeDef( - id=WINDOW_OPEN, type=t.uint8_t, is_manufacturer_specific=True - ) - calibrated: Final = ZCLAttributeDef( - id=CALIBRATED, type=t.uint8_t, is_manufacturer_specific=True - ) - schedule: Final = ZCLAttributeDef( - id=SCHEDULE, type=t.uint8_t, is_manufacturer_specific=True - ) - schedule_settings: Final = ZCLAttributeDef( - id=SCHEDULE_SETTINGS, type=ScheduleSettings, is_manufacturer_specific=True - ) - sensor: Final = ZCLAttributeDef( - id=SENSOR, type=t.uint8_t, is_manufacturer_specific=True - ) - battery_percentage: Final = ZCLAttributeDef( - id=BATTERY_PERCENTAGE, type=t.uint8_t, is_manufacturer_specific=True - ) - - def _update_attribute(self, attrid, value): - self.debug("Updating attribute on Xiaomi cluster %s with %s", attrid, value) - if attrid == BATTERY_PERCENTAGE: - self.endpoint.power.battery_percent_reported(value) - elif attrid == SYSTEM_MODE: - # update ZCL system_mode attribute (e.g. on attribute reports) - self.endpoint.thermostat.update_attribute( - ZCL_SYSTEM_MODE, XIAOMI_SYSTEM_MODE_MAP[value] - ) - super()._update_attribute(attrid, value) - - -class AGL001(XiaomiCustomDevice): - """Aqara E1 Radiator Thermostat (AGL001) Device.""" - - signature = { - # - MODELS_INFO: [(LUMI, "lumi.airrtc.agl001")], - ENDPOINTS: { - 1: { - PROFILE_ID: zha.PROFILE_ID, - DEVICE_TYPE: zha.DeviceType.THERMOSTAT, - INPUT_CLUSTERS: [ - Basic.cluster_id, - Identify.cluster_id, - Thermostat.cluster_id, - Time.cluster_id, - XiaomiPowerConfiguration.cluster_id, - AqaraThermostatSpecificCluster.cluster_id, - ], - OUTPUT_CLUSTERS: [ - Identify.cluster_id, - Thermostat.cluster_id, - AqaraThermostatSpecificCluster.cluster_id, - ], - } - }, - } - - replacement = { - ENDPOINTS: { - 1: { - INPUT_CLUSTERS: [ - Basic.cluster_id, - Identify.cluster_id, - ThermostatCluster, - Time.cluster_id, - XiaomiPowerConfiguration, - AqaraThermostatSpecificCluster, - ], - OUTPUT_CLUSTERS: [ - Identify.cluster_id, - ThermostatCluster, - AqaraThermostatSpecificCluster, - Ota.cluster_id, - ], - } - } - } +"""Aqara E1 Radiator Thermostat Quirk.""" + +from __future__ import annotations + +from functools import reduce +import math +import struct +import time +from typing import Any, Final + +from zigpy.profiles import zha +from zigpy.quirks import CustomCluster +import zigpy.types as t +from zigpy.zcl.clusters.general import Basic, Identify, Ota, Time +from zigpy.zcl.clusters.hvac import Thermostat +from zigpy.zcl.foundation import ZCLAttributeDef + +from zhaquirks.const import ( + DEVICE_TYPE, + ENDPOINTS, + INPUT_CLUSTERS, + MODELS_INFO, + OUTPUT_CLUSTERS, + PROFILE_ID, +) +from zhaquirks.xiaomi import ( + LUMI, + XiaomiAqaraE1Cluster, + XiaomiCustomDevice, + XiaomiPowerConfiguration, +) + +ZCL_SYSTEM_MODE = Thermostat.attributes_by_name["system_mode"].id + +XIAOMI_SYSTEM_MODE_MAP = { + 0: Thermostat.SystemMode.Off, + 1: Thermostat.SystemMode.Heat, +} + + +class Constants: + """Constants specific for Aqara E1 TRV.""" + + SENSOR_ID = bytearray.fromhex("00158d00019d1b98") + + +DAYS_MAP = { + "mon": 0x02, + "tue": 0x04, + "wed": 0x08, + "thu": 0x10, + "fri": 0x20, + "sat": 0x40, + "sun": 0x80, +} +NEXT_DAY_FLAG = 1 << 15 + + +class ThermostatCluster(CustomCluster, Thermostat): + """Thermostat cluster.""" + + # remove cooling mode + _CONSTANT_ATTRIBUTES = { + Thermostat.attributes_by_name[ + "ctrl_sequence_of_oper" + ].id: Thermostat.ControlSequenceOfOperation.Heating_Only + } + + async def read_attributes( + self, + attributes: list[int | str], + allow_cache: bool = False, + only_cache: bool = False, + manufacturer: int | t.uint16_t | None = None, + ): + """Pass reading attributes to Xiaomi cluster if applicable.""" + successful_r, failed_r = {}, {} + remaining_attributes = attributes.copy() + + # read system_mode from Xiaomi cluster (can be numeric or string) + if ( + ZCL_SYSTEM_MODE in attributes + or AqaraThermostatSpecificCluster.AttributeDefs.system_mode.name + in attributes + ): + self.debug("Passing 'system_mode' read to Xiaomi cluster") + + if ZCL_SYSTEM_MODE in attributes: + remaining_attributes.remove(ZCL_SYSTEM_MODE) + if ( + AqaraThermostatSpecificCluster.AttributeDefs.system_mode.name + in attributes + ): + remaining_attributes.remove( + AqaraThermostatSpecificCluster.AttributeDefs.system_mode.name + ) + + successful_r, failed_r = await self.endpoint.opple_cluster.read_attributes( + [AqaraThermostatSpecificCluster.AttributeDefs.system_mode.id], + allow_cache, + only_cache, + manufacturer, + ) + # convert Xiaomi system_mode to ZCL attribute + if ( + AqaraThermostatSpecificCluster.AttributeDefs.system_mode.id + in successful_r + ): + successful_r[ZCL_SYSTEM_MODE] = XIAOMI_SYSTEM_MODE_MAP[ + successful_r.pop( + AqaraThermostatSpecificCluster.AttributeDefs.system_mode.id + ) + ] + # read remaining attributes from thermostat cluster + if remaining_attributes: + remaining_result = await super().read_attributes( + remaining_attributes, allow_cache, only_cache, manufacturer + ) + successful_r.update(remaining_result[0]) + failed_r.update(remaining_result[1]) + return successful_r, failed_r + + async def write_attributes( + self, attributes: dict[str | int, Any], manufacturer: int | None = None + ) -> list: + """Pass writing attributes to Xiaomi cluster if applicable.""" + result = [] + remaining_attributes = attributes.copy() + system_mode_value = None + + # check if system_mode is being written (can be numeric or string) + if ZCL_SYSTEM_MODE in attributes: + remaining_attributes.pop(ZCL_SYSTEM_MODE) + system_mode_value = attributes.get(ZCL_SYSTEM_MODE) + if AqaraThermostatSpecificCluster.AttributeDefs.system_mode.name in attributes: + remaining_attributes.pop( + AqaraThermostatSpecificCluster.AttributeDefs.system_mode.name + ) + system_mode_value = attributes.get( + AqaraThermostatSpecificCluster.AttributeDefs.system_mode.name + ) + + # write system_mode to Xiaomi cluster if applicable + if system_mode_value is not None: + self.debug("Passing 'system_mode' write to Xiaomi cluster") + result += await self.endpoint.opple_cluster.write_attributes( + { + AqaraThermostatSpecificCluster.AttributeDefs.system_mode.id: min( + int(system_mode_value), 1 + ) + } + ) + + # write remaining attributes to thermostat cluster + if remaining_attributes: + result += await super().write_attributes(remaining_attributes, manufacturer) + return result + + +class ScheduleEvent: + """Schedule event object.""" + + _is_next_day = False + + def __init__(self, value, is_next_day=False): + """Create ScheduleEvent object from bytes or string.""" + if isinstance(value, bytes): + self._verify_buffer_len(value) + self._time = self._read_time_from_buf(value) + self._temp = self._read_temp_from_buf(value) + self._validate_time(self._time) + self._validate_temp(self._temp) + elif isinstance(value, str): + groups = value.split(",") + if len(groups) != 2: + raise ValueError("Time and temperature must contain ',' separator") + self._time = self._parse_time(groups[0]) + self._temp = self._parse_temp(groups[1]) + self._validate_time(self._time) + self._validate_temp(self._temp) + else: + raise TypeError( + f"Cannot create ScheduleEvent object from type: {type(value)}" + ) + self._is_next_day = is_next_day + + @staticmethod + def _verify_buffer_len(buf): + if len(buf) != 6: + raise ValueError("Buffer size must equal 6") + + @staticmethod + def _read_time_from_buf(buf): + time = struct.unpack_from(">H", buf, offset=0)[0] + time &= ~NEXT_DAY_FLAG + return time + + @staticmethod + def _parse_time(string): + parts = string.split(":") + if len(parts) != 2: + raise ValueError("Time must contain ':' separator") + + hours = int(parts[0]) + minutes = int(parts[1]) + + return hours * 60 + minutes + + @staticmethod + def _read_temp_from_buf(buf): + return struct.unpack_from(">H", buf, offset=4)[0] / 100 + + @staticmethod + def _parse_temp(string): + return float(string) + + @staticmethod + def _validate_time(time): + if time <= 0: + raise ValueError("Time must be between 00:00 and 23:59") + if time > 24 * 60: + raise ValueError("Time must be between 00:00 and 23:59") + + @staticmethod + def _validate_temp(temp): + if temp < 5: + raise ValueError("Temperature must be between 5 and 30 °C") + if temp > 30: + raise ValueError("Temperature must be between 5 and 30 °C") + if (temp * 10) % 5 != 0: + raise ValueError("Temperature must be whole or half degrees") + + def _write_time_to_buf(self, buf): + time = self._time + if self._is_next_day: + time |= NEXT_DAY_FLAG + struct.pack_into(">H", buf, 0, time) + + def _write_temp_to_buf(self, buf): + struct.pack_into(">H", buf, 4, int(self._temp * 100)) + + def is_next_day(self): + """Return if event is on the next day.""" + return self._is_next_day + + def set_next_day(self, is_next_day): + """Set if event is on the next day.""" + self._is_next_day = is_next_day + + def get_time(self): + """Return event time.""" + return self._time + + def __str__(self): + """Return event as string.""" + return f"{math.floor(self._time / 60)}:{f'{self._time % 60:0>2}'},{f'{self._temp:.1f}'}" + + def serialize(self): + """Serialize event to bytes.""" + result = bytearray(6) + self._write_time_to_buf(result) + self._write_temp_to_buf(result) + return result + + +class ScheduleSettings(t.LVBytes): + """Schedule settings object.""" + + def __new__(cls, value): + """Create ScheduleSettings object from bytes or string.""" + day_selection = None + events = [None] * 4 + if isinstance(value, bytes): + ScheduleSettings._verify_buffer_len(value) + ScheduleSettings._verify_magic_byte(value) + day_selection = ScheduleSettings._read_day_selection(value) + for i in range(4): + events[i] = ScheduleSettings._read_event(value, i) + elif isinstance(value, str): + groups = value.split("|") + ScheduleSettings._verify_string(groups) + day_selection = ScheduleSettings._read_day_selection(groups[0]) + for i in range(4): + events[i] = ScheduleSettings._read_event(groups[i + 1], i) + else: + raise TypeError( + f"Cannot create ScheduleSettings object from type: {type(value)}" + ) + + for i in range(1, 4): + if events[i].get_time() < events[i - 1].get_time(): + events[i].set_next_day(True) + ScheduleSettings._verify_event_durations(events) + + result = bytearray(b"\x04") + result.append(ScheduleSettings._get_day_selection_byte(day_selection)) + for e in events: + result.extend(e.serialize()) + return super().__new__(cls, bytes(result)) + + @staticmethod + def _verify_buffer_len(buf): + if len(buf) != 26: + raise ValueError("Buffer size must equal 26") + + @staticmethod + def _verify_magic_byte(buf): + if struct.unpack_from("c", buf, offset=0)[0][0] != 0x04: + raise ValueError("Magic byte must be equal to 0x04") + + @staticmethod + def _verify_string(groups): + if len(groups) != 5: + raise ValueError("There must be 5 groups in a string") + days = groups[0].split(",") + ScheduleSettings._verify_day_selection_in_str(days) + + @staticmethod + def _verify_day_selection_in_str(days): + if len(days) == 0 or len(days) > 7: + raise ValueError("Number of days selected must be between 1 and 7") + if len(days) != len(set(days)): + raise ValueError("Duplicate day names present") + for d in days: + if d not in DAYS_MAP: + raise ValueError( + f"String: {d} is not a valid day name, valid names: mon, tue, wed, thu, fri, sat, sun" + ) + + @staticmethod + def _read_day_selection(value): + day_selection = [] + if isinstance(value, bytes): + byte = struct.unpack_from("c", value, offset=1)[0][0] + if byte & 0x01: + raise ValueError("Incorrect day selected") + for i, v in DAYS_MAP.items(): + if byte & v: + day_selection.append(i) + ScheduleSettings._verify_day_selection_in_str(day_selection) + elif isinstance(value, str): + day_selection = value.split(",") + ScheduleSettings._verify_day_selection_in_str(day_selection) + return day_selection + + @staticmethod + def _read_event(value, index): + if isinstance(value, bytes): + event_buf = value[2 + index * 6 : 8 + index * 6] + return ScheduleEvent(event_buf) + elif isinstance(value, str): + return ScheduleEvent(value) + + @staticmethod + def _verify_event_durations(events): + full_day = 24 * 60 + prev_time = events[0].get_time() + durations = [] + for i in range(1, 4): + event = events[i] + if event.is_next_day(): + durations.append(full_day - prev_time + event.get_time()) + else: + durations.append(event.get_time() - prev_time) + prev_time = event.get_time() + if any(d < 60 for d in durations): + raise ValueError("The individual times must be at least 1 hour apart") + if reduce((lambda x, y: x + y), durations) > full_day: + raise ValueError("The start and end times must be at most 24 hours apart") + + @staticmethod + def _get_day_selection_byte(day_selection): + byte = 0x00 + for d in day_selection: + byte |= DAYS_MAP[d] + return byte + + def __str__(self): + """Return ScheduleSettings as string.""" + day_selection = ScheduleSettings._read_day_selection(self) + events = [None] * 4 + for i in range(4): + events[i] = ScheduleSettings._read_event(self, i) + result = ",".join(day_selection) + for e in events: + result += f"|{e}" + return result + + +class AqaraThermostatSpecificCluster(XiaomiAqaraE1Cluster): + """Aqara manufacturer specific settings.""" + + class AttributeDefs(XiaomiAqaraE1Cluster.AttributeDefs): + """Attribute definitions.""" + + system_mode: Final = ZCLAttributeDef( + id=0x0271, type=t.uint8_t, is_manufacturer_specific=True + ) + preset: Final = ZCLAttributeDef( + id=0x0272, type=t.uint8_t, is_manufacturer_specific=True + ) + window_detection: Final = ZCLAttributeDef( + id=0x0273, type=t.uint8_t, is_manufacturer_specific=True + ) + valve_detection: Final = ZCLAttributeDef( + id=0x0274, type=t.uint8_t, is_manufacturer_specific=True + ) + valve_alarm: Final = ZCLAttributeDef( + id=0x0275, type=t.uint8_t, is_manufacturer_specific=True + ) + schedule_settings: Final = ZCLAttributeDef( + id=0x0276, type=ScheduleSettings, is_manufacturer_specific=True + ) + child_lock: Final = ZCLAttributeDef( + id=0x0277, type=t.uint8_t, is_manufacturer_specific=True + ) + away_preset_temperature: Final = ZCLAttributeDef( + id=0x0279, type=t.uint32_t, is_manufacturer_specific=True + ) + window_open: Final = ZCLAttributeDef( + id=0x027A, type=t.uint8_t, is_manufacturer_specific=True + ) + calibrated: Final = ZCLAttributeDef( + id=0x027B, type=t.uint8_t, is_manufacturer_specific=True + ) + schedule: Final = ZCLAttributeDef( + id=0x027D, type=t.uint8_t, is_manufacturer_specific=True + ) + sensor: Final = ZCLAttributeDef( + id=0x027E, type=t.uint8_t, is_manufacturer_specific=True + ) + battery_percentage: Final = ZCLAttributeDef( + id=0x040A, type=t.uint8_t, is_manufacturer_specific=True + ) + sensor_temp: Final = ( + ZCLAttributeDef( # Fake address to pass external sensor temperature + id=0x1392, type=t.uint32_t, is_manufacturer_specific=True + ) + ) + sensor_attr: Final = ZCLAttributeDef( + id=0xFFF2, type=t.LVBytes, is_manufacturer_specific=True + ) + + def _update_attribute(self, attrid, value): + self.debug("Updating attribute on Xiaomi cluster %s with %s", attrid, value) + if attrid == self.AttributeDefs.battery_percentage.id: + self.endpoint.power.battery_percent_reported(value) + elif attrid == self.AttributeDefs.system_mode.id: + # update ZCL system_mode attribute (e.g. on attribute reports) + self.endpoint.thermostat.update_attribute( + ZCL_SYSTEM_MODE, XIAOMI_SYSTEM_MODE_MAP[value] + ) + super()._update_attribute(attrid, value) + + def aqara_header(self, counter: int, params: bytearray, action: int) -> bytearray: + """Create Aqara header for setting external sensor.""" + header = bytes([0xAA, 0x71, len(params) + 3, 0x44, counter]) + integrity = 512 - sum(header) + + return header + bytes([integrity, action, 0x41, len(params)]) + + def _float_to_hex(self, f): + """Convert float to hex.""" + return hex(struct.unpack(" list: + """Write attributes to device with internal 'attributes' validation.""" + attrs: dict[str | int, Any] = {} + + for attr, value in attributes.items(): + # implemented with help from https://github.com/Koenkk/zigbee-herdsman-converters/blob/master/devices/xiaomi.js + attr_def = self.find_attribute(attr) + + if attr_def and attr_def.id == self.AttributeDefs.sensor_temp.id: + # set external sensor temp. this function expect value to be passed multiplied by 100 + temperatureBuf = bytearray.fromhex( + self._float_to_hex(round(float(value)))[2:] + ) + + params = bytearray(Constants.SENSOR_ID) + params += bytes([0x00, 0x01, 0x00, 0x55]) + params += temperatureBuf + + attrs[self.AttributeDefs.sensor_attr.name] = ( + self.aqara_header(0x12, params, 0x05) + params + ) + + elif attr_def and attr_def.id == self.AttributeDefs.sensor.id: + # set internal/external temperature sensor + device = bytearray.fromhex( + f"{self.endpoint.device.ieee}".replace(":", "") + ) + + timestamp = bytes(reversed(t.uint32_t(int(time.time())).serialize())) + + if value == 0: + # internal sensor + params1 = timestamp + params1 += bytes([0x3D, 0x05]) + params1 += device + params1 += bytes(12) + + params2 = timestamp + params2 += bytes([0x3D, 0x04]) + params2 += device + params2 += bytes(12) + + attrs1 = {} + attrs1[self.AttributeDefs.sensor_attr.name] = ( + self.aqara_header(0x12, params1, 0x04) + params1 + ) + attrs[self.AttributeDefs.sensor_attr.name] = ( + self.aqara_header(0x13, params2, 0x04) + params2 + ) + + await super().write_attributes(attrs1, manufacturer) + else: + # external sensor + params1 = timestamp + params1 += bytes([0x3D, 0x04]) + params1 += device + params1 += Constants.SENSOR_ID + params1 += bytes([0x00, 0x01, 0x00, 0x55]) + params1 += bytes( + [ + 0x13, + 0x0A, + 0x02, + 0x00, + 0x00, + 0x64, + 0x04, + 0xCE, + 0xC2, + 0xB6, + 0xC8, + ] + ) + params1 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) + params1 += bytes([0x64]) + params1 += bytes([0x65]) + + params2 = timestamp + params2 += bytes([0x3D, 0x05]) + params2 += device + params2 += Constants.SENSOR_ID + params2 += bytes([0x08, 0x00, 0x07, 0xFD]) + params2 += bytes( + [ + 0x16, + 0x0A, + 0x02, + 0x0A, + 0xC9, + 0xE8, + 0xB1, + 0xB8, + 0xD4, + 0xDA, + 0xCF, + 0xDF, + 0xC0, + 0xEB, + ] + ) + params2 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) + params2 += bytes([0x04]) + params2 += bytes([0x65]) + + attrs1 = {} + attrs1[self.AttributeDefs.sensor_attr.name] = ( + self.aqara_header(0x12, params1, 0x02) + params1 + ) + attrs[self.AttributeDefs.sensor_attr.name] = ( + self.aqara_header(0x13, params2, 0x02) + params2 + ) + + await super().write_attributes(attrs1, manufacturer) + else: + attrs[attr] = value + + result = await super().write_attributes(attrs, manufacturer) + return result + + +class AGL001(XiaomiCustomDevice): + """Aqara E1 Radiator Thermostat (AGL001) Device.""" + + signature = { + # + MODELS_INFO: [(LUMI, "lumi.airrtc.agl001")], + ENDPOINTS: { + 1: { + PROFILE_ID: zha.PROFILE_ID, + DEVICE_TYPE: zha.DeviceType.THERMOSTAT, + INPUT_CLUSTERS: [ + Basic.cluster_id, + Identify.cluster_id, + Thermostat.cluster_id, + Time.cluster_id, + XiaomiPowerConfiguration.cluster_id, + AqaraThermostatSpecificCluster.cluster_id, + ], + OUTPUT_CLUSTERS: [ + Identify.cluster_id, + Thermostat.cluster_id, + AqaraThermostatSpecificCluster.cluster_id, + ], + } + }, + } + + replacement = { + ENDPOINTS: { + 1: { + INPUT_CLUSTERS: [ + Basic.cluster_id, + Identify.cluster_id, + ThermostatCluster, + Time.cluster_id, + XiaomiPowerConfiguration, + AqaraThermostatSpecificCluster, + ], + OUTPUT_CLUSTERS: [ + Identify.cluster_id, + ThermostatCluster, + AqaraThermostatSpecificCluster, + Ota.cluster_id, + ], + } + } + } From d3958ae8b616cb0b1cf16d3b3345deefd5b1903d Mon Sep 17 00:00:00 2001 From: Elyasin Shaladi Date: Mon, 3 Nov 2025 21:27:29 +0100 Subject: [PATCH 2/3] Refactors for maintainability --- tests/test_aqara_trv.py | 1 + zhaquirks/xiaomi/aqara/thermostat_agl001.py | 178 +++++++++++--------- 2 files changed, 98 insertions(+), 81 deletions(-) diff --git a/tests/test_aqara_trv.py b/tests/test_aqara_trv.py index 6d7df1be62..b4956888dc 100755 --- a/tests/test_aqara_trv.py +++ b/tests/test_aqara_trv.py @@ -10,6 +10,7 @@ AqaraThermostatSpecificCluster, ) +# Avoids writing out the AqaraThermostatSpecificCluster.AttributeDefs every time. Defs = AqaraThermostatSpecificCluster.AttributeDefs diff --git a/zhaquirks/xiaomi/aqara/thermostat_agl001.py b/zhaquirks/xiaomi/aqara/thermostat_agl001.py index 601c2aa2c7..895c808ebf 100755 --- a/zhaquirks/xiaomi/aqara/thermostat_agl001.py +++ b/zhaquirks/xiaomi/aqara/thermostat_agl001.py @@ -42,6 +42,36 @@ class Constants: """Constants specific for Aqara E1 TRV.""" SENSOR_ID = bytearray.fromhex("00158d00019d1b98") + SENSOR_ID_SUFFIX = bytes([0x00, 0x01, 0x00, 0x55]) + INTERNAL_SENSOR_ACTION_CODES = [bytes([0x3D, 0x05]), bytes([0x3D, 0x04])] + EXTERNAL_SENSOR_ACTION_CODES = [bytes([0x3D, 0x04]), bytes([0x3D, 0x05])] + INTERNAL_SENSOR_PADDING = bytes(12) + EXTERNAL_SENSOR_DATA_BLOCK_1 = bytes( + [0x13, 0x0A, 0x02, 0x00, 0x00, 0x64, 0x04, 0xCE, 0xC2, 0xB6, 0xC8] + ) + EXTERNAL_SENSOR_DATA_BLOCK_2 = bytes( + [ + 0x16, + 0x0A, + 0x02, + 0x0A, + 0xC9, + 0xE8, + 0xB1, + 0xB8, + 0xD4, + 0xDA, + 0xCF, + 0xDF, + 0xC0, + 0xEB, + ] + ) + EXTERNAL_SENSOR_TRAILING_1 = bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) + EXTERNAL_SENSOR_TRAILING_2 = bytes([0x64]) + EXTERNAL_SENSOR_TRAILING_3 = bytes([0x65]) + EXTERNAL_SENSOR_PARAMS2_SUFFIX = bytes([0x08, 0x00, 0x07, 0xFD]) + EXTERNAL_SENSOR_TRAILING_4 = bytes([0x04]) DAYS_MAP = { @@ -463,6 +493,54 @@ def _float_to_hex(self, f): """Convert float to hex.""" return hex(struct.unpack(" list: @@ -495,90 +573,28 @@ async def write_attributes( timestamp = bytes(reversed(t.uint32_t(int(time.time())).serialize())) - if value == 0: - # internal sensor - params1 = timestamp - params1 += bytes([0x3D, 0x05]) - params1 += device - params1 += bytes(12) - - params2 = timestamp - params2 += bytes([0x3D, 0x04]) - params2 += device - params2 += bytes(12) - - attrs1 = {} - attrs1[self.AttributeDefs.sensor_attr.name] = ( - self.aqara_header(0x12, params1, 0x04) + params1 - ) - attrs[self.AttributeDefs.sensor_attr.name] = ( - self.aqara_header(0x13, params2, 0x04) + params2 - ) - - await super().write_attributes(attrs1, manufacturer) - else: - # external sensor - params1 = timestamp - params1 += bytes([0x3D, 0x04]) - params1 += device - params1 += Constants.SENSOR_ID - params1 += bytes([0x00, 0x01, 0x00, 0x55]) - params1 += bytes( - [ - 0x13, - 0x0A, - 0x02, - 0x00, - 0x00, - 0x64, - 0x04, - 0xCE, - 0xC2, - 0xB6, - 0xC8, - ] - ) - params1 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) - params1 += bytes([0x64]) - params1 += bytes([0x65]) - - params2 = timestamp - params2 += bytes([0x3D, 0x05]) - params2 += device - params2 += Constants.SENSOR_ID - params2 += bytes([0x08, 0x00, 0x07, 0xFD]) - params2 += bytes( - [ - 0x16, - 0x0A, - 0x02, - 0x0A, - 0xC9, - 0xE8, - 0xB1, - 0xB8, - 0xD4, - 0xDA, - 0xCF, - 0xDF, - 0xC0, - 0xEB, - ] - ) - params2 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) - params2 += bytes([0x04]) - params2 += bytes([0x65]) + is_external = bool(value) + params1, params2 = self._build_sensor_mode_params( + is_external, device, timestamp + ) - attrs1 = {} - attrs1[self.AttributeDefs.sensor_attr.name] = ( - self.aqara_header(0x12, params1, 0x02) + params1 - ) - attrs[self.AttributeDefs.sensor_attr.name] = ( - self.aqara_header(0x13, params2, 0x02) + params2 - ) + attrs1 = {} + attrs1[self.AttributeDefs.sensor_attr.name] = ( + self.aqara_header(0x12, params1, 0x02 if is_external else 0x04) + + params1 + ) + attrs[self.AttributeDefs.sensor_attr.name] = ( + self.aqara_header(0x13, params2, 0x02 if is_external else 0x04) + + params2 + ) - await super().write_attributes(attrs1, manufacturer) + await super().write_attributes(attrs1, manufacturer) else: + self.debug( + "Passing through attribute %r (value: %r) to base implementation; not handled by Aqara quirk.", + attr, + value, + ) attrs[attr] = value result = await super().write_attributes(attrs, manufacturer) From b722c2b2ded6fde515599522c9325f2460689a5c Mon Sep 17 00:00:00 2001 From: Elyasin Shaladi Date: Mon, 3 Nov 2025 21:43:23 +0100 Subject: [PATCH 3/3] Tests read forwarding and mapping for system mode --- tests/test_aqara_trv.py | 46 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/test_aqara_trv.py b/tests/test_aqara_trv.py index b4956888dc..f543e6ac06 100755 --- a/tests/test_aqara_trv.py +++ b/tests/test_aqara_trv.py @@ -7,6 +7,8 @@ from zhaquirks.xiaomi.aqara.thermostat_agl001 import ( AGL001, + XIAOMI_SYSTEM_MODE_MAP, + ZCL_SYSTEM_MODE, AqaraThermostatSpecificCluster, ) @@ -184,3 +186,47 @@ async def async_success(*args, **kwargs): f"Message length changed: {len(first_value)} vs {len(second_value)}" ) assert first_value == second_value, "Message content changed between calls" + + +@pytest.mark.parametrize("quirk", (AGL001,)) +@pytest.mark.asyncio +async def test_system_mode_read_forwarding(zigpy_device_from_quirk, quirk): + """Test that system_mode read is forwarded to the Xiaomi cluster and mapped.""" + + # Create virtual device from the quirk + thermostat_dev = zigpy_device_from_quirk(quirk) + thermostat_cluster = thermostat_dev.endpoints[1].thermostat + + # Patch the Xiaomi cluster's read_attributes method + opple_cluster = thermostat_dev.endpoints[1].opple_cluster + # Simulate Xiaomi cluster returning system_mode = 1 (heat) + xiaomi_system_mode_value = 1 + xiaomi_attr_id = Defs.system_mode.id + xiaomi_result = ({xiaomi_attr_id: xiaomi_system_mode_value}, {}) + + with mock.patch.object( + opple_cluster, "read_attributes", new=mock.AsyncMock(return_value=xiaomi_result) + ) as mock_read: + # Test reading by attribute ID + successful_r, failed_r = await thermostat_cluster.read_attributes( + [ZCL_SYSTEM_MODE] + ) + assert mock_read.called + # Should be mapped to standard ZCL value + assert ZCL_SYSTEM_MODE in successful_r + assert ( + successful_r[ZCL_SYSTEM_MODE] + == XIAOMI_SYSTEM_MODE_MAP[xiaomi_system_mode_value] + ) + + # Test reading by attribute name + mock_read.reset_mock() + successful_r, failed_r = await thermostat_cluster.read_attributes( + [Defs.system_mode.name] + ) + assert mock_read.called + assert ZCL_SYSTEM_MODE in successful_r + assert ( + successful_r[ZCL_SYSTEM_MODE] + == XIAOMI_SYSTEM_MODE_MAP[xiaomi_system_mode_value] + )