diff --git a/examples/scanner.py b/examples/scanner.py index 8f929cd..3dfd37c 100644 --- a/examples/scanner.py +++ b/examples/scanner.py @@ -8,35 +8,12 @@ from findmy import ( FindMyAccessory, KeyPair, - NearbyOfflineFindingDevice, OfflineFindingScanner, - SeparatedOfflineFindingDevice, ) logging.basicConfig(level=logging.INFO) -def _print_nearby(device: NearbyOfflineFindingDevice) -> None: - print(f"NEARBY Device - {device.mac_address}") - print(f" Status byte: {device.status:x}") - print(" Extra data:") - for k, v in sorted(device.additional_data.items()): - print(f" {k:20}: {v}") - print() - - -def _print_separated(device: SeparatedOfflineFindingDevice) -> None: - print(f"SEPARATED Device - {device.mac_address}") - print(f" Public key: {device.adv_key_b64}") - print(f" Lookup key: {device.hashed_adv_key_b64}") - print(f" Status byte: {device.status:x}") - print(f" Hint byte: {device.hint:x}") - print(" Extra data:") - for k, v in sorted(device.additional_data.items()): - print(f" {k:20}: {v}") - print() - - async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scanner = await OfflineFindingScanner.create() @@ -45,16 +22,12 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scan_device = None - async for device in scanner.scan_for(10, extend_timeout=True): - if isinstance(device, NearbyOfflineFindingDevice): - _print_nearby(device) - elif isinstance(device, SeparatedOfflineFindingDevice): - _print_separated(device) - else: - print(f"Unknown device: {device}") - print() - continue + scan_out_file = Path("scan_results.jsonl") + # Clear previous scan results + if scan_out_file.exists(): + scan_out_file.unlink() + async for device in scanner.scan_for(10, extend_timeout=True, print_summary=True): if check_key and device.is_from(check_key): scan_device = device diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 1f9d7b6..3b97372 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -18,11 +18,50 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator + from _typeshed import SupportsWrite from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData logger = logging.getLogger(__name__) +APPLE_DEVICE_TYPE = { + 0b00: "Apple Device", + 0b01: "AirTag", + 0b10: "Licensed 3rd Party Find My Device", + 0b11: "AirPods", +} + +BATTERY_LEVEL = {0b00: "Full", 0b01: "Medium", 0b10: "Low", 0b11: "Very Low"} + + +def _print_scanning_results(seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: + """Print summary of each device seen.""" + # ruff: noqa: T201 + print("================ RESULTS =========================") + for mac, devices in seen_devices.items(): + avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( + [d for d in devices if d.rssi is not None] + ) + print(f"Device {mac} seen {len(devices)} times, average RSSI: {avg_rssi:.1f}") + of_type = "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" + print( + f" {devices[0].device_type} with {devices[0].battery_level} battery" + f" ({of_type} state)" + ) + + if isinstance(devices[0], SeparatedOfflineFindingDevice): + print(f" Public key: {devices[0].adv_key_b64}") + print(f" Lookup key: {devices[0].hashed_adv_key_b64}") + print("===============================================") + + device_type_counts: dict[str, int] = {} + for devs in seen_devices.values(): + dev_type = devs[0].device_type + device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 + + for dev_type, count in device_type_counts.items(): + print(f"Total {dev_type}: {count}") + class OfflineFindingDevice(ABC): """Device discoverable through Apple's bluetooth-based Offline Finding protocol.""" @@ -35,12 +74,14 @@ def __init__( mac_bytes: bytes, status_byte: int, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate an OfflineFindingDevice.""" self._mac_bytes: bytes = mac_bytes self._status: int = status_byte self._detected_at: datetime = detected_at + self._rssi: int | None = rssi self._additional_data: dict[Any, Any] = additional_data or {} @property @@ -59,16 +100,38 @@ def detected_at(self) -> datetime: """Timezone-aware datetime of when the device was detected.""" return self._detected_at + @property + def rssi(self) -> int | None: + """Received Signal Strength Indicator (RSSI) value.""" + return self._rssi + @property def additional_data(self) -> dict[Any, Any]: """Any additional data. No guarantees about the contents of this dictionary.""" return self._additional_data + @property + def device_type(self) -> str: + """Get the device type from status byte.""" + type_id = (self.status >> 4) & 0b00000011 + return APPLE_DEVICE_TYPE.get(type_id, "Unknown") + + @property + def battery_level(self) -> str: + """Get the battery level from status byte.""" + battery_id = (self.status >> 6) & 0b00000011 + return BATTERY_LEVEL.get(battery_id, "Unknown") + @abstractmethod def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" raise NotImplementedError + @abstractmethod + def print(self, file: SupportsWrite[str] | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" + raise NotImplementedError + @classmethod @abstractmethod def from_payload( @@ -76,7 +139,8 @@ def from_payload( mac_address: str, payload: bytes, detected_at: datetime, - additional_data: dict[Any, Any] | None, + rssi: int | None = None, + additional_data: dict[Any, Any] | None = None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" raise NotImplementedError @@ -87,6 +151,7 @@ def from_ble_payload( mac_address: str, ble_payload: bytes, detected_at: datetime | None = None, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from a BLE packet payload.""" @@ -97,6 +162,7 @@ def from_ble_payload( logger.debug("Unsupported OF type: %s", ble_payload[0]) return None + # Differentiate between Nearby and Separated advertisements by payload length device_type = next( ( dev @@ -113,6 +179,7 @@ def from_ble_payload( mac_address, ble_payload[cls.OF_HEADER_SIZE :], detected_at or datetime.now().astimezone(), + rssi, additional_data, ) @@ -131,21 +198,27 @@ def __hash__(self) -> int: class NearbyOfflineFindingDevice(OfflineFindingDevice): """Offline-Finding device in nearby state.""" - OF_PAYLOAD_LEN = 0x02 # 2 + OF_PAYLOAD_LEN = 0x02 # 2 bytes (status, 2 bits of public key/mac address) - def __init__( + def __init__( # noqa: PLR0913 self, mac_bytes: bytes, status_byte: int, first_adv_key_bytes: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate a NearbyOfflineFindingDevice.""" - super().__init__(mac_bytes, status_byte, detected_at, additional_data) - + super().__init__(mac_bytes, status_byte, detected_at, rssi, additional_data) + # When nearby, only the first 6 bytes of the public key are transmitted self._first_adv_key_bytes: bytes = first_adv_key_bytes + @property + def partial_adv_key(self) -> bytes: + """Although not a full public key, still identifies device like one.""" + return self._first_adv_key_bytes + @override def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" @@ -179,14 +252,15 @@ def from_payload( mac_address: str, payload: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> NearbyOfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" if len(payload) != cls.OF_PAYLOAD_LEN: logger.error( - "Invalid OF data length: %s instead of %s", + "Invalid OF data length for NearbyOfflineFindingDevice: %s instead of %s", len(payload), - payload[1], + cls.OF_PAYLOAD_LEN, ) mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) @@ -203,14 +277,28 @@ def from_payload( status_byte, partial_pubkey, detected_at, + rssi, additional_data, ) + @override + def print(self, file: SupportsWrite[str] | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" + # ruff: noqa: T201 + print(f"Nearby {self.device_type} - {self.mac_address}", file=file) + print(f" Status byte: 0x{self.status:x}", file=file) + print(f" Battery lvl: {self.battery_level}", file=file) + print(f" RSSI: {self.rssi}", file=file) + print(" Extra data:", file=file) + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}", file=file) + print("\n", file=file) + class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): """Offline-Finding device in separated state.""" - OF_PAYLOAD_LEN = 0x19 # 25 + OF_PAYLOAD_LEN = 0x19 # 25 bytes (status, pubkey(22), first 2 bits of MAC/pubkey, hint) def __init__( # noqa: PLR0913 self, @@ -219,11 +307,11 @@ def __init__( # noqa: PLR0913 public_key: bytes, hint: int, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Initialize a :meth:`SeparatedOfflineFindingDevice`.""" - super().__init__(mac_bytes, status, detected_at, additional_data) - + super().__init__(mac_bytes, status, detected_at, rssi, additional_data) self._public_key: bytes = public_key self._hint: int = hint @@ -271,14 +359,15 @@ def from_payload( mac_address: str, payload: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> SeparatedOfflineFindingDevice | None: """Get a SeparatedOfflineFindingDevice object from an OF message payload.""" if len(payload) != cls.OF_PAYLOAD_LEN: logger.error( - "Invalid OF data length: %s instead of %s", + "Invalid OF data length for SeparatedOfflineFindingDevice: %s instead of %s", len(payload), - payload[1], + cls.OF_PAYLOAD_LEN, ) return None @@ -301,9 +390,26 @@ def from_payload( pubkey, hint, detected_at, + rssi, additional_data, ) + @override + def print(self, file: SupportsWrite[str] | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" + # ruff: noqa: T201 + print(f"Separated {self.device_type} - {self.mac_address}", file=file) + print(f" Public key: {self.adv_key_b64}", file=file) + print(f" Lookup key: {self.hashed_adv_key_b64}", file=file) + print(f" Status byte: 0x{self.status:x}", file=file) + print(f" Battery lvl: {self.battery_level}", file=file) + print(f" Hint byte: 0x{self.hint:x}", file=file) + print(f" RSSI: {self.rssi}", file=file) + print(" Extra data:", file=file) + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}", file=file) + print("\n", file=file) + @override def __repr__(self) -> str: """Human-readable string representation of an OfflineFindingDevice.""" @@ -365,8 +471,10 @@ async def _scan_callback( device: BLEDevice, data: AdvertisementData, ) -> None: - self._device_fut.set_result((device, data)) - self._device_fut = self._loop.create_future() + # Ensure that only one waiting coroutine is notified + if not self._device_fut.done(): + self._device_fut.set_result((device, data)) + self._device_fut = self._loop.create_future() async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: device, data = await asyncio.wait_for(self._device_fut, timeout=timeout) @@ -377,6 +485,9 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: detected_at = datetime.now().astimezone() + # Extract RSSI (which may be None) + rssi = data.rssi + try: additional_data = device.details.get("props", {}) except AttributeError: @@ -387,6 +498,7 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: device.address, apple_data, detected_at, + rssi, additional_data, ) @@ -395,6 +507,7 @@ async def scan_for( timeout: float = 10, *, extend_timeout: bool = False, + print_summary: bool = False, ) -> AsyncGenerator[OfflineFindingDevice, None]: """ Scan for :meth:`OfflineFindingDevice`s for up to :meth:`timeout` seconds. @@ -405,21 +518,35 @@ async def scan_for( await self._start_scan() stop_at = time.time() + timeout - devices_seen: set[OfflineFindingDevice] = set() + # Map MAC address to device objects (nearby doesn't send entire pubkey) + # This avoids double counting when device has different status/hint byte (but same pubkey) + devices_seen: dict[str, list[OfflineFindingDevice]] = {} try: time_left = stop_at - time.time() while time_left > 0: device = await self._wait_for_device(time_left) - if device is not None and device not in devices_seen: - devices_seen.add(device) - if extend_timeout: - stop_at = time.time() + timeout - yield device + + if device is not None: + # Check if we have already seen this device + new_device = device.mac_address not in devices_seen + + devices_seen[device.mac_address] = [ + *devices_seen.get(device.mac_address, []), + device, + ] + + if new_device: + if extend_timeout: + stop_at = time.time() + timeout + yield device time_left = stop_at - time.time() + except asyncio.TimeoutError: # timeout reached self._device_fut = self._loop.create_future() + if print_summary: + _print_scanning_results(devices_seen) return finally: await self._stop_scan()