From 688369b8b4694902be3caabe967521b57fd9015c Mon Sep 17 00:00:00 2001 From: Andrew Gaylord <113562174+AGaylord6@users.noreply.github.com> Date: Fri, 21 Nov 2025 11:52:53 -0500 Subject: [PATCH 1/5] Extract rssi/device type, fix crash, and move device printing --- examples/scanner.py | 27 +----- findmy/scanner/scanner.py | 167 +++++++++++++++++++++++++++++++++----- 2 files changed, 149 insertions(+), 45 deletions(-) diff --git a/examples/scanner.py b/examples/scanner.py index 8f929cd..d4a0414 100644 --- a/examples/scanner.py +++ b/examples/scanner.py @@ -16,27 +16,6 @@ logging.basicConfig(level=logging.INFO) -def _print_nearby(device: NearbyOfflineFindingDevice) -> None: - print(f"NEARBY Device - {device.mac_address}") - print(f" Status byte: {device.status:x}") - print(" Extra data:") - for k, v in sorted(device.additional_data.items()): - print(f" {k:20}: {v}") - print() - - -def _print_separated(device: SeparatedOfflineFindingDevice) -> None: - print(f"SEPARATED Device - {device.mac_address}") - print(f" Public key: {device.adv_key_b64}") - print(f" Lookup key: {device.hashed_adv_key_b64}") - print(f" Status byte: {device.status:x}") - print(f" Hint byte: {device.hint:x}") - print(" Extra data:") - for k, v in sorted(device.additional_data.items()): - print(f" {k:20}: {v}") - print() - - async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scanner = await OfflineFindingScanner.create() @@ -46,10 +25,8 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scan_device = None async for device in scanner.scan_for(10, extend_timeout=True): - if isinstance(device, NearbyOfflineFindingDevice): - _print_nearby(device) - elif isinstance(device, SeparatedOfflineFindingDevice): - _print_separated(device) + if isinstance(device, (SeparatedOfflineFindingDevice, NearbyOfflineFindingDevice)): + device.print_device() else: print(f"Unknown device: {device}") print() diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 1f9d7b6..365525f 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -23,6 +23,15 @@ logger = logging.getLogger(__name__) +APPLE_DEVICE_TYPE = { + 0: "Apple Device", + 1: "AirTag", + 2: "Licensed 3rd Party Find My Device", + 3: "AirPods", +} + +BATTERY_LEVEL = {0: "Full", 1: "Medium", 2: "Low", 3: "Very Low"} + class OfflineFindingDevice(ABC): """Device discoverable through Apple's bluetooth-based Offline Finding protocol.""" @@ -35,12 +44,14 @@ def __init__( mac_bytes: bytes, status_byte: int, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate an OfflineFindingDevice.""" self._mac_bytes: bytes = mac_bytes self._status: int = status_byte self._detected_at: datetime = detected_at + self._rssi: int | None = rssi self._additional_data: dict[Any, Any] = additional_data or {} @property @@ -59,16 +70,38 @@ def detected_at(self) -> datetime: """Timezone-aware datetime of when the device was detected.""" return self._detected_at + @property + def rssi(self) -> int | None: + """Received Signal Strength Indicator (RSSI) value.""" + return self._rssi + @property def additional_data(self) -> dict[Any, Any]: """Any additional data. No guarantees about the contents of this dictionary.""" return self._additional_data + @property + def device_type(self) -> str: + """Get the device type from status byte.""" + type_id = (self.status >> 4) & 0b00000011 + return APPLE_DEVICE_TYPE.get(type_id, "Unknown") + + @property + def battery_level(self) -> str: + """Get the battery level from status byte.""" + battery_id = (self.status >> 6) & 0b00000011 + return BATTERY_LEVEL.get(battery_id, "Unknown") + @abstractmethod def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" raise NotImplementedError + @abstractmethod + def print_device(self) -> None: + """Print human-readable information about the device.""" + raise NotImplementedError + @classmethod @abstractmethod def from_payload( @@ -76,7 +109,8 @@ def from_payload( mac_address: str, payload: bytes, detected_at: datetime, - additional_data: dict[Any, Any] | None, + rssi: int | None = None, + additional_data: dict[Any, Any] | None = None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" raise NotImplementedError @@ -87,6 +121,7 @@ def from_ble_payload( mac_address: str, ble_payload: bytes, detected_at: datetime | None = None, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from a BLE packet payload.""" @@ -97,6 +132,7 @@ def from_ble_payload( logger.debug("Unsupported OF type: %s", ble_payload[0]) return None + # Differentiate between Nearby and Separated advertisements by payload length device_type = next( ( dev @@ -113,6 +149,7 @@ def from_ble_payload( mac_address, ble_payload[cls.OF_HEADER_SIZE :], detected_at or datetime.now().astimezone(), + rssi, additional_data, ) @@ -131,21 +168,27 @@ def __hash__(self) -> int: class NearbyOfflineFindingDevice(OfflineFindingDevice): """Offline-Finding device in nearby state.""" - OF_PAYLOAD_LEN = 0x02 # 2 + OF_PAYLOAD_LEN = 0x02 # 2 bytes (status, 2 bits of public key/mac address) - def __init__( + def __init__( # noqa: PLR0913 self, mac_bytes: bytes, status_byte: int, first_adv_key_bytes: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate a NearbyOfflineFindingDevice.""" - super().__init__(mac_bytes, status_byte, detected_at, additional_data) - + super().__init__(mac_bytes, status_byte, detected_at, rssi, additional_data) + # When nearby, only the first 6 bytes of the public key are transmitted self._first_adv_key_bytes: bytes = first_adv_key_bytes + @property + def adv_key_bytes(self) -> bytes: + """Although not a full public key, still identifies device like one.""" + return self._first_adv_key_bytes + @override def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" @@ -179,14 +222,15 @@ def from_payload( mac_address: str, payload: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> NearbyOfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" if len(payload) != cls.OF_PAYLOAD_LEN: logger.error( - "Invalid OF data length: %s instead of %s", + "Invalid OF data length for NearbyOfflineFindingDevice: %s instead of %s", len(payload), - payload[1], + cls.OF_PAYLOAD_LEN, ) mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) @@ -203,14 +247,27 @@ def from_payload( status_byte, partial_pubkey, detected_at, + rssi, additional_data, ) + @override + def print_device(self) -> None: + """Print human-readable information about the device.""" + logger.info("Nearby %s - %s", self.device_type, self.mac_address) + logger.info(" Status byte: 0x%x", self.status) + logger.info(" Battery lvl: %s", self.battery_level) + logger.info(" RSSI: %s", self.rssi) + logger.info(" Extra data:") + for k, v in sorted(self.additional_data.items()): + logger.info(" %s: %s", f"{k:20}", v) + logger.info("\n") + class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): """Offline-Finding device in separated state.""" - OF_PAYLOAD_LEN = 0x19 # 25 + OF_PAYLOAD_LEN = 0x19 # 25 bytes (status, pubkey(22), first 2 bits of MAC/pubkey, hint) def __init__( # noqa: PLR0913 self, @@ -219,11 +276,11 @@ def __init__( # noqa: PLR0913 public_key: bytes, hint: int, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Initialize a :meth:`SeparatedOfflineFindingDevice`.""" - super().__init__(mac_bytes, status, detected_at, additional_data) - + super().__init__(mac_bytes, status, detected_at, rssi, additional_data) self._public_key: bytes = public_key self._hint: int = hint @@ -271,14 +328,15 @@ def from_payload( mac_address: str, payload: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> SeparatedOfflineFindingDevice | None: """Get a SeparatedOfflineFindingDevice object from an OF message payload.""" if len(payload) != cls.OF_PAYLOAD_LEN: logger.error( - "Invalid OF data length: %s instead of %s", + "Invalid OF data length for SeparatedOfflineFindingDevice: %s instead of %s", len(payload), - payload[1], + cls.OF_PAYLOAD_LEN, ) return None @@ -301,9 +359,25 @@ def from_payload( pubkey, hint, detected_at, + rssi, additional_data, ) + @override + def print_device(self) -> None: + """Print human-readable information about the device.""" + logger.info("Separated %s - %s", self.device_type, self.mac_address) + logger.info(" Public key: %s", self.adv_key_b64) + logger.info(" Lookup key: %s", self.hashed_adv_key_b64) + logger.info(" Status byte: 0x%x", self.status) + logger.info(" Battery lvl: %s", self.battery_level) + logger.info(" Hint byte: 0x%x", self.hint) + logger.info(" RSSI: %s", self.rssi) + logger.info(" Extra data:") + for k, v in sorted(self.additional_data.items()): + logger.info(" %s: %s", f"{k:20}", v) + logger.info("\n") + @override def __repr__(self) -> str: """Human-readable string representation of an OfflineFindingDevice.""" @@ -340,6 +414,38 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._scanner_count: int = 0 + def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: + """Print summary of each device seen.""" + logger.info("================ RESULTS =========================") + for mac, devices in seen_devices.items(): + avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( + [d for d in devices if d.rssi is not None] + ) + logger.info("Device %s seen %d times, average RSSI: %.1f", mac, len(devices), avg_rssi) + of_type = ( + "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" + ) + logger.info( + " %s with %s battery (%s state)", + devices[0].device_type, + devices[0].battery_level, + of_type, + ) + + if isinstance(devices[0], SeparatedOfflineFindingDevice): + logger.info(" Public key: %s", devices[0].adv_key_b64) + logger.info(" Lookup key: %s", devices[0].hashed_adv_key_b64) + + logger.info("===============================================") + + device_type_counts: dict[str, int] = {} + for devs in seen_devices.values(): + dev_type = devs[0].device_type + device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 + + for dev_type, count in device_type_counts.items(): + logger.info("Total %s: %d", dev_type, count) + @classmethod async def create(cls) -> OfflineFindingScanner: """Create an instance of the scanner.""" @@ -365,8 +471,10 @@ async def _scan_callback( device: BLEDevice, data: AdvertisementData, ) -> None: - self._device_fut.set_result((device, data)) - self._device_fut = self._loop.create_future() + # Ensure that only one waiting coroutine is notified + if not self._device_fut.done(): + self._device_fut.set_result((device, data)) + self._device_fut = self._loop.create_future() async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: device, data = await asyncio.wait_for(self._device_fut, timeout=timeout) @@ -377,6 +485,11 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: detected_at = datetime.now().astimezone() + # Extract RSSI if it exists + rssi = None + if data.rssi is not None: + rssi = data.rssi + try: additional_data = device.details.get("props", {}) except AttributeError: @@ -387,6 +500,7 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: device.address, apple_data, detected_at, + rssi, additional_data, ) @@ -405,21 +519,34 @@ async def scan_for( await self._start_scan() stop_at = time.time() + timeout - devices_seen: set[OfflineFindingDevice] = set() + # Map MAC address to device objects (nearby doesn't send entire pubkey) + # This avoids double counting when device has different status/hint byte (but same pubkey) + devices_seen: dict[str, list[OfflineFindingDevice]] = {} try: time_left = stop_at - time.time() while time_left > 0: device = await self._wait_for_device(time_left) - if device is not None and device not in devices_seen: - devices_seen.add(device) - if extend_timeout: - stop_at = time.time() + timeout - yield device + + if device is not None: + # Check if we have already seen this device + new_device = device.mac_address not in devices_seen + + devices_seen[device.mac_address] = [ + *devices_seen.get(device.mac_address, []), + device, + ] + + if new_device: + if extend_timeout: + stop_at = time.time() + timeout + yield device time_left = stop_at - time.time() + except asyncio.TimeoutError: # timeout reached self._device_fut = self._loop.create_future() + self.print_scanning_results(devices_seen) return finally: await self._stop_scan() From 3dc5fb579f1839c0cb2cf7360e009767b7e099d2 Mon Sep 17 00:00:00 2001 From: Andrew Gaylord <113562174+AGaylord6@users.noreply.github.com> Date: Wed, 26 Nov 2025 18:22:28 -0500 Subject: [PATCH 2/5] Allow user to specify printing to file --- examples/scanner.py | 9 ++- findmy/scanner/scanner.py | 132 ++++++++++++++++++++++++-------------- 2 files changed, 91 insertions(+), 50 deletions(-) diff --git a/examples/scanner.py b/examples/scanner.py index d4a0414..893d574 100644 --- a/examples/scanner.py +++ b/examples/scanner.py @@ -24,9 +24,14 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scan_device = None - async for device in scanner.scan_for(10, extend_timeout=True): + scan_out_file = Path("scan_results.jsonl") + # Clear previous scan results + if scan_out_file.exists(): + scan_out_file.unlink() + + async for device in scanner.scan_for(10, extend_timeout=True, print_summary=True): if isinstance(device, (SeparatedOfflineFindingDevice, NearbyOfflineFindingDevice)): - device.print_device() + device.print_device(out_file=scan_out_file) else: print(f"Unknown device: {device}") print() diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 365525f..8c4739f 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import json import logging import time from abc import ABC, abstractmethod @@ -17,6 +18,7 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator + from pathlib import Path from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData @@ -24,13 +26,13 @@ logger = logging.getLogger(__name__) APPLE_DEVICE_TYPE = { - 0: "Apple Device", - 1: "AirTag", - 2: "Licensed 3rd Party Find My Device", - 3: "AirPods", + 0b00: "Apple Device", + 0b01: "AirTag", + 0b10: "Licensed 3rd Party Find My Device", + 0b11: "AirPods", } -BATTERY_LEVEL = {0: "Full", 1: "Medium", 2: "Low", 3: "Very Low"} +BATTERY_LEVEL = {0b00: "Full", 0b01: "Medium", 0b10: "Low", 0b11: "Very Low"} class OfflineFindingDevice(ABC): @@ -98,8 +100,8 @@ def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: raise NotImplementedError @abstractmethod - def print_device(self) -> None: - """Print human-readable information about the device.""" + def print_device(self, out_file: Path | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" raise NotImplementedError @classmethod @@ -185,7 +187,7 @@ def __init__( # noqa: PLR0913 self._first_adv_key_bytes: bytes = first_adv_key_bytes @property - def adv_key_bytes(self) -> bytes: + def partial_adv_key(self) -> bytes: """Although not a full public key, still identifies device like one.""" return self._first_adv_key_bytes @@ -252,16 +254,33 @@ def from_payload( ) @override - def print_device(self) -> None: - """Print human-readable information about the device.""" - logger.info("Nearby %s - %s", self.device_type, self.mac_address) - logger.info(" Status byte: 0x%x", self.status) - logger.info(" Battery lvl: %s", self.battery_level) - logger.info(" RSSI: %s", self.rssi) - logger.info(" Extra data:") - for k, v in sorted(self.additional_data.items()): - logger.info(" %s: %s", f"{k:20}", v) - logger.info("\n") + def print_device(self, out_file: Path | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" + # ruff: noqa: T201 + if out_file: + data = { + "mac_address": self.mac_address, + "status": self.status, + "device_type": self.device_type, + "battery_level": self.battery_level, + "rssi": self.rssi, + "detected_at": self.detected_at.isoformat(), + "additional_data": self.additional_data, + } + + with out_file.open("a", encoding="utf-8") as f: + # Indent json for readability + json.dump(data, f, indent=4) + f.write("\n") + else: + print(f"Nearby {self.device_type} - {self.mac_address}") + print(f" Status byte: 0x{self.status:x}") + print(f" Battery lvl: {self.battery_level}") + print(f" RSSI: {self.rssi}") + print(" Extra data:") + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}") + print("\n") class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): @@ -364,19 +383,38 @@ def from_payload( ) @override - def print_device(self) -> None: - """Print human-readable information about the device.""" - logger.info("Separated %s - %s", self.device_type, self.mac_address) - logger.info(" Public key: %s", self.adv_key_b64) - logger.info(" Lookup key: %s", self.hashed_adv_key_b64) - logger.info(" Status byte: 0x%x", self.status) - logger.info(" Battery lvl: %s", self.battery_level) - logger.info(" Hint byte: 0x%x", self.hint) - logger.info(" RSSI: %s", self.rssi) - logger.info(" Extra data:") - for k, v in sorted(self.additional_data.items()): - logger.info(" %s: %s", f"{k:20}", v) - logger.info("\n") + def print_device(self, out_file: Path | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" + # ruff: noqa: T201 + if out_file: + data = { + "mac_address": self.mac_address, + "public_key": self.adv_key_b64, + "lookup_key": self.hashed_adv_key_b64, + "status": self.status, + "device_type": self.device_type, + "battery_level": self.battery_level, + "hint": self.hint, + "rssi": self.rssi, + "detected_at": self.detected_at.isoformat(), + "additional_data": self.additional_data, + } + + with out_file.open("a", encoding="utf-8") as f: + json.dump(data, f, indent=4) + f.write("\n") + else: + print(f"Separated {self.device_type} - {self.mac_address}") + print(f" Public key: {self.adv_key_b64}") + print(f" Lookup key: {self.hashed_adv_key_b64}") + print(f" Status byte: 0x{self.status:x}") + print(f" Battery lvl: {self.battery_level}") + print(f" Hint byte: 0x{self.hint:x}") + print(f" RSSI: {self.rssi}") + print(" Extra data:") + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}") + print("\n") @override def __repr__(self) -> str: @@ -416,27 +454,25 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: """Print summary of each device seen.""" - logger.info("================ RESULTS =========================") + # ruff: noqa: T201 + print("================ RESULTS =========================") for mac, devices in seen_devices.items(): avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( [d for d in devices if d.rssi is not None] ) - logger.info("Device %s seen %d times, average RSSI: %.1f", mac, len(devices), avg_rssi) + print(f"Device {mac} seen {len(devices)} times, average RSSI: {avg_rssi:.1f}") of_type = ( "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" ) - logger.info( - " %s with %s battery (%s state)", - devices[0].device_type, - devices[0].battery_level, - of_type, + print( + f" {devices[0].device_type} with {devices[0].battery_level} battery" + f" ({of_type} state)" ) if isinstance(devices[0], SeparatedOfflineFindingDevice): - logger.info(" Public key: %s", devices[0].adv_key_b64) - logger.info(" Lookup key: %s", devices[0].hashed_adv_key_b64) - - logger.info("===============================================") + print(f" Public key: {devices[0].adv_key_b64}") + print(f" Lookup key: {devices[0].hashed_adv_key_b64}") + print("===============================================") device_type_counts: dict[str, int] = {} for devs in seen_devices.values(): @@ -444,7 +480,7 @@ def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevi device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 for dev_type, count in device_type_counts.items(): - logger.info("Total %s: %d", dev_type, count) + print(f"Total {dev_type}: {count}") @classmethod async def create(cls) -> OfflineFindingScanner: @@ -485,10 +521,8 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: detected_at = datetime.now().astimezone() - # Extract RSSI if it exists - rssi = None - if data.rssi is not None: - rssi = data.rssi + # Extract RSSI (which may be None) + rssi = data.rssi try: additional_data = device.details.get("props", {}) @@ -509,6 +543,7 @@ async def scan_for( timeout: float = 10, *, extend_timeout: bool = False, + print_summary: bool = False, ) -> AsyncGenerator[OfflineFindingDevice, None]: """ Scan for :meth:`OfflineFindingDevice`s for up to :meth:`timeout` seconds. @@ -546,7 +581,8 @@ async def scan_for( except asyncio.TimeoutError: # timeout reached self._device_fut = self._loop.create_future() - self.print_scanning_results(devices_seen) + if print_summary: + self.print_scanning_results(devices_seen) return finally: await self._stop_scan() From 90986ef7320b3de6f9c818c76c930af45386a8c8 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Thu, 27 Nov 2025 17:33:57 +0100 Subject: [PATCH 3/5] Allow printing to any `SupportsWrite` for ofdevice --- findmy/scanner/scanner.py | 81 +++++++++++---------------------------- 1 file changed, 23 insertions(+), 58 deletions(-) diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 8c4739f..d18e4bc 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio -import json import logging import time from abc import ABC, abstractmethod @@ -18,8 +17,8 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator - from pathlib import Path + from _typeshed import SupportsWrite from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData @@ -100,7 +99,7 @@ def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: raise NotImplementedError @abstractmethod - def print_device(self, out_file: Path | None = None) -> None: + def print(self, file: SupportsWrite[str] | None = None) -> None: """Print human-readable information about the device to stdout or file.""" raise NotImplementedError @@ -254,33 +253,17 @@ def from_payload( ) @override - def print_device(self, out_file: Path | None = None) -> None: + def print(self, file: SupportsWrite[str] | None = None) -> None: """Print human-readable information about the device to stdout or file.""" # ruff: noqa: T201 - if out_file: - data = { - "mac_address": self.mac_address, - "status": self.status, - "device_type": self.device_type, - "battery_level": self.battery_level, - "rssi": self.rssi, - "detected_at": self.detected_at.isoformat(), - "additional_data": self.additional_data, - } - - with out_file.open("a", encoding="utf-8") as f: - # Indent json for readability - json.dump(data, f, indent=4) - f.write("\n") - else: - print(f"Nearby {self.device_type} - {self.mac_address}") - print(f" Status byte: 0x{self.status:x}") - print(f" Battery lvl: {self.battery_level}") - print(f" RSSI: {self.rssi}") - print(" Extra data:") - for k, v in sorted(self.additional_data.items()): - print(f" {k:20}: {v}") - print("\n") + print(f"Nearby {self.device_type} - {self.mac_address}", file=file) + print(f" Status byte: 0x{self.status:x}", file=file) + print(f" Battery lvl: {self.battery_level}", file=file) + print(f" RSSI: {self.rssi}", file=file) + print(" Extra data:", file=file) + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}", file=file) + print("\n", file=file) class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): @@ -383,38 +366,20 @@ def from_payload( ) @override - def print_device(self, out_file: Path | None = None) -> None: + def print(self, file: SupportsWrite[str] | None = None) -> None: """Print human-readable information about the device to stdout or file.""" # ruff: noqa: T201 - if out_file: - data = { - "mac_address": self.mac_address, - "public_key": self.adv_key_b64, - "lookup_key": self.hashed_adv_key_b64, - "status": self.status, - "device_type": self.device_type, - "battery_level": self.battery_level, - "hint": self.hint, - "rssi": self.rssi, - "detected_at": self.detected_at.isoformat(), - "additional_data": self.additional_data, - } - - with out_file.open("a", encoding="utf-8") as f: - json.dump(data, f, indent=4) - f.write("\n") - else: - print(f"Separated {self.device_type} - {self.mac_address}") - print(f" Public key: {self.adv_key_b64}") - print(f" Lookup key: {self.hashed_adv_key_b64}") - print(f" Status byte: 0x{self.status:x}") - print(f" Battery lvl: {self.battery_level}") - print(f" Hint byte: 0x{self.hint:x}") - print(f" RSSI: {self.rssi}") - print(" Extra data:") - for k, v in sorted(self.additional_data.items()): - print(f" {k:20}: {v}") - print("\n") + print(f"Separated {self.device_type} - {self.mac_address}", file=file) + print(f" Public key: {self.adv_key_b64}", file=file) + print(f" Lookup key: {self.hashed_adv_key_b64}", file=file) + print(f" Status byte: 0x{self.status:x}", file=file) + print(f" Battery lvl: {self.battery_level}", file=file) + print(f" Hint byte: 0x{self.hint:x}", file=file) + print(f" RSSI: {self.rssi}", file=file) + print(" Extra data:", file=file) + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}", file=file) + print("\n", file=file) @override def __repr__(self) -> str: From 4e4527d6b1127c4a10779c2b340156f8bdc8ef3e Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Thu, 27 Nov 2025 17:38:23 +0100 Subject: [PATCH 4/5] Make `print_scanning_results` private --- findmy/scanner/scanner.py | 61 +++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index d18e4bc..3b97372 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -34,6 +34,35 @@ BATTERY_LEVEL = {0b00: "Full", 0b01: "Medium", 0b10: "Low", 0b11: "Very Low"} +def _print_scanning_results(seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: + """Print summary of each device seen.""" + # ruff: noqa: T201 + print("================ RESULTS =========================") + for mac, devices in seen_devices.items(): + avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( + [d for d in devices if d.rssi is not None] + ) + print(f"Device {mac} seen {len(devices)} times, average RSSI: {avg_rssi:.1f}") + of_type = "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" + print( + f" {devices[0].device_type} with {devices[0].battery_level} battery" + f" ({of_type} state)" + ) + + if isinstance(devices[0], SeparatedOfflineFindingDevice): + print(f" Public key: {devices[0].adv_key_b64}") + print(f" Lookup key: {devices[0].hashed_adv_key_b64}") + print("===============================================") + + device_type_counts: dict[str, int] = {} + for devs in seen_devices.values(): + dev_type = devs[0].device_type + device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 + + for dev_type, count in device_type_counts.items(): + print(f"Total {dev_type}: {count}") + + class OfflineFindingDevice(ABC): """Device discoverable through Apple's bluetooth-based Offline Finding protocol.""" @@ -417,36 +446,6 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._scanner_count: int = 0 - def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: - """Print summary of each device seen.""" - # ruff: noqa: T201 - print("================ RESULTS =========================") - for mac, devices in seen_devices.items(): - avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( - [d for d in devices if d.rssi is not None] - ) - print(f"Device {mac} seen {len(devices)} times, average RSSI: {avg_rssi:.1f}") - of_type = ( - "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" - ) - print( - f" {devices[0].device_type} with {devices[0].battery_level} battery" - f" ({of_type} state)" - ) - - if isinstance(devices[0], SeparatedOfflineFindingDevice): - print(f" Public key: {devices[0].adv_key_b64}") - print(f" Lookup key: {devices[0].hashed_adv_key_b64}") - print("===============================================") - - device_type_counts: dict[str, int] = {} - for devs in seen_devices.values(): - dev_type = devs[0].device_type - device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 - - for dev_type, count in device_type_counts.items(): - print(f"Total {dev_type}: {count}") - @classmethod async def create(cls) -> OfflineFindingScanner: """Create an instance of the scanner.""" @@ -547,7 +546,7 @@ async def scan_for( except asyncio.TimeoutError: # timeout reached self._device_fut = self._loop.create_future() if print_summary: - self.print_scanning_results(devices_seen) + _print_scanning_results(devices_seen) return finally: await self._stop_scan() From 22085a680ebf9c2d5eafc95ad8eb66e08dd65b22 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Thu, 27 Nov 2025 17:39:40 +0100 Subject: [PATCH 5/5] Fix scanner example --- examples/scanner.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/examples/scanner.py b/examples/scanner.py index 893d574..3dfd37c 100644 --- a/examples/scanner.py +++ b/examples/scanner.py @@ -8,9 +8,7 @@ from findmy import ( FindMyAccessory, KeyPair, - NearbyOfflineFindingDevice, OfflineFindingScanner, - SeparatedOfflineFindingDevice, ) logging.basicConfig(level=logging.INFO) @@ -30,13 +28,6 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scan_out_file.unlink() async for device in scanner.scan_for(10, extend_timeout=True, print_summary=True): - if isinstance(device, (SeparatedOfflineFindingDevice, NearbyOfflineFindingDevice)): - device.print_device(out_file=scan_out_file) - else: - print(f"Unknown device: {device}") - print() - continue - if check_key and device.is_from(check_key): scan_device = device