diff --git a/README.md b/README.md index 9f2a54f..71c281d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Wokwi Python Client 🚀 -Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API** +Typed Python SDK for the **Wokwi Simulation API**, with async and sync APIs [![PyPI version](https://img.shields.io/pypi/v/wokwi-client?logo=pypi)](https://pypi.org/project/wokwi-client/) [![Python versions](https://img.shields.io/pypi/pyversions/wokwi-client)](https://pypi.org/project/wokwi-client/) @@ -32,7 +32,9 @@ The basic example is in the [examples/hello_esp32/main.py](examples/hello_esp32/ - Connect to the Wokwi Simulator - Upload a diagram and firmware files - Start a simulation -- Monitor the serial output +- Monitor the serial output and write to them +- Read GPIO pins +- Control peripherals (buttons, MPU6050, etc.) You can run the example with: diff --git a/docs/index.md b/docs/index.md index 953341f..934d73f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,14 +1,16 @@ # Wokwi Python Client Library -Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. +Typed Python SDK for the **Wokwi Simulation API**, with async and sync APIs. ## Features - Connect to the Wokwi Simulator from Python - Upload diagrams and firmware files - Start, pause, resume, and restart simulations -- Monitor serial output asynchronously +- Monitor serial output asynchronously and write to them +- Control peripherals and read GPIO pins - Fully type-annotated and easy to use with asyncio +- Async and sync APIs ## Installation @@ -24,37 +26,74 @@ Get your API token from [https://wokwi.com/dashboard/ci](https://wokwi.com/dashb ## Quickstart Example -```python -import asyncio -import os -from wokwi_client import WokwiClient, GET_TOKEN_URL - - -async def main(): - token = os.getenv("WOKWI_CLI_TOKEN") - if not token: - raise SystemExit( - f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." - ) - - client = WokwiClient(token) - await client.connect() - await client.upload_file("diagram.json") - await client.upload_file("firmware.bin") - await client.start_simulation(firmware="firmware.bin") - serial_task = asyncio.create_task( - client.serial_monitor_cat() - ) # Stream serial output - await client.wait_until_simulation_time(10) # Run simulation for 10 seconds - serial_task.cancel() - await client.disconnect() - - -if __name__ == "__main__": - asyncio.run(main()) -``` +=== "Async (recommended)" + + ```python + import asyncio + import os + from wokwi_client import WokwiClient, GET_TOKEN_URL + + + async def main(): + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + client = WokwiClient(token) + await client.connect() + await client.upload_file("diagram.json") + await client.upload_file("firmware.bin") + await client.start_simulation(firmware="firmware.bin") + + serial_task = asyncio.create_task( + client.serial_monitor_cat() + ) # Stream serial output + await client.wait_until_simulation_time(10) # Run simulation for 10 seconds + serial_task.cancel() + await client.disconnect() + + + if __name__ == "__main__": + asyncio.run(main()) + ``` + +=== "Sync" + + ```python + import os + from wokwi_client import WokwiClientSync, GET_TOKEN_URL + + + def main(): + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + client = WokwiClientSync(token) + client.connect() + client.upload_file("diagram.json") + client.upload_file("firmware.bin") + client.start_simulation(firmware="firmware.bin") + + # Stream serial output concurrently for 10 seconds + client.monitor_serial(lambda line: print(line.decode("utf-8"), end="", flush=True)) + client.wait_until_simulation_time(10) + client.disconnect() + + + if __name__ == "__main__": + main() + ``` + +## Examples -See the [examples/hello_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32/main.py) for a full example including serial monitoring, and [examples/micropython_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/micropython_esp32/main.py) for an example of running MicroPython on a simulated ESP32 board. +- [examples/hello_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32/main.py) full async example including serial monitoring +- [examples/hello_esp32_sync/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/hello_esp32_sync/main.py) sync wrapper usage +- [examples/micropython_esp32/main.py](https://github.com/wokwi/wokwi-python-client/blob/main/examples/micropython_esp32/main.py) example of running MicroPython on a simulated ESP32 board. ## API Reference diff --git a/examples/hello_esp32_sync/.gitignore b/examples/hello_esp32_sync/.gitignore new file mode 100644 index 0000000..359127f --- /dev/null +++ b/examples/hello_esp32_sync/.gitignore @@ -0,0 +1,3 @@ +# Ignore the firmware files, as they are downloaded from the internet +hello_world.bin +hello_world.elf diff --git a/examples/hello_esp32_sync/__init__.py b/examples/hello_esp32_sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/hello_esp32_sync/diagram.json b/examples/hello_esp32_sync/diagram.json new file mode 100644 index 0000000..6b9b1dd --- /dev/null +++ b/examples/hello_esp32_sync/diagram.json @@ -0,0 +1,22 @@ +{ + "version": 1, + "author": "Uri Shaked", + "editor": "wokwi", + "parts": [ + { + "type": "wokwi-esp32-devkit-v1", + "id": "esp", + "top": 0, + "left": 0, + "attrs": { "fullBoot": "1" } + } + ], + "connections": [ + ["esp:TX0", "$serialMonitor:RX", "", []], + ["esp:RX0", "$serialMonitor:TX", "", []] + ], + "serialMonitor": { + "display": "terminal" + }, + "dependencies": {} +} diff --git a/examples/hello_esp32_sync/main.py b/examples/hello_esp32_sync/main.py new file mode 100644 index 0000000..e1e009c --- /dev/null +++ b/examples/hello_esp32_sync/main.py @@ -0,0 +1,63 @@ +# SPDX-License-Identifier: MIT +# Copyright (C) 2025, CodeMagic LTD + +import os +from pathlib import Path + +import requests + +from wokwi_client import GET_TOKEN_URL, WokwiClientSync + +EXAMPLE_DIR = Path(__file__).parent +HELLO_WORLD_URL = "https://github.com/wokwi/esp-idf-hello-world/raw/refs/heads/main/bin" +FIRMWARE_FILES = { + "hello_world.bin": f"{HELLO_WORLD_URL}/hello_world.bin", + "hello_world.elf": f"{HELLO_WORLD_URL}/hello_world.elf", +} +SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "10")) + + +def main() -> None: + token = os.getenv("WOKWI_CLI_TOKEN") + if not token: + raise SystemExit( + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." + ) + + for filename, url in FIRMWARE_FILES.items(): + if (EXAMPLE_DIR / filename).exists(): + continue + print(f"Downloading {filename} from {url}") + response = requests.get(url) + response.raise_for_status() + with open(EXAMPLE_DIR / filename, "wb") as f: + f.write(response.content) + + client = WokwiClientSync(token) + print(f"Wokwi client library version: {client.version}") + + hello = client.connect() + print("Connected to Wokwi Simulator, server version:", hello["version"]) + + # Upload the diagram and firmware files + client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json") + client.upload_file("hello_world.bin", EXAMPLE_DIR / "hello_world.bin") + client.upload_file("hello_world.elf", EXAMPLE_DIR / "hello_world.elf") + + # Start the simulation + client.start_simulation( + firmware="hello_world.bin", + elf="hello_world.elf", + ) + + # Stream serial output for a few seconds (non-blocking) + client.monitor_serial(lambda line: print(line.decode("utf-8"), end="", flush=True)) + print(f"Simulation started, waiting for {SLEEP_TIME} seconds…") + client.wait_until_simulation_time(SLEEP_TIME) + + # Disconnect from the simulator + client.disconnect() + + +if __name__ == "__main__": + main() diff --git a/mkdocs.yml b/mkdocs.yml index 8bbdf5d..303ad77 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -42,6 +42,8 @@ markdown_extensions: - pymdownx.inlinehilite - pymdownx.superfences - pymdownx.tabbed + - pymdownx.tabbed: + alternate_style: true - toc: permalink: "¶" diff --git a/src/wokwi_client/__init__.py b/src/wokwi_client/__init__.py index 7fbe634..0b91573 100644 --- a/src/wokwi_client/__init__.py +++ b/src/wokwi_client/__init__.py @@ -12,7 +12,8 @@ from .__version__ import get_version from .client import WokwiClient +from .client_sync import WokwiClientSync from .constants import GET_TOKEN_URL __version__ = get_version() -__all__ = ["WokwiClient", "__version__", "GET_TOKEN_URL"] +__all__ = ["WokwiClient", "WokwiClientSync", "__version__", "GET_TOKEN_URL"] diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 6253b34..c0cde89 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -2,15 +2,25 @@ # # SPDX-License-Identifier: MIT +import typing from pathlib import Path from typing import Any, Optional +from wokwi_client.framebuffer import ( + compare_framebuffer_png, + framebuffer_png_bytes, + framebuffer_read, + save_framebuffer_png, +) + from .__version__ import get_version from .constants import DEFAULT_WS_URL +from .control import set_control from .event_queue import EventQueue -from .file_ops import upload, upload_file +from .file_ops import download, download_file, upload, upload_file +from .pins import gpio_list, pin_listen, pin_read from .protocol_types import EventMessage, ResponseMessage -from .serial import monitor_lines +from .serial import monitor_lines, write_serial from .simulation import pause, restart, resume, start from .transport import Transport @@ -39,7 +49,8 @@ def __init__(self, token: str, server: Optional[str] = None): self._transport = Transport(token, server or DEFAULT_WS_URL) self.last_pause_nanos = 0 self._transport.add_event_listener("sim:pause", self._on_pause) - self._pause_queue = EventQueue(self._transport, "sim:pause") + # Lazily create the pause queue inside the running event loop + self._pause_queue: Optional[EventQueue] = None async def connect(self) -> dict[str, Any]: """ @@ -84,6 +95,28 @@ async def upload_file( """ return await upload_file(self._transport, filename, local_path) + async def download(self, name: str) -> ResponseMessage: + """ + Download a file from the simulator. + + Args: + name: The name of the file to download. + + Returns: + The response message from the server. + """ + return await download(self._transport, name) + + async def download_file(self, name: str, local_path: Optional[Path] = None) -> None: + """ + Download a file from the simulator and save it to a local path. + + Args: + name: The name of the file to download. + local_path: The local path to save the downloaded file. If not provided, uses the name as the path. + """ + await download_file(self._transport, name, local_path) + async def start_simulation( self, firmware: str, @@ -156,6 +189,8 @@ async def wait_until_simulation_time(self, seconds: float) -> None: await pause(self._transport) remaining_nanos = seconds * 1e9 - self.last_pause_nanos if remaining_nanos > 0: + if self._pause_queue is None: + self._pause_queue = EventQueue(self._transport, "sim:pause") self._pause_queue.flush() await resume(self._transport, int(remaining_nanos)) await self._pause_queue.get() @@ -191,5 +226,67 @@ async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "repl else: print(line, end="", flush=True) + async def serial_write(self, data: typing.Union[bytes, str, list[int]]) -> None: + """Write data to the simulation serial monitor interface.""" + await write_serial(self._transport, data) + def _on_pause(self, event: EventMessage) -> None: self.last_pause_nanos = int(event["nanos"]) + + async def read_pin(self, part: str, pin: str) -> ResponseMessage: + """Read the current state of a pin. + + Args: + part: The part id (e.g. "uno"). + pin: The pin name (e.g. "A2"). + """ + return await pin_read(self._transport, part=part, pin=pin) + + async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage: + """Start or stop listening for changes on a pin. + + When enabled, "pin:change" events will be delivered via the transport's + event mechanism. + + Args: + part: The part id. + pin: The pin name. + listen: True to start listening, False to stop. + """ + return await pin_listen(self._transport, part=part, pin=pin, listen=listen) + + async def gpio_list(self) -> ResponseMessage: + """Get a list of all GPIO pins available in the simulation.""" + return await gpio_list(self._transport) + + async def set_control( + self, part: str, control: str, value: typing.Union[int, bool, float] + ) -> ResponseMessage: + """Set a control value (e.g. simulate button press). + + Args: + part: Part id (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await set_control(self._transport, part=part, control=control, value=value) + + async def framebuffer_read(self, id: str) -> ResponseMessage: + """Read the current framebuffer for the given device id.""" + return await framebuffer_read(self._transport, id=id) + + async def framebuffer_png_bytes(self, id: str) -> bytes: + """Return the current framebuffer as PNG bytes.""" + return await framebuffer_png_bytes(self._transport, id=id) + + async def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path: + """Save the current framebuffer as a PNG file.""" + return await save_framebuffer_png(self._transport, id=id, path=path, overwrite=overwrite) + + async def compare_framebuffer_png( + self, id: str, reference: Path, save_mismatch: Optional[Path] = None + ) -> bool: + """Compare the current framebuffer with a reference PNG file.""" + return await compare_framebuffer_png( + self._transport, id=id, reference=reference, save_mismatch=save_mismatch + ) diff --git a/src/wokwi_client/client_sync.py b/src/wokwi_client/client_sync.py new file mode 100644 index 0000000..7b10277 --- /dev/null +++ b/src/wokwi_client/client_sync.py @@ -0,0 +1,249 @@ +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import asyncio +import logging +import threading +import typing as t +from concurrent.futures import Future +from pathlib import Path + +from wokwi_client import WokwiClient +from wokwi_client.serial import monitor_lines as monitor_serial_lines + +if t.TYPE_CHECKING: + pass + + +class WokwiClientSync: + """Synchronous wrapper around the async WokwiClient.""" + + token: str + server: str | None + _loop: asyncio.AbstractEventLoop | None + _loop_thread: threading.Thread | None + _client: WokwiClient | None + _monitor_task: Future[t.Any] | None + _connected: bool + + def __init__(self, token: str, server: str | None = None) -> None: + self.token = token + self.server = server + self._loop = None + self._loop_thread = None + self._client = None + self._monitor_task = None + self._connected = False + + def _ensure_loop(self) -> None: + if self._loop is None: + self._loop = asyncio.new_event_loop() + self._loop_thread = threading.Thread(target=self._loop.run_forever, daemon=True) + self._loop_thread.start() + + def _run_async(self, coro: t.Coroutine[t.Any, t.Any, t.Any], timeout: float = 30) -> t.Any: + self._ensure_loop() + assert self._loop is not None + future = asyncio.run_coroutine_threadsafe(coro, self._loop) + return future.result(timeout=timeout) + + def connect(self) -> dict[str, t.Any]: + if not self._connected: + self._client = WokwiClient(self.token, self.server) + result: dict[str, t.Any] = t.cast( + dict[str, t.Any], self._run_async(self._client.connect()) + ) + self._connected = True + return result + return {} + + def disconnect(self) -> None: + if self._connected and self._client: + try: + if self._monitor_task: + self._monitor_task.cancel() + self._run_async(self._client.disconnect(), timeout=5) + except Exception as e: + logging.debug(f"Error during disconnect: {e}") + finally: + self._connected = False + self._client = None + if self._loop and self._loop_thread: + try: + self._loop.call_soon_threadsafe(self._loop.stop) + self._loop_thread.join(timeout=2) + except Exception as e: + logging.debug(f"Error stopping event loop: {e}") + finally: + self._loop = None + self._loop_thread = None + + def upload(self, name: str, content: bytes) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.upload(name, content)) + + def upload_file(self, filename: str, local_path: Path | None = None) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.upload_file(filename, local_path)) + + def download(self, name: str) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.download(name)) + + def download_file(self, name: str, local_path: Path | None = None) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.download_file(name, local_path)) + + def start_simulation( + self, + firmware: str, + elf: str | None = None, + pause: bool = False, + chips: list[str] | None = None, + ) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.start_simulation(firmware, elf, pause, chips or [])) + + def pause_simulation(self) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.pause_simulation()) + + def resume_simulation(self, pause_after: int | None = None) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.resume_simulation(pause_after)) + + def wait_until_simulation_time(self, seconds: float) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.wait_until_simulation_time(seconds)) + + def restart_simulation(self, pause: bool = False) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.restart_simulation(pause)) + + def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async( + self._client.serial_monitor_cat(decode_utf8=decode_utf8, errors=errors) + ) + + def write_serial(self, data: bytes | str | list[int]) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.serial_write(data)) + + def read_pin(self, part: str, pin: str) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.read_pin(part, pin)) + + def listen_pin(self, part: str, pin: str, listen: bool = True) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.listen_pin(part, pin, listen)) + + def gpio_list(self) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.gpio_list()) + + def monitor_serial(self, callback: t.Callable[[bytes], None]) -> None: + if not self._connected: + raise RuntimeError("Client not connected") + + async def _monitor() -> None: + try: + assert self._client is not None + async for line in monitor_serial_lines(self._client._transport): + if not self._connected: + break + try: + callback(line) + except Exception as e: + logging.error(f"Error in serial monitor callback: {e}") + break + except Exception as e: + logging.error(f"Error in serial monitor: {e}") + + assert self._loop is not None + self._monitor_task = asyncio.run_coroutine_threadsafe(_monitor(), self._loop) + + def set_control(self, part: str, control: str, value: int | bool | float) -> t.Any: + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.set_control(part, control, value)) + + def framebuffer_read(self, id: str) -> t.Any: + """Read the current framebuffer for the given device id.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return self._run_async(self._client.framebuffer_read(id)) + + def framebuffer_png_bytes(self, id: str) -> bytes: + """Return the current framebuffer as PNG bytes.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return t.cast(bytes, self._run_async(self._client.framebuffer_png_bytes(id))) + + def save_framebuffer_png(self, id: str, path: Path, overwrite: bool = True) -> Path: + """Save the current framebuffer as a PNG file.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return t.cast( + Path, self._run_async(self._client.save_framebuffer_png(id, path, overwrite=overwrite)) + ) + + def compare_framebuffer_png( + self, id: str, reference: Path, save_mismatch: Path | None = None + ) -> bool: + """Compare the current framebuffer with a reference PNG file.""" + if not self._connected: + raise RuntimeError("Client not connected") + assert self._client is not None + return t.cast( + bool, + self._run_async(self._client.compare_framebuffer_png(id, reference, save_mismatch)), + ) + + @property + def version(self) -> str: + if self._client: + return self._client.version + client = WokwiClient(self.token, self.server) + return client.version + + @property + def last_pause_nanos(self) -> int: + if self._client: + return self._client.last_pause_nanos + return 0 diff --git a/src/wokwi_client/control.py b/src/wokwi_client/control.py new file mode 100644 index 0000000..93e6dbd --- /dev/null +++ b/src/wokwi_client/control.py @@ -0,0 +1,33 @@ +"""Control command helpers for virtual parts. + +Provides `set_control` to manipulate part controls (e.g. press a button). + +Assumptions: +* Underlying websocket command name: "control:set". +* Parameter names expected by server: part, control, value. +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +import typing + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def set_control( + transport: Transport, *, part: str, control: str, value: typing.Union[int, bool, float] +) -> ResponseMessage: + """Set a control value on a part (e.g. simulate button press/release). + + Args: + transport: Active Transport. + part: Part identifier (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await transport.request( + "control:set", {"part": part, "control": control, "value": float(value)} + ) diff --git a/src/wokwi_client/event_queue.py b/src/wokwi_client/event_queue.py index c4c17c9..c49dede 100644 --- a/src/wokwi_client/event_queue.py +++ b/src/wokwi_client/event_queue.py @@ -23,12 +23,15 @@ class EventQueue: """A queue for events from a specific event type.""" def __init__(self, transport: Transport, event_type: str) -> None: + # Bind the queue to the current running loop + self._loop = asyncio.get_running_loop() self._queue: asyncio.Queue[EventMessage] = asyncio.Queue() self._transport = transport self._event_type = event_type def listener(event: EventMessage) -> None: - self._queue.put_nowait(event) + # Ensure put happens on the queue's loop (safe across threads/loops) + self._loop.call_soon_threadsafe(self._queue.put_nowait, event) self._listener = listener self._transport.add_event_listener(self._event_type, self._listener) diff --git a/src/wokwi_client/file_ops.py b/src/wokwi_client/file_ops.py index 032bfd9..0907509 100644 --- a/src/wokwi_client/file_ops.py +++ b/src/wokwi_client/file_ops.py @@ -22,3 +22,16 @@ async def upload_file( async def upload(transport: Transport, name: str, content: bytes) -> ResponseMessage: params = UploadParams(name=name, binary=base64.b64encode(content).decode()) return await transport.request("file:upload", params.model_dump()) + + +async def download(transport: Transport, name: str) -> ResponseMessage: + return await transport.request("file:download", {"name": name}) + + +async def download_file(transport: Transport, name: str, local_path: Optional[Path] = None) -> None: + if local_path is None: + local_path = Path(name) + + result = await download(transport, name) + with open(local_path, "wb") as f: + f.write(base64.b64decode(result["result"]["binary"])) diff --git a/src/wokwi_client/framebuffer.py b/src/wokwi_client/framebuffer.py new file mode 100644 index 0000000..ef3ae53 --- /dev/null +++ b/src/wokwi_client/framebuffer.py @@ -0,0 +1,93 @@ +"""Framebuffer command helpers. + +Provides utilities to interact with devices exposing a framebuffer (e.g. LCD +modules) via the `framebuffer:read` command. + +Exposed helpers: +* framebuffer_read -> raw response (contains base64 PNG at result.png) +* framebuffer_png_bytes -> decoded PNG bytes +* save_framebuffer_png -> save PNG to disk +* compare_framebuffer_png -> compare current framebuffer against reference +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import base64 +from pathlib import Path + +from .exceptions import WokwiError +from .protocol_types import ResponseMessage +from .transport import Transport + +__all__ = [ + "framebuffer_read", + "framebuffer_png_bytes", + "save_framebuffer_png", + "compare_framebuffer_png", +] + + +async def framebuffer_read(transport: Transport, *, id: str) -> ResponseMessage: + """Issue `framebuffer:read` for the given device id and return raw response.""" + return await transport.request("framebuffer:read", {"id": id}) + + +def _extract_png_b64(resp: ResponseMessage) -> str: + result = resp.get("result", {}) + png_b64 = result.get("png") + if not isinstance(png_b64, str): # pragma: no cover - defensive + raise WokwiError("Malformed framebuffer:read response: missing 'png' base64 string") + return png_b64 + + +async def framebuffer_png_bytes(transport: Transport, *, id: str) -> bytes: + """Return decoded PNG bytes for the framebuffer of device `id`.""" + resp = await framebuffer_read(transport, id=id) + return base64.b64decode(_extract_png_b64(resp)) + + +async def save_framebuffer_png( + transport: Transport, *, id: str, path: Path, overwrite: bool = True +) -> Path: + """Save the framebuffer PNG to `path` and return the path. + + Args: + transport: Active transport. + id: Device id (e.g. "lcd1"). + path: Destination file path. + overwrite: Overwrite existing file (default True). If False and file + exists, raises WokwiError. + """ + if path.exists() and not overwrite: + raise WokwiError(f"File already exists and overwrite=False: {path}") + data = await framebuffer_png_bytes(transport, id=id) + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "wb") as f: + f.write(data) + return path + + +async def compare_framebuffer_png( + transport: Transport, *, id: str, reference: Path, save_mismatch: Path | None = None +) -> bool: + """Compare the current framebuffer PNG with a reference file. + + Performs a byte-for-byte comparison. If different and `save_mismatch` is + provided, writes the current framebuffer PNG there. + + Returns True if identical, False otherwise. + """ + if not reference.exists(): + raise WokwiError(f"Reference image does not exist: {reference}") + current = await framebuffer_png_bytes(transport, id=id) + ref_bytes = reference.read_bytes() + if current == ref_bytes: + return True + if save_mismatch: + save_mismatch.parent.mkdir(parents=True, exist_ok=True) + save_mismatch.write_bytes(current) + return False diff --git a/src/wokwi_client/models.py b/src/wokwi_client/models.py index e085a31..58a006f 100644 --- a/src/wokwi_client/models.py +++ b/src/wokwi_client/models.py @@ -10,6 +10,10 @@ class UploadParams(BaseModel): binary: str # base64 +class DownloadParams(BaseModel): + binary: str # base64 + + class SimulationParams(BaseModel): firmware: str elf: str diff --git a/src/wokwi_client/pins.py b/src/wokwi_client/pins.py new file mode 100644 index 0000000..87d96a9 --- /dev/null +++ b/src/wokwi_client/pins.py @@ -0,0 +1,55 @@ +"""Pin command helpers for the Wokwi Simulation API. + +This module exposes helper coroutines for issuing pin-related commands: + +* pin:read - Read the current state of a pin. +* pin:listen - Start/stop listening for changes on a pin (emits pin:change + events). +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def pin_read(transport: Transport, *, part: str, pin: str) -> ResponseMessage: + """Read the state of a pin. + + Args: + transport: The active Transport instance. + part: Part identifier (e.g. "uno"). + pin: Pin name (e.g. "A2", "13"). + """ + + return await transport.request("pin:read", {"part": part, "pin": pin}) + + +async def pin_listen( + transport: Transport, *, part: str, pin: str, listen: bool = True +) -> ResponseMessage: + """Enable or disable listening for changes on a pin. + + When listening is enabled, "pin:change" events will be emitted with the + pin state. + + Args: + transport: The active Transport instance. + part: Part identifier. + pin: Pin name. + listen: True to start listening, False to stop. + """ + + return await transport.request("pin:listen", {"part": part, "pin": pin, "listen": listen}) + + +async def gpio_list(transport: Transport) -> ResponseMessage: + """List all GPIO pins and their current states. + + Args: + transport: The active Transport instance. + """ + + return await transport.request("gpio:list", {}) diff --git a/src/wokwi_client/serial.py b/src/wokwi_client/serial.py index fbaea4c..7d7ad4f 100644 --- a/src/wokwi_client/serial.py +++ b/src/wokwi_client/serial.py @@ -2,7 +2,8 @@ # # SPDX-License-Identifier: MIT -from collections.abc import AsyncGenerator +import typing +from collections.abc import AsyncGenerator, Iterable from .event_queue import EventQueue from .transport import Transport @@ -14,3 +15,17 @@ async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]: while True: event_msg = await queue.get() yield bytes(event_msg["payload"]["bytes"]) + + +async def write_serial(transport: Transport, data: typing.Union[bytes, str, Iterable[int]]) -> None: + """Write data to the serial monitor. + + Accepts bytes, str (encoded as utf-8), or an iterable of integer byte values. + """ + if isinstance(data, str): + payload = list(data.encode("utf-8")) + elif isinstance(data, bytes): + payload = list(data) + else: + payload = list(int(b) & 0xFF for b in data) + await transport.request("serial-monitor:write", {"bytes": payload}) diff --git a/tests/test_hello_esp32.py b/tests/test_hello_esp32.py index 348233d..21f0e31 100644 --- a/tests/test_hello_esp32.py +++ b/tests/test_hello_esp32.py @@ -2,25 +2,18 @@ # # SPDX-License-Identifier: MIT -import os -import subprocess -import sys +from .utils import run_example_module def test_hello_esp32_example() -> None: - """`python -m examples.hello_esp32.main` runs the hello_esp32 example and exits with 0.""" - - assert os.environ.get("WOKWI_CLI_TOKEN") is not None, ( - "WOKWI_CLI_TOKEN environment variable is not set. You can get it from https://wokwi.com/dashboard/ci." - ) + """Async hello_esp32 example should run and exit with 0.""" + result = run_example_module("examples.hello_esp32.main") + assert result.returncode == 0 + assert "main_task: Calling app_main()" in result.stdout - result = subprocess.run( - [sys.executable, "-m", "examples.hello_esp32.main"], - check=False, - capture_output=True, - text=True, - env={**os.environ, "WOKWI_SLEEP_TIME": "1"}, - ) +def test_hello_esp32_sync_example() -> None: + """Sync hello_esp32 example should run and exit with 0.""" + result = run_example_module("examples.hello_esp32_sync.main") assert result.returncode == 0 assert "main_task: Calling app_main()" in result.stdout diff --git a/tests/test_micropython_esp32.py b/tests/test_micropython_esp32.py new file mode 100644 index 0000000..e7be202 --- /dev/null +++ b/tests/test_micropython_esp32.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .utils import run_example_module + + +def test_micropython_esp32_example() -> None: + """MicroPython ESP32 example should run and print MicroPython banner.""" + # MicroPython boot can take a bit longer; give it a few seconds + result = run_example_module("examples.micropython_esp32.main", sleep_time="3") + assert result.returncode == 0 + # Expect a line from the injected MicroPython script + assert "Hello, MicroPython! I'm running on a Wokwi ESP32 simulator." in result.stdout diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..1bdaabe --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,40 @@ +""" +Test utilities for running example modules. + +Provides a helper to execute `python -m ` with a short sleep to keep +CI fast and shared environment handling (WOKWI_CLI_TOKEN, etc.). +""" + +from __future__ import annotations + +import os +import subprocess +import sys +from collections.abc import Mapping +from subprocess import CompletedProcess + + +def run_example_module( + module: str, *, sleep_time: str = "1", extra_env: Mapping[str, str] | None = None +) -> CompletedProcess[str]: + """Run an example module with a short simulation time. + + Requires WOKWI_CLI_TOKEN to be set in the environment. + Returns the CompletedProcess so tests can assert on return code and output. + """ + + assert os.environ.get("WOKWI_CLI_TOKEN") is not None, ( + "WOKWI_CLI_TOKEN environment variable is not set. You can get it from https://wokwi.com/dashboard/ci." + ) + + env = {**os.environ, "WOKWI_SLEEP_TIME": sleep_time} + if extra_env: + env.update(extra_env) + + return subprocess.run( + [sys.executable, "-m", module], + check=False, + capture_output=True, + text=True, + env=env, + )