Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 2 additions & 25 deletions examples/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,6 @@
logging.basicConfig(level=logging.INFO)


def _print_nearby(device: NearbyOfflineFindingDevice) -> None:
print(f"NEARBY Device - {device.mac_address}")
print(f" Status byte: {device.status:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print()


def _print_separated(device: SeparatedOfflineFindingDevice) -> None:
print(f"SEPARATED Device - {device.mac_address}")
print(f" Public key: {device.adv_key_b64}")
print(f" Lookup key: {device.hashed_adv_key_b64}")
print(f" Status byte: {device.status:x}")
print(f" Hint byte: {device.hint:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print()


async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool:
scanner = await OfflineFindingScanner.create()

Expand All @@ -46,10 +25,8 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool:
scan_device = None

async for device in scanner.scan_for(10, extend_timeout=True):
if isinstance(device, NearbyOfflineFindingDevice):
_print_nearby(device)
elif isinstance(device, SeparatedOfflineFindingDevice):
_print_separated(device)
if isinstance(device, (SeparatedOfflineFindingDevice, NearbyOfflineFindingDevice)):
device.print_device()
else:
print(f"Unknown device: {device}")
print()
Expand Down
167 changes: 147 additions & 20 deletions findmy/scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@

logger = logging.getLogger(__name__)

APPLE_DEVICE_TYPE = {
0: "Apple Device",
1: "AirTag",
2: "Licensed 3rd Party Find My Device",
3: "AirPods",
}

BATTERY_LEVEL = {0: "Full", 1: "Medium", 2: "Low", 3: "Very Low"}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: maybe swap out the decimals here with their binary representation (0bXX) just for clarity



class OfflineFindingDevice(ABC):
"""Device discoverable through Apple's bluetooth-based Offline Finding protocol."""
Expand All @@ -35,12 +44,14 @@ def __init__(
mac_bytes: bytes,
status_byte: int,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Instantiate an OfflineFindingDevice."""
self._mac_bytes: bytes = mac_bytes
self._status: int = status_byte
self._detected_at: datetime = detected_at
self._rssi: int | None = rssi
self._additional_data: dict[Any, Any] = additional_data or {}

@property
Expand All @@ -59,24 +70,47 @@ def detected_at(self) -> datetime:
"""Timezone-aware datetime of when the device was detected."""
return self._detected_at

@property
def rssi(self) -> int | None:
"""Received Signal Strength Indicator (RSSI) value."""
return self._rssi

@property
def additional_data(self) -> dict[Any, Any]:
"""Any additional data. No guarantees about the contents of this dictionary."""
return self._additional_data

@property
def device_type(self) -> str:
"""Get the device type from status byte."""
type_id = (self.status >> 4) & 0b00000011
return APPLE_DEVICE_TYPE.get(type_id, "Unknown")

@property
def battery_level(self) -> str:
"""Get the battery level from status byte."""
battery_id = (self.status >> 6) & 0b00000011
return BATTERY_LEVEL.get(battery_id, "Unknown")

@abstractmethod
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
"""Check whether the OF device's identity originates from a specific key source."""
raise NotImplementedError

@abstractmethod
def print_device(self) -> None:
"""Print human-readable information about the device."""
raise NotImplementedError

@classmethod
@abstractmethod
def from_payload(
cls,
mac_address: str,
payload: bytes,
detected_at: datetime,
additional_data: dict[Any, Any] | None,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
raise NotImplementedError
Expand All @@ -87,6 +121,7 @@ def from_ble_payload(
mac_address: str,
ble_payload: bytes,
detected_at: datetime | None = None,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
Expand All @@ -97,6 +132,7 @@ def from_ble_payload(
logger.debug("Unsupported OF type: %s", ble_payload[0])
return None

# Differentiate between Nearby and Separated advertisements by payload length
device_type = next(
(
dev
Expand All @@ -113,6 +149,7 @@ def from_ble_payload(
mac_address,
ble_payload[cls.OF_HEADER_SIZE :],
detected_at or datetime.now().astimezone(),
rssi,
additional_data,
)

Expand All @@ -131,21 +168,27 @@ def __hash__(self) -> int:
class NearbyOfflineFindingDevice(OfflineFindingDevice):
"""Offline-Finding device in nearby state."""

OF_PAYLOAD_LEN = 0x02 # 2
OF_PAYLOAD_LEN = 0x02 # 2 bytes (status, 2 bits of public key/mac address)

def __init__(
def __init__( # noqa: PLR0913
self,
mac_bytes: bytes,
status_byte: int,
first_adv_key_bytes: bytes,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Instantiate a NearbyOfflineFindingDevice."""
super().__init__(mac_bytes, status_byte, detected_at, additional_data)

super().__init__(mac_bytes, status_byte, detected_at, rssi, additional_data)
# When nearby, only the first 6 bytes of the public key are transmitted
self._first_adv_key_bytes: bytes = first_adv_key_bytes

@property
def adv_key_bytes(self) -> bytes:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I like this method... I think it could be confusing as its signature is the same as the method on e.g. a KeyPair, but unlike a keypair, the result of this method is incomplete so they aren't directly comparable. I still think it's good to expose this information though, just maybe with a different method name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to standardize the interface but ended up not needing to. I just changed the name so it's still exposed.

"""Although not a full public key, still identifies device like one."""
return self._first_adv_key_bytes

@override
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
"""Check whether the OF device's identity originates from a specific key source."""
Expand Down Expand Up @@ -179,14 +222,15 @@ def from_payload(
mac_address: str,
payload: bytes,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> NearbyOfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.OF_PAYLOAD_LEN:
logger.error(
"Invalid OF data length: %s instead of %s",
"Invalid OF data length for NearbyOfflineFindingDevice: %s instead of %s",
len(payload),
payload[1],
cls.OF_PAYLOAD_LEN,
)

mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
Expand All @@ -203,14 +247,27 @@ def from_payload(
status_byte,
partial_pubkey,
detected_at,
rssi,
additional_data,
)

@override
def print_device(self) -> None:
"""Print human-readable information about the device."""
logger.info("Nearby %s - %s", self.device_type, self.mac_address)
logger.info(" Status byte: 0x%x", self.status)
logger.info(" Battery lvl: %s", self.battery_level)
logger.info(" RSSI: %s", self.rssi)
logger.info(" Extra data:")
for k, v in sorted(self.additional_data.items()):
logger.info(" %s: %s", f"{k:20}", v)
logger.info("\n")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably just use print instead (feel free to suppress the linter.) Otherwise the result of this method will depend on whatever logger settings that are currently applied (if any). Also, maybe a file argument to give to print to allow printing to files for example?



class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
"""Offline-Finding device in separated state."""

OF_PAYLOAD_LEN = 0x19 # 25
OF_PAYLOAD_LEN = 0x19 # 25 bytes (status, pubkey(22), first 2 bits of MAC/pubkey, hint)

def __init__( # noqa: PLR0913
self,
Expand All @@ -219,11 +276,11 @@ def __init__( # noqa: PLR0913
public_key: bytes,
hint: int,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Initialize a :meth:`SeparatedOfflineFindingDevice`."""
super().__init__(mac_bytes, status, detected_at, additional_data)

super().__init__(mac_bytes, status, detected_at, rssi, additional_data)
self._public_key: bytes = public_key
self._hint: int = hint

Expand Down Expand Up @@ -271,14 +328,15 @@ def from_payload(
mac_address: str,
payload: bytes,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> SeparatedOfflineFindingDevice | None:
"""Get a SeparatedOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.OF_PAYLOAD_LEN:
logger.error(
"Invalid OF data length: %s instead of %s",
"Invalid OF data length for SeparatedOfflineFindingDevice: %s instead of %s",
len(payload),
payload[1],
cls.OF_PAYLOAD_LEN,
)
return None

Expand All @@ -301,9 +359,25 @@ def from_payload(
pubkey,
hint,
detected_at,
rssi,
additional_data,
)

@override
def print_device(self) -> None:
"""Print human-readable information about the device."""
logger.info("Separated %s - %s", self.device_type, self.mac_address)
logger.info(" Public key: %s", self.adv_key_b64)
logger.info(" Lookup key: %s", self.hashed_adv_key_b64)
logger.info(" Status byte: 0x%x", self.status)
logger.info(" Battery lvl: %s", self.battery_level)
logger.info(" Hint byte: 0x%x", self.hint)
logger.info(" RSSI: %s", self.rssi)
logger.info(" Extra data:")
for k, v in sorted(self.additional_data.items()):
logger.info(" %s: %s", f"{k:20}", v)
logger.info("\n")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, I think using print is probably better


@override
def __repr__(self) -> str:
"""Human-readable string representation of an OfflineFindingDevice."""
Expand Down Expand Up @@ -340,6 +414,38 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:

self._scanner_count: int = 0

def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None:
"""Print summary of each device seen."""
logger.info("================ RESULTS =========================")
for mac, devices in seen_devices.items():
avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len(
[d for d in devices if d.rssi is not None]
)
logger.info("Device %s seen %d times, average RSSI: %.1f", mac, len(devices), avg_rssi)
of_type = (
"nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated"
)
logger.info(
" %s with %s battery (%s state)",
devices[0].device_type,
devices[0].battery_level,
of_type,
)

if isinstance(devices[0], SeparatedOfflineFindingDevice):
logger.info(" Public key: %s", devices[0].adv_key_b64)
logger.info(" Lookup key: %s", devices[0].hashed_adv_key_b64)

logger.info("===============================================")

device_type_counts: dict[str, int] = {}
for devs in seen_devices.values():
dev_type = devs[0].device_type
device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1

for dev_type, count in device_type_counts.items():
logger.info("Total %s: %d", dev_type, count)

@classmethod
async def create(cls) -> OfflineFindingScanner:
"""Create an instance of the scanner."""
Expand All @@ -365,8 +471,10 @@ async def _scan_callback(
device: BLEDevice,
data: AdvertisementData,
) -> None:
self._device_fut.set_result((device, data))
self._device_fut = self._loop.create_future()
# Ensure that only one waiting coroutine is notified
if not self._device_fut.done():
self._device_fut.set_result((device, data))
self._device_fut = self._loop.create_future()

async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None:
device, data = await asyncio.wait_for(self._device_fut, timeout=timeout)
Expand All @@ -377,6 +485,11 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None:

detected_at = datetime.now().astimezone()

# Extract RSSI if it exists
rssi = None
if data.rssi is not None:
rssi = data.rssi
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If data.rssi is already None | int, this if statement is probably redundant


try:
additional_data = device.details.get("props", {})
except AttributeError:
Expand All @@ -387,6 +500,7 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None:
device.address,
apple_data,
detected_at,
rssi,
additional_data,
)

Expand All @@ -405,21 +519,34 @@ async def scan_for(
await self._start_scan()

stop_at = time.time() + timeout
devices_seen: set[OfflineFindingDevice] = set()
# Map MAC address to device objects (nearby doesn't send entire pubkey)
# This avoids double counting when device has different status/hint byte (but same pubkey)
devices_seen: dict[str, list[OfflineFindingDevice]] = {}

try:
time_left = stop_at - time.time()
while time_left > 0:
device = await self._wait_for_device(time_left)
if device is not None and device not in devices_seen:
devices_seen.add(device)
if extend_timeout:
stop_at = time.time() + timeout
yield device

if device is not None:
# Check if we have already seen this device
new_device = device.mac_address not in devices_seen

devices_seen[device.mac_address] = [
*devices_seen.get(device.mac_address, []),
device,
]

if new_device:
if extend_timeout:
stop_at = time.time() + timeout
yield device

time_left = stop_at - time.time()

except asyncio.TimeoutError: # timeout reached
self._device_fut = self._loop.create_future()
self.print_scanning_results(devices_seen)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think whether to print results or not should be left up to the caller instead of doing it by default

return
finally:
await self._stop_scan()