Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,37 @@
.idea
__pycache__
# Diretórios de ambiente virtual
venv/
.env/
.venv/

# Cache do Python
__pycache__/
*.py[cod]
*$py.class

# Arquivos de log e temporários
*.log
*.tmp

# Configurações de IDE/editor
.vscode/
.idea/

# Arquivos de testes
htmlcov/
.tox/
.coverage
.cache/
.pytest_cache/

# Arquivos de pacotes
*.egg
*.egg-info/
dist/
build/

# Jupyter Notebooks checkpoints (se usar)
.ipynb_checkpoints/

# Arquivos de sistema
.DS_Store
Thumbs.db
56 changes: 15 additions & 41 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,53 +1,27 @@
# **AirStatus for Linux**
#### Check your AirPods battery level on Linux
# AirStatus for Linux

#### What is it?
This is a Python 3.6 script, forked from [faglo/AirStatus](https://github.com/faglo/AirStatus) that allows you to check AirPods battery level from your terminal, as JSON output.
Check your AirPods battery level on Linux — now in real time via WebSocket!

### Usage
## 🔍 What is it?

```
python3 main.py [output_file]
```

Output will be stored in `output_file` if specified.
This is a Python 3+ WebSocket server, forked from [`faglo/AirStatus`](https://github.com/faglo/AirStatus), that scans for your AirPods using Bluetooth Low Energy and streams battery and charging information as JSON in real time through a WebSocket connection.

#### Example output
Originally a terminal script, this version was upgraded to serve clients over the web using FastAPI.

```
{"status": 1, "charge": {"left": 95, "right": 95, "case": -1}, "charging_left": false, "charging_right": false, "charging_case": false, "model": "AirPodsPro", "date": "2021-12-22 11:09:05"}
```
---

### Installing as a service
## 🚀 Features

Create the file `/etc/systemd/system/airstatus.service` (as root) containing:
```
[Unit]
Description=AirPods Battery Monitor
- 🔋 Real-time AirPods battery and charging status
- 📡 WebSocket endpoint for easy integration with web or mobile apps
- 📦 JSON format output
- ⚙️ Configurable update interval

[Service]
ExecStart=/usr/bin/python3 /PATH/TO/AirStatus/main.py /tmp/airstatus.out
Restart=always
RestartSec=3
---

[Install]
WantedBy=default.target
```
## ⚙️ Requirements

Start the service:
```
sudo systemctl start airstatus
```
```bash
pip install -r requirements.txt

Enable service on boot:
```
sudo systemctl enable airstatus
```

#### Can I customize it easily?
**Yes, you can!**

You can change the **update frequency** within the main.py file

#### Used materials
* Some code from [this repo](https://github.com/ohanedan/Airpods-Windows-Service)
178 changes: 60 additions & 118 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from bleak import discover
from asyncio import new_event_loop, set_event_loop, get_event_loop
from time import sleep, time_ns
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from bleak import BleakScanner
from asyncio import sleep
from binascii import hexlify
from json import dumps
from sys import argv
from datetime import datetime
import json
from time import time_ns

app = FastAPI()

# Configure update duration (update after n seconds)
UPDATE_DURATION = 1
MIN_RSSI = -60
AIRPODS_MANUFACTURER = 76
AIRPODS_DATA_LENGTH = 54
RECENT_BEACONS_MAX_T_NS = 10000000000 # 10 Seconds
RECENT_BEACONS_MAX_T_NS = 10_000_000_000 # 10 segundos
UPDATE_DURATION = 1

recent_beacons = []

Expand All @@ -21,129 +22,70 @@ def get_best_result(device):
"time": time_ns(),
"device": device
})
strongest_beacon = None
strongest = None
i = 0
while i < len(recent_beacons):
if(time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS):
if time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS:
recent_beacons.pop(i)
continue
if (strongest_beacon == None or strongest_beacon.rssi < recent_beacons[i]["device"].rssi):
strongest_beacon = recent_beacons[i]["device"]
if strongest is None or strongest.rssi < recent_beacons[i]["device"].rssi:
strongest = recent_beacons[i]["device"]
i += 1
return device if strongest and strongest.address == device.address else strongest

if (strongest_beacon != None and strongest_beacon.address == device.address):
strongest_beacon = device

return strongest_beacon


# Getting data with hex format
async def get_device():
# Scanning for devices
devices = await discover()
devices = await BleakScanner.discover()
for d in devices:
# Checking for AirPods
d = get_best_result(d)
if d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata['manufacturer_data']:
data_hex = hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER]))
data_length = len(hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER])))
if data_length == AIRPODS_DATA_LENGTH:
if d and d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata.get("manufacturer_data", {}):
data_hex = hexlify(bytearray(d.metadata["manufacturer_data"][AIRPODS_MANUFACTURER]))
if len(data_hex) == AIRPODS_DATA_LENGTH:
return data_hex
return False


# Same as get_device() but it's standalone method instead of async
def get_data_hex():
new_loop = new_event_loop()
set_event_loop(new_loop)
loop = get_event_loop()
a = loop.run_until_complete(get_device())
loop.close()
return a

return None

# Getting data from hex string and converting it to dict(json)
# Getting data from hex string and converting it to dict(json)
def get_data():
raw = get_data_hex()

# Return blank data if airpods not found
if not raw:
return dict(status=0, model="AirPods not found")

flip: bool = is_flipped(raw)

# On 7th position we can get AirPods model, gen1, gen2, Pro or Max
if chr(raw[7]) == 'e':
model = "AirPodsPro"
elif chr(raw[7]) == '3':
model = "AirPods3"
elif chr(raw[7]) == 'f':
model = "AirPods2"
elif chr(raw[7]) == '2':
model = "AirPods1"
elif chr(raw[7]) == 'a':
model = "AirPodsMax"
else:
model = "unknown"

# Checking left AirPod for availability and storing charge in variable
status_tmp = int("" + chr(raw[12 if flip else 13]), 16)
left_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))

# Checking right AirPod for availability and storing charge in variable
status_tmp = int("" + chr(raw[13 if flip else 12]), 16)
right_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))

# Checking AirPods case for availability and storing charge in variable
status_tmp = int("" + chr(raw[15]), 16)
case_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))

# On 14th position we can get charge status of AirPods
charging_status = int("" + chr(raw[14]), 16)
charging_left:bool = (charging_status & (0b00000010 if flip else 0b00000001)) != 0
charging_right:bool = (charging_status & (0b00000001 if flip else 0b00000010)) != 0
charging_case:bool = (charging_status & 0b00000100) != 0

# Return result info in dict format
return dict(
status=1,
charge=dict(
left=left_status,
right=right_status,
case=case_status
),
charging_left=charging_left,
charging_right=charging_right,
charging_case=charging_case,
model=model,
date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
raw=raw.decode("utf-8")
)


# Return if left and right is flipped in the data
def is_flipped(raw):
return (int("" + chr(raw[10]), 16) & 0x02) == 0


def run():
output_file = argv[-1]

while True:
data = get_data()
return (int(chr(raw[10]), 16) & 0x02) == 0

if data["status"] == 1:
json_data = dumps(data)
if len(argv) > 1:
f = open(output_file, "a")
f.write(json_data+"\n")
f.close()
else:
print(json_data)

sleep(UPDATE_DURATION)


if __name__ == '__main__':
run()
def parse_data(raw):
if not raw:
return {"status": 0, "model": "AirPods not found"}

flip = is_flipped(raw)
model_codes = {'e': 'AirPodsPro', '3': 'AirPods3', 'f': 'AirPods2', '2': 'AirPods1', 'a': 'AirPodsMax'}
model = model_codes.get(chr(raw[7]), "unknown")

def parse_battery(index):
val = int(chr(raw[index]), 16)
return 100 if val == 10 else (val * 10 + 5 if val <= 10 else -1)

left = parse_battery(12 if flip else 13)
right = parse_battery(13 if flip else 12)
case = parse_battery(15)

charging = int(chr(raw[14]), 16)
return {
"status": 1,
"charge": {"left": left, "right": right, "case": case},
"charging_left": bool(charging & (0b00000010 if flip else 0b00000001)),
"charging_right": bool(charging & (0b00000001 if flip else 0b00000010)),
"charging_case": bool(charging & 0b00000100),
"model": model,
"date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"raw": raw.decode()
}


@app.websocket("/ws/airpods")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
raw = await get_device()
data = parse_data(raw)
await websocket.send_text(json.dumps(data))
await sleep(UPDATE_DURATION)
except WebSocketDisconnect:
print("Cliente desconectado")
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
bleak~=0.13.0
fastapi
uvicorn