-
-
Notifications
You must be signed in to change notification settings - Fork 104
Scanner Update #210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scanner Update #210
Changes from 1 commit
688369b
3dc5fb5
90986ef
4e4527d
22085a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,15 @@ | |
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| APPLE_DEVICE_TYPE = { | ||
| 0: "Apple Device", | ||
| 1: "AirTag", | ||
| 2: "Licensed 3rd Party Find My Device", | ||
| 3: "AirPods", | ||
| } | ||
|
|
||
| BATTERY_LEVEL = {0: "Full", 1: "Medium", 2: "Low", 3: "Very Low"} | ||
|
|
||
|
|
||
| class OfflineFindingDevice(ABC): | ||
| """Device discoverable through Apple's bluetooth-based Offline Finding protocol.""" | ||
|
|
@@ -35,12 +44,14 @@ def __init__( | |
| mac_bytes: bytes, | ||
| status_byte: int, | ||
| detected_at: datetime, | ||
| rssi: int | None = None, | ||
| additional_data: dict[Any, Any] | None = None, | ||
| ) -> None: | ||
| """Instantiate an OfflineFindingDevice.""" | ||
| self._mac_bytes: bytes = mac_bytes | ||
| self._status: int = status_byte | ||
| self._detected_at: datetime = detected_at | ||
| self._rssi: int | None = rssi | ||
| self._additional_data: dict[Any, Any] = additional_data or {} | ||
|
|
||
| @property | ||
|
|
@@ -59,24 +70,47 @@ def detected_at(self) -> datetime: | |
| """Timezone-aware datetime of when the device was detected.""" | ||
| return self._detected_at | ||
|
|
||
| @property | ||
| def rssi(self) -> int | None: | ||
| """Received Signal Strength Indicator (RSSI) value.""" | ||
| return self._rssi | ||
|
|
||
| @property | ||
| def additional_data(self) -> dict[Any, Any]: | ||
| """Any additional data. No guarantees about the contents of this dictionary.""" | ||
| return self._additional_data | ||
|
|
||
| @property | ||
| def device_type(self) -> str: | ||
| """Get the device type from status byte.""" | ||
| type_id = (self.status >> 4) & 0b00000011 | ||
| return APPLE_DEVICE_TYPE.get(type_id, "Unknown") | ||
|
|
||
| @property | ||
| def battery_level(self) -> str: | ||
| """Get the battery level from status byte.""" | ||
| battery_id = (self.status >> 6) & 0b00000011 | ||
| return BATTERY_LEVEL.get(battery_id, "Unknown") | ||
|
|
||
| @abstractmethod | ||
| def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: | ||
| """Check whether the OF device's identity originates from a specific key source.""" | ||
| raise NotImplementedError | ||
|
|
||
| @abstractmethod | ||
| def print_device(self) -> None: | ||
| """Print human-readable information about the device.""" | ||
| raise NotImplementedError | ||
|
|
||
| @classmethod | ||
| @abstractmethod | ||
| def from_payload( | ||
| cls, | ||
| mac_address: str, | ||
| payload: bytes, | ||
| detected_at: datetime, | ||
| additional_data: dict[Any, Any] | None, | ||
| rssi: int | None = None, | ||
| additional_data: dict[Any, Any] | None = None, | ||
| ) -> OfflineFindingDevice | None: | ||
| """Get a NearbyOfflineFindingDevice object from an OF message payload.""" | ||
| raise NotImplementedError | ||
|
|
@@ -87,6 +121,7 @@ def from_ble_payload( | |
| mac_address: str, | ||
| ble_payload: bytes, | ||
| detected_at: datetime | None = None, | ||
| rssi: int | None = None, | ||
| additional_data: dict[Any, Any] | None = None, | ||
| ) -> OfflineFindingDevice | None: | ||
| """Get a NearbyOfflineFindingDevice object from a BLE packet payload.""" | ||
|
|
@@ -97,6 +132,7 @@ def from_ble_payload( | |
| logger.debug("Unsupported OF type: %s", ble_payload[0]) | ||
| return None | ||
|
|
||
| # Differentiate between Nearby and Separated advertisements by payload length | ||
| device_type = next( | ||
| ( | ||
| dev | ||
|
|
@@ -113,6 +149,7 @@ def from_ble_payload( | |
| mac_address, | ||
| ble_payload[cls.OF_HEADER_SIZE :], | ||
| detected_at or datetime.now().astimezone(), | ||
| rssi, | ||
| additional_data, | ||
| ) | ||
|
|
||
|
|
@@ -131,21 +168,27 @@ def __hash__(self) -> int: | |
| class NearbyOfflineFindingDevice(OfflineFindingDevice): | ||
| """Offline-Finding device in nearby state.""" | ||
|
|
||
| OF_PAYLOAD_LEN = 0x02 # 2 | ||
| OF_PAYLOAD_LEN = 0x02 # 2 bytes (status, 2 bits of public key/mac address) | ||
|
|
||
| def __init__( | ||
| def __init__( # noqa: PLR0913 | ||
| self, | ||
| mac_bytes: bytes, | ||
| status_byte: int, | ||
| first_adv_key_bytes: bytes, | ||
| detected_at: datetime, | ||
| rssi: int | None = None, | ||
| additional_data: dict[Any, Any] | None = None, | ||
| ) -> None: | ||
| """Instantiate a NearbyOfflineFindingDevice.""" | ||
| super().__init__(mac_bytes, status_byte, detected_at, additional_data) | ||
|
|
||
| super().__init__(mac_bytes, status_byte, detected_at, rssi, additional_data) | ||
| # When nearby, only the first 6 bytes of the public key are transmitted | ||
| self._first_adv_key_bytes: bytes = first_adv_key_bytes | ||
|
|
||
| @property | ||
| def adv_key_bytes(self) -> bytes: | ||
|
||
| """Although not a full public key, still identifies device like one.""" | ||
| return self._first_adv_key_bytes | ||
|
|
||
| @override | ||
| def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: | ||
| """Check whether the OF device's identity originates from a specific key source.""" | ||
|
|
@@ -179,14 +222,15 @@ def from_payload( | |
| mac_address: str, | ||
| payload: bytes, | ||
| detected_at: datetime, | ||
| rssi: int | None = None, | ||
| additional_data: dict[Any, Any] | None = None, | ||
| ) -> NearbyOfflineFindingDevice | None: | ||
| """Get a NearbyOfflineFindingDevice object from an OF message payload.""" | ||
| if len(payload) != cls.OF_PAYLOAD_LEN: | ||
| logger.error( | ||
| "Invalid OF data length: %s instead of %s", | ||
| "Invalid OF data length for NearbyOfflineFindingDevice: %s instead of %s", | ||
| len(payload), | ||
| payload[1], | ||
| cls.OF_PAYLOAD_LEN, | ||
| ) | ||
|
|
||
| mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) | ||
|
|
@@ -203,14 +247,27 @@ def from_payload( | |
| status_byte, | ||
| partial_pubkey, | ||
| detected_at, | ||
| rssi, | ||
| additional_data, | ||
| ) | ||
|
|
||
| @override | ||
| def print_device(self) -> None: | ||
| """Print human-readable information about the device.""" | ||
| logger.info("Nearby %s - %s", self.device_type, self.mac_address) | ||
| logger.info(" Status byte: 0x%x", self.status) | ||
| logger.info(" Battery lvl: %s", self.battery_level) | ||
| logger.info(" RSSI: %s", self.rssi) | ||
| logger.info(" Extra data:") | ||
| for k, v in sorted(self.additional_data.items()): | ||
| logger.info(" %s: %s", f"{k:20}", v) | ||
| logger.info("\n") | ||
|
||
|
|
||
|
|
||
| class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): | ||
| """Offline-Finding device in separated state.""" | ||
|
|
||
| OF_PAYLOAD_LEN = 0x19 # 25 | ||
| OF_PAYLOAD_LEN = 0x19 # 25 bytes (status, pubkey(22), first 2 bits of MAC/pubkey, hint) | ||
|
|
||
| def __init__( # noqa: PLR0913 | ||
| self, | ||
|
|
@@ -219,11 +276,11 @@ def __init__( # noqa: PLR0913 | |
| public_key: bytes, | ||
| hint: int, | ||
| detected_at: datetime, | ||
| rssi: int | None = None, | ||
| additional_data: dict[Any, Any] | None = None, | ||
| ) -> None: | ||
| """Initialize a :meth:`SeparatedOfflineFindingDevice`.""" | ||
| super().__init__(mac_bytes, status, detected_at, additional_data) | ||
|
|
||
| super().__init__(mac_bytes, status, detected_at, rssi, additional_data) | ||
| self._public_key: bytes = public_key | ||
| self._hint: int = hint | ||
|
|
||
|
|
@@ -271,14 +328,15 @@ def from_payload( | |
| mac_address: str, | ||
| payload: bytes, | ||
| detected_at: datetime, | ||
| rssi: int | None = None, | ||
| additional_data: dict[Any, Any] | None = None, | ||
| ) -> SeparatedOfflineFindingDevice | None: | ||
| """Get a SeparatedOfflineFindingDevice object from an OF message payload.""" | ||
| if len(payload) != cls.OF_PAYLOAD_LEN: | ||
| logger.error( | ||
| "Invalid OF data length: %s instead of %s", | ||
| "Invalid OF data length for SeparatedOfflineFindingDevice: %s instead of %s", | ||
| len(payload), | ||
| payload[1], | ||
| cls.OF_PAYLOAD_LEN, | ||
| ) | ||
| return None | ||
|
|
||
|
|
@@ -301,9 +359,25 @@ def from_payload( | |
| pubkey, | ||
| hint, | ||
| detected_at, | ||
| rssi, | ||
| additional_data, | ||
| ) | ||
|
|
||
| @override | ||
| def print_device(self) -> None: | ||
| """Print human-readable information about the device.""" | ||
| logger.info("Separated %s - %s", self.device_type, self.mac_address) | ||
| logger.info(" Public key: %s", self.adv_key_b64) | ||
| logger.info(" Lookup key: %s", self.hashed_adv_key_b64) | ||
| logger.info(" Status byte: 0x%x", self.status) | ||
| logger.info(" Battery lvl: %s", self.battery_level) | ||
| logger.info(" Hint byte: 0x%x", self.hint) | ||
| logger.info(" RSSI: %s", self.rssi) | ||
| logger.info(" Extra data:") | ||
| for k, v in sorted(self.additional_data.items()): | ||
| logger.info(" %s: %s", f"{k:20}", v) | ||
| logger.info("\n") | ||
|
||
|
|
||
| @override | ||
| def __repr__(self) -> str: | ||
| """Human-readable string representation of an OfflineFindingDevice.""" | ||
|
|
@@ -340,6 +414,38 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: | |
|
|
||
| self._scanner_count: int = 0 | ||
|
|
||
| def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: | ||
| """Print summary of each device seen.""" | ||
| logger.info("================ RESULTS =========================") | ||
| for mac, devices in seen_devices.items(): | ||
| avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( | ||
| [d for d in devices if d.rssi is not None] | ||
| ) | ||
| logger.info("Device %s seen %d times, average RSSI: %.1f", mac, len(devices), avg_rssi) | ||
| of_type = ( | ||
| "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" | ||
| ) | ||
| logger.info( | ||
| " %s with %s battery (%s state)", | ||
| devices[0].device_type, | ||
| devices[0].battery_level, | ||
| of_type, | ||
| ) | ||
|
|
||
| if isinstance(devices[0], SeparatedOfflineFindingDevice): | ||
| logger.info(" Public key: %s", devices[0].adv_key_b64) | ||
| logger.info(" Lookup key: %s", devices[0].hashed_adv_key_b64) | ||
|
|
||
| logger.info("===============================================") | ||
|
|
||
| device_type_counts: dict[str, int] = {} | ||
| for devs in seen_devices.values(): | ||
| dev_type = devs[0].device_type | ||
| device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 | ||
|
|
||
| for dev_type, count in device_type_counts.items(): | ||
| logger.info("Total %s: %d", dev_type, count) | ||
|
|
||
| @classmethod | ||
| async def create(cls) -> OfflineFindingScanner: | ||
| """Create an instance of the scanner.""" | ||
|
|
@@ -365,8 +471,10 @@ async def _scan_callback( | |
| device: BLEDevice, | ||
| data: AdvertisementData, | ||
| ) -> None: | ||
| self._device_fut.set_result((device, data)) | ||
| self._device_fut = self._loop.create_future() | ||
| # Ensure that only one waiting coroutine is notified | ||
| if not self._device_fut.done(): | ||
| self._device_fut.set_result((device, data)) | ||
| self._device_fut = self._loop.create_future() | ||
|
|
||
| async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: | ||
| device, data = await asyncio.wait_for(self._device_fut, timeout=timeout) | ||
|
|
@@ -377,6 +485,11 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: | |
|
|
||
| detected_at = datetime.now().astimezone() | ||
|
|
||
| # Extract RSSI if it exists | ||
| rssi = None | ||
| if data.rssi is not None: | ||
| rssi = data.rssi | ||
|
||
|
|
||
| try: | ||
| additional_data = device.details.get("props", {}) | ||
| except AttributeError: | ||
|
|
@@ -387,6 +500,7 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: | |
| device.address, | ||
| apple_data, | ||
| detected_at, | ||
| rssi, | ||
| additional_data, | ||
| ) | ||
|
|
||
|
|
@@ -405,21 +519,34 @@ async def scan_for( | |
| await self._start_scan() | ||
|
|
||
| stop_at = time.time() + timeout | ||
| devices_seen: set[OfflineFindingDevice] = set() | ||
| # Map MAC address to device objects (nearby doesn't send entire pubkey) | ||
| # This avoids double counting when device has different status/hint byte (but same pubkey) | ||
| devices_seen: dict[str, list[OfflineFindingDevice]] = {} | ||
|
|
||
| try: | ||
| time_left = stop_at - time.time() | ||
| while time_left > 0: | ||
| device = await self._wait_for_device(time_left) | ||
| if device is not None and device not in devices_seen: | ||
| devices_seen.add(device) | ||
| if extend_timeout: | ||
| stop_at = time.time() + timeout | ||
| yield device | ||
|
|
||
| if device is not None: | ||
| # Check if we have already seen this device | ||
| new_device = device.mac_address not in devices_seen | ||
|
|
||
| devices_seen[device.mac_address] = [ | ||
| *devices_seen.get(device.mac_address, []), | ||
| device, | ||
| ] | ||
|
|
||
| if new_device: | ||
| if extend_timeout: | ||
| stop_at = time.time() + timeout | ||
| yield device | ||
|
|
||
| time_left = stop_at - time.time() | ||
|
|
||
| except asyncio.TimeoutError: # timeout reached | ||
| self._device_fut = self._loop.create_future() | ||
| self.print_scanning_results(devices_seen) | ||
|
||
| return | ||
| finally: | ||
| await self._stop_scan() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: maybe swap out the decimals here with their binary representation (0bXX) just for clarity