diff --git a/.gitignore b/.gitignore index a47fb1d..97b39f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,37 @@ -.idea -__pycache__ \ No newline at end of file +# Diretórios de ambiente virtual +venv/ +.env/ +.venv/ + +# Cache do Python +__pycache__/ +*.py[cod] +*$py.class + +# Arquivos de log e temporários +*.log +*.tmp + +# Configurações de IDE/editor +.vscode/ +.idea/ + +# Arquivos de testes +htmlcov/ +.tox/ +.coverage +.cache/ +.pytest_cache/ + +# Arquivos de pacotes +*.egg +*.egg-info/ +dist/ +build/ + +# Jupyter Notebooks checkpoints (se usar) +.ipynb_checkpoints/ + +# Arquivos de sistema +.DS_Store +Thumbs.db diff --git a/README.md b/README.md index a7a2a95..0e58af9 100644 --- a/README.md +++ b/README.md @@ -1,53 +1,27 @@ -# **AirStatus for Linux** -#### Check your AirPods battery level on Linux +# AirStatus for Linux -#### What is it? -This is a Python 3.6 script, forked from [faglo/AirStatus](https://github.com/faglo/AirStatus) that allows you to check AirPods battery level from your terminal, as JSON output. +Check your AirPods battery level on Linux — now in real time via WebSocket! -### Usage +## 🔍 What is it? -``` -python3 main.py [output_file] -``` - -Output will be stored in `output_file` if specified. +This is a Python 3+ WebSocket server, forked from [`faglo/AirStatus`](https://github.com/faglo/AirStatus), that scans for your AirPods using Bluetooth Low Energy and streams battery and charging information as JSON in real time through a WebSocket connection. -#### Example output +Originally a terminal script, this version was upgraded to serve clients over the web using FastAPI. -``` -{"status": 1, "charge": {"left": 95, "right": 95, "case": -1}, "charging_left": false, "charging_right": false, "charging_case": false, "model": "AirPodsPro", "date": "2021-12-22 11:09:05"} -``` +--- -### Installing as a service +## 🚀 Features -Create the file `/etc/systemd/system/airstatus.service` (as root) containing: -``` -[Unit] -Description=AirPods Battery Monitor +- 🔋 Real-time AirPods battery and charging status +- 📡 WebSocket endpoint for easy integration with web or mobile apps +- 📦 JSON format output +- ⚙️ Configurable update interval -[Service] -ExecStart=/usr/bin/python3 /PATH/TO/AirStatus/main.py /tmp/airstatus.out -Restart=always -RestartSec=3 +--- -[Install] -WantedBy=default.target -``` +## ⚙️ Requirements -Start the service: -``` -sudo systemctl start airstatus -``` +```bash +pip install -r requirements.txt -Enable service on boot: - ``` -sudo systemctl enable airstatus ``` - -#### Can I customize it easily? -**Yes, you can!** - -You can change the **update frequency** within the main.py file - -#### Used materials -* Some code from [this repo](https://github.com/ohanedan/Airpods-Windows-Service) diff --git a/main.py b/main.py index 438420d..973a3d5 100644 --- a/main.py +++ b/main.py @@ -1,17 +1,18 @@ -from bleak import discover -from asyncio import new_event_loop, set_event_loop, get_event_loop -from time import sleep, time_ns +from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from bleak import BleakScanner +from asyncio import sleep from binascii import hexlify -from json import dumps -from sys import argv from datetime import datetime +import json +from time import time_ns + +app = FastAPI() -# Configure update duration (update after n seconds) -UPDATE_DURATION = 1 MIN_RSSI = -60 AIRPODS_MANUFACTURER = 76 AIRPODS_DATA_LENGTH = 54 -RECENT_BEACONS_MAX_T_NS = 10000000000 # 10 Seconds +RECENT_BEACONS_MAX_T_NS = 10_000_000_000 # 10 segundos +UPDATE_DURATION = 1 recent_beacons = [] @@ -21,129 +22,70 @@ def get_best_result(device): "time": time_ns(), "device": device }) - strongest_beacon = None + strongest = None i = 0 while i < len(recent_beacons): - if(time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS): + if time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS: recent_beacons.pop(i) continue - if (strongest_beacon == None or strongest_beacon.rssi < recent_beacons[i]["device"].rssi): - strongest_beacon = recent_beacons[i]["device"] + if strongest is None or strongest.rssi < recent_beacons[i]["device"].rssi: + strongest = recent_beacons[i]["device"] i += 1 + return device if strongest and strongest.address == device.address else strongest - if (strongest_beacon != None and strongest_beacon.address == device.address): - strongest_beacon = device - - return strongest_beacon - -# Getting data with hex format async def get_device(): - # Scanning for devices - devices = await discover() + devices = await BleakScanner.discover() for d in devices: - # Checking for AirPods d = get_best_result(d) - if d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata['manufacturer_data']: - data_hex = hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER])) - data_length = len(hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER]))) - if data_length == AIRPODS_DATA_LENGTH: + if d and d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata.get("manufacturer_data", {}): + data_hex = hexlify(bytearray(d.metadata["manufacturer_data"][AIRPODS_MANUFACTURER])) + if len(data_hex) == AIRPODS_DATA_LENGTH: return data_hex - return False - - -# Same as get_device() but it's standalone method instead of async -def get_data_hex(): - new_loop = new_event_loop() - set_event_loop(new_loop) - loop = get_event_loop() - a = loop.run_until_complete(get_device()) - loop.close() - return a - + return None -# Getting data from hex string and converting it to dict(json) -# Getting data from hex string and converting it to dict(json) -def get_data(): - raw = get_data_hex() - # Return blank data if airpods not found - if not raw: - return dict(status=0, model="AirPods not found") - - flip: bool = is_flipped(raw) - - # On 7th position we can get AirPods model, gen1, gen2, Pro or Max - if chr(raw[7]) == 'e': - model = "AirPodsPro" - elif chr(raw[7]) == '3': - model = "AirPods3" - elif chr(raw[7]) == 'f': - model = "AirPods2" - elif chr(raw[7]) == '2': - model = "AirPods1" - elif chr(raw[7]) == 'a': - model = "AirPodsMax" - else: - model = "unknown" - - # Checking left AirPod for availability and storing charge in variable - status_tmp = int("" + chr(raw[12 if flip else 13]), 16) - left_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1)) - - # Checking right AirPod for availability and storing charge in variable - status_tmp = int("" + chr(raw[13 if flip else 12]), 16) - right_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1)) - - # Checking AirPods case for availability and storing charge in variable - status_tmp = int("" + chr(raw[15]), 16) - case_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1)) - - # On 14th position we can get charge status of AirPods - charging_status = int("" + chr(raw[14]), 16) - charging_left:bool = (charging_status & (0b00000010 if flip else 0b00000001)) != 0 - charging_right:bool = (charging_status & (0b00000001 if flip else 0b00000010)) != 0 - charging_case:bool = (charging_status & 0b00000100) != 0 - - # Return result info in dict format - return dict( - status=1, - charge=dict( - left=left_status, - right=right_status, - case=case_status - ), - charging_left=charging_left, - charging_right=charging_right, - charging_case=charging_case, - model=model, - date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - raw=raw.decode("utf-8") - ) - - -# Return if left and right is flipped in the data def is_flipped(raw): - return (int("" + chr(raw[10]), 16) & 0x02) == 0 - - -def run(): - output_file = argv[-1] - - while True: - data = get_data() + return (int(chr(raw[10]), 16) & 0x02) == 0 - if data["status"] == 1: - json_data = dumps(data) - if len(argv) > 1: - f = open(output_file, "a") - f.write(json_data+"\n") - f.close() - else: - print(json_data) - sleep(UPDATE_DURATION) - - -if __name__ == '__main__': - run() +def parse_data(raw): + if not raw: + return {"status": 0, "model": "AirPods not found"} + + flip = is_flipped(raw) + model_codes = {'e': 'AirPodsPro', '3': 'AirPods3', 'f': 'AirPods2', '2': 'AirPods1', 'a': 'AirPodsMax'} + model = model_codes.get(chr(raw[7]), "unknown") + + def parse_battery(index): + val = int(chr(raw[index]), 16) + return 100 if val == 10 else (val * 10 + 5 if val <= 10 else -1) + + left = parse_battery(12 if flip else 13) + right = parse_battery(13 if flip else 12) + case = parse_battery(15) + + charging = int(chr(raw[14]), 16) + return { + "status": 1, + "charge": {"left": left, "right": right, "case": case}, + "charging_left": bool(charging & (0b00000010 if flip else 0b00000001)), + "charging_right": bool(charging & (0b00000001 if flip else 0b00000010)), + "charging_case": bool(charging & 0b00000100), + "model": model, + "date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + "raw": raw.decode() + } + + +@app.websocket("/ws/airpods") +async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + try: + while True: + raw = await get_device() + data = parse_data(raw) + await websocket.send_text(json.dumps(data)) + await sleep(UPDATE_DURATION) + except WebSocketDisconnect: + print("Cliente desconectado") diff --git a/requirements.txt b/requirements.txt index 739a751..e0cf51d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ bleak~=0.13.0 +fastapi +uvicorn