diff --git a/README.md b/README.md index b3be48e..999efaa 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,9 @@ The basic example is in the [examples/hello_esp32/main.py](examples/hello_esp32/ - Connect to the Wokwi Simulator - Upload a diagram and firmware files - Start a simulation -- Monitor the serial output +- Monitor the serial output and write to them +- Read GPIO pins +- Control peripherals (buttons, MPU6050, etc.) You can run the example with: diff --git a/docs/index.md b/docs/index.md index 953341f..5f684c6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,7 +7,8 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. - Connect to the Wokwi Simulator from Python - Upload diagrams and firmware files - Start, pause, resume, and restart simulations -- Monitor serial output asynchronously +- Monitor serial output asynchronously and write to them +- Control peripherals and read GPIO pins - Fully type-annotated and easy to use with asyncio ## Installation diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 01354ae..9858c40 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -7,10 +7,12 @@ from .__version__ import get_version from .constants import DEFAULT_WS_URL +from .control import set_control from .event_queue import EventQueue from .file_ops import upload, upload_file +from .pins import pin_listen, pin_read from .protocol_types import EventMessage, ResponseMessage -from .serial import monitor_lines +from .serial import monitor_lines, write_serial from .simulation import pause, restart, resume, start from .transport import Transport @@ -179,5 +181,41 @@ async def serial_monitor_cat(self) -> None: async for line in monitor_lines(self._transport): print(line.decode("utf-8"), end="", flush=True) + async def serial_write(self, data: bytes | str | list[int]) -> None: + """Write data to the simulation serial monitor interface.""" + await write_serial(self._transport, data) + def _on_pause(self, event: EventMessage) -> None: self.last_pause_nanos = int(event["nanos"]) + + async def read_pin(self, part: str, pin: str) -> ResponseMessage: + """Read the current state of a pin. + + Args: + part: The part id (e.g. "uno"). + pin: The pin name (e.g. "A2"). + """ + return await pin_read(self._transport, part=part, pin=pin) + + async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage: + """Start or stop listening for changes on a pin. + + When enabled, "pin:change" events will be delivered via the transport's + event mechanism. + + Args: + part: The part id. + pin: The pin name. + listen: True to start listening, False to stop. + """ + return await pin_listen(self._transport, part=part, pin=pin, listen=listen) + + async def set_control(self, part: str, control: str, value: int | bool | float) -> ResponseMessage: + """Set a control value (e.g. simulate button press). + + Args: + part: Part id (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await set_control(self._transport, part=part, control=control, value=value) diff --git a/src/wokwi_client/control.py b/src/wokwi_client/control.py new file mode 100644 index 0000000..0d5a612 --- /dev/null +++ b/src/wokwi_client/control.py @@ -0,0 +1,31 @@ +"""Control command helpers for virtual parts. + +Provides `set_control` to manipulate part controls (e.g. press a button). + +Assumptions: +* Underlying websocket command name: "control:set". +* Parameter names expected by server: part, control, value. +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def set_control( + transport: Transport, *, part: str, control: str, value: int | bool | float +) -> ResponseMessage: + """Set a control value on a part (e.g. simulate button press/release). + + Args: + transport: Active Transport. + part: Part identifier (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await transport.request( + "control:set", {"part": part, "control": control, "value": float(value)} + ) diff --git a/src/wokwi_client/pins.py b/src/wokwi_client/pins.py new file mode 100644 index 0000000..5e7ce7f --- /dev/null +++ b/src/wokwi_client/pins.py @@ -0,0 +1,49 @@ +"""Pin command helpers for the Wokwi Simulation API. + +This module exposes helper coroutines for issuing pin-related commands: + +* pin:read - Read the current state of a pin. +* pin:listen - Start/stop listening for changes on a pin (emits pin:change + events). +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def pin_read( + transport: Transport, *, part: str, pin: str +) -> ResponseMessage: + """Read the state of a pin. + + Args: + transport: The active Transport instance. + part: Part identifier (e.g. "uno"). + pin: Pin name (e.g. "A2", "13"). + """ + + return await transport.request("pin:read", {"part": part, "pin": pin}) + + +async def pin_listen( + transport: Transport, *, part: str, pin: str, listen: bool = True +) -> ResponseMessage: + """Enable or disable listening for changes on a pin. + + When listening is enabled, "pin:change" events will be emitted with the + pin state. + + Args: + transport: The active Transport instance. + part: Part identifier. + pin: Pin name. + listen: True to start listening, False to stop. + """ + + return await transport.request( + "pin:listen", {"part": part, "pin": pin, "listen": listen} + ) diff --git a/src/wokwi_client/serial.py b/src/wokwi_client/serial.py index fbaea4c..3563740 100644 --- a/src/wokwi_client/serial.py +++ b/src/wokwi_client/serial.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: MIT -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Iterable from .event_queue import EventQueue from .transport import Transport @@ -14,3 +14,17 @@ async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]: while True: event_msg = await queue.get() yield bytes(event_msg["payload"]["bytes"]) + + +async def write_serial(transport: Transport, data: bytes | str | Iterable[int]) -> None: + """Write data to the serial monitor. + + Accepts bytes, str (encoded as utf-8), or an iterable of integer byte values. + """ + if isinstance(data, str): + payload = list(data.encode("utf-8")) + elif isinstance(data, bytes): + payload = list(data) + else: + payload = list(int(b) & 0xFF for b in data) + await transport.request("serial-monitor:write", {"bytes": payload})