From 7e6c4993af14642d89082a2bf17e11e880200124 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Mon, 11 Aug 2025 12:36:38 +0200 Subject: [PATCH 1/3] feat: add pin control and reading functionalities --- src/wokwi_client/client.py | 34 +++++++++++++++++++++++++ src/wokwi_client/control.py | 31 +++++++++++++++++++++++ src/wokwi_client/pins.py | 49 +++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+) create mode 100644 src/wokwi_client/control.py create mode 100644 src/wokwi_client/pins.py diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 01354ae..7308300 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -7,8 +7,10 @@ from .__version__ import get_version from .constants import DEFAULT_WS_URL +from .control import set_control from .event_queue import EventQueue from .file_ops import upload, upload_file +from .pins import pin_listen, pin_read from .protocol_types import EventMessage, ResponseMessage from .serial import monitor_lines from .simulation import pause, restart, resume, start @@ -181,3 +183,35 @@ async def serial_monitor_cat(self) -> None: def _on_pause(self, event: EventMessage) -> None: self.last_pause_nanos = int(event["nanos"]) + + async def read_pin(self, part: str, pin: str) -> ResponseMessage: + """Read the current state of a pin. + + Args: + part: The part id (e.g. "uno"). + pin: The pin name (e.g. "A2"). + """ + return await pin_read(self._transport, part=part, pin=pin) + + async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage: + """Start or stop listening for changes on a pin. + + When enabled, "pin:change" events will be delivered via the transport's + event mechanism. + + Args: + part: The part id. + pin: The pin name. + listen: True to start listening, False to stop. + """ + return await pin_listen(self._transport, part=part, pin=pin, listen=listen) + + async def set_control(self, part: str, control: str, value: int | bool | float) -> ResponseMessage: + """Set a control value (e.g. simulate button press). + + Args: + part: Part id (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await set_control(self._transport, part=part, control=control, value=value) diff --git a/src/wokwi_client/control.py b/src/wokwi_client/control.py new file mode 100644 index 0000000..0d5a612 --- /dev/null +++ b/src/wokwi_client/control.py @@ -0,0 +1,31 @@ +"""Control command helpers for virtual parts. + +Provides `set_control` to manipulate part controls (e.g. press a button). + +Assumptions: +* Underlying websocket command name: "control:set". +* Parameter names expected by server: part, control, value. +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def set_control( + transport: Transport, *, part: str, control: str, value: int | bool | float +) -> ResponseMessage: + """Set a control value on a part (e.g. simulate button press/release). + + Args: + transport: Active Transport. + part: Part identifier (e.g. "btn1"). + control: Control name (e.g. "pressed"). + value: Control value to set (float). + """ + return await transport.request( + "control:set", {"part": part, "control": control, "value": float(value)} + ) diff --git a/src/wokwi_client/pins.py b/src/wokwi_client/pins.py new file mode 100644 index 0000000..5e7ce7f --- /dev/null +++ b/src/wokwi_client/pins.py @@ -0,0 +1,49 @@ +"""Pin command helpers for the Wokwi Simulation API. + +This module exposes helper coroutines for issuing pin-related commands: + +* pin:read - Read the current state of a pin. +* pin:listen - Start/stop listening for changes on a pin (emits pin:change + events). +""" + +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD +# +# SPDX-License-Identifier: MIT + +from .protocol_types import ResponseMessage +from .transport import Transport + + +async def pin_read( + transport: Transport, *, part: str, pin: str +) -> ResponseMessage: + """Read the state of a pin. + + Args: + transport: The active Transport instance. + part: Part identifier (e.g. "uno"). + pin: Pin name (e.g. "A2", "13"). + """ + + return await transport.request("pin:read", {"part": part, "pin": pin}) + + +async def pin_listen( + transport: Transport, *, part: str, pin: str, listen: bool = True +) -> ResponseMessage: + """Enable or disable listening for changes on a pin. + + When listening is enabled, "pin:change" events will be emitted with the + pin state. + + Args: + transport: The active Transport instance. + part: Part identifier. + pin: Pin name. + listen: True to start listening, False to stop. + """ + + return await transport.request( + "pin:listen", {"part": part, "pin": pin, "listen": listen} + ) From 6f068f39af4d10d12005890007a0b83716ad26d1 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Mon, 11 Aug 2025 12:38:43 +0200 Subject: [PATCH 2/3] docs: update features list to include control of peripherals and GPIO pins --- docs/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/index.md b/docs/index.md index 953341f..6e5553d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -8,6 +8,7 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. - Upload diagrams and firmware files - Start, pause, resume, and restart simulations - Monitor serial output asynchronously +- Control peripherals and read GPIO pins - Fully type-annotated and easy to use with asyncio ## Installation From 538191a0252942d9db8309eb52948bc558b46129 Mon Sep 17 00:00:00 2001 From: Jakub Andrysek Date: Tue, 12 Aug 2025 12:36:18 +0200 Subject: [PATCH 3/3] feat: enhance serial monitor functionality with write support and update documentation --- README.md | 4 +++- docs/index.md | 2 +- src/wokwi_client/client.py | 6 +++++- src/wokwi_client/serial.py | 16 +++++++++++++++- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b3be48e..999efaa 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,9 @@ The basic example is in the [examples/hello_esp32/main.py](examples/hello_esp32/ - Connect to the Wokwi Simulator - Upload a diagram and firmware files - Start a simulation -- Monitor the serial output +- Monitor the serial output and write to them +- Read GPIO pins +- Control peripherals (buttons, MPU6050, etc.) You can run the example with: diff --git a/docs/index.md b/docs/index.md index 6e5553d..5f684c6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,7 +7,7 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**. - Connect to the Wokwi Simulator from Python - Upload diagrams and firmware files - Start, pause, resume, and restart simulations -- Monitor serial output asynchronously +- Monitor serial output asynchronously and write to them - Control peripherals and read GPIO pins - Fully type-annotated and easy to use with asyncio diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 7308300..9858c40 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -12,7 +12,7 @@ from .file_ops import upload, upload_file from .pins import pin_listen, pin_read from .protocol_types import EventMessage, ResponseMessage -from .serial import monitor_lines +from .serial import monitor_lines, write_serial from .simulation import pause, restart, resume, start from .transport import Transport @@ -181,6 +181,10 @@ async def serial_monitor_cat(self) -> None: async for line in monitor_lines(self._transport): print(line.decode("utf-8"), end="", flush=True) + async def serial_write(self, data: bytes | str | list[int]) -> None: + """Write data to the simulation serial monitor interface.""" + await write_serial(self._transport, data) + def _on_pause(self, event: EventMessage) -> None: self.last_pause_nanos = int(event["nanos"]) diff --git a/src/wokwi_client/serial.py b/src/wokwi_client/serial.py index fbaea4c..3563740 100644 --- a/src/wokwi_client/serial.py +++ b/src/wokwi_client/serial.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: MIT -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Iterable from .event_queue import EventQueue from .transport import Transport @@ -14,3 +14,17 @@ async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]: while True: event_msg = await queue.get() yield bytes(event_msg["payload"]["bytes"]) + + +async def write_serial(transport: Transport, data: bytes | str | Iterable[int]) -> None: + """Write data to the serial monitor. + + Accepts bytes, str (encoded as utf-8), or an iterable of integer byte values. + """ + if isinstance(data, str): + payload = list(data.encode("utf-8")) + elif isinstance(data, bytes): + payload = list(data) + else: + payload = list(int(b) & 0xFF for b in data) + await transport.request("serial-monitor:write", {"bytes": payload})