Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ The basic example is in the [examples/hello_esp32/main.py](examples/hello_esp32/
- Connect to the Wokwi Simulator
- Upload a diagram and firmware files
- Start a simulation
- Monitor the serial output
- Monitor the serial output and write to them
- Read GPIO pins
- Control peripherals (buttons, MPU6050, etc.)

You can run the example with:

Expand Down
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**.
- Connect to the Wokwi Simulator from Python
- Upload diagrams and firmware files
- Start, pause, resume, and restart simulations
- Monitor serial output asynchronously
- Monitor serial output asynchronously and write to them
- Control peripherals and read GPIO pins
- Fully type-annotated and easy to use with asyncio

## Installation
Expand Down
40 changes: 39 additions & 1 deletion src/wokwi_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

from .__version__ import get_version
from .constants import DEFAULT_WS_URL
from .control import set_control
from .event_queue import EventQueue
from .file_ops import upload, upload_file
from .pins import pin_listen, pin_read
from .protocol_types import EventMessage, ResponseMessage
from .serial import monitor_lines
from .serial import monitor_lines, write_serial
from .simulation import pause, restart, resume, start
from .transport import Transport

Expand Down Expand Up @@ -179,5 +181,41 @@ async def serial_monitor_cat(self) -> None:
async for line in monitor_lines(self._transport):
print(line.decode("utf-8"), end="", flush=True)

async def serial_write(self, data: bytes | str | list[int]) -> None:
"""Write data to the simulation serial monitor interface."""
await write_serial(self._transport, data)

def _on_pause(self, event: EventMessage) -> None:
self.last_pause_nanos = int(event["nanos"])

async def read_pin(self, part: str, pin: str) -> ResponseMessage:
"""Read the current state of a pin.

Args:
part: The part id (e.g. "uno").
pin: The pin name (e.g. "A2").
"""
return await pin_read(self._transport, part=part, pin=pin)

async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage:
"""Start or stop listening for changes on a pin.

When enabled, "pin:change" events will be delivered via the transport's
event mechanism.

Args:
part: The part id.
pin: The pin name.
listen: True to start listening, False to stop.
"""
return await pin_listen(self._transport, part=part, pin=pin, listen=listen)

async def set_control(self, part: str, control: str, value: int | bool | float) -> ResponseMessage:
"""Set a control value (e.g. simulate button press).

Args:
part: Part id (e.g. "btn1").
control: Control name (e.g. "pressed").
value: Control value to set (float).
"""
return await set_control(self._transport, part=part, control=control, value=value)
31 changes: 31 additions & 0 deletions src/wokwi_client/control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Control command helpers for virtual parts.

Provides `set_control` to manipulate part controls (e.g. press a button).

Assumptions:
* Underlying websocket command name: "control:set".
* Parameter names expected by server: part, control, value.
"""

# SPDX-FileCopyrightText: 2025-present CodeMagic LTD
#
# SPDX-License-Identifier: MIT

from .protocol_types import ResponseMessage
from .transport import Transport


async def set_control(
transport: Transport, *, part: str, control: str, value: int | bool | float
) -> ResponseMessage:
"""Set a control value on a part (e.g. simulate button press/release).

Args:
transport: Active Transport.
part: Part identifier (e.g. "btn1").
control: Control name (e.g. "pressed").
value: Control value to set (float).
"""
return await transport.request(
"control:set", {"part": part, "control": control, "value": float(value)}
)
49 changes: 49 additions & 0 deletions src/wokwi_client/pins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Pin command helpers for the Wokwi Simulation API.

This module exposes helper coroutines for issuing pin-related commands:

* pin:read - Read the current state of a pin.
* pin:listen - Start/stop listening for changes on a pin (emits pin:change
events).
"""

# SPDX-FileCopyrightText: 2025-present CodeMagic LTD
#
# SPDX-License-Identifier: MIT

from .protocol_types import ResponseMessage
from .transport import Transport


async def pin_read(
transport: Transport, *, part: str, pin: str
) -> ResponseMessage:
"""Read the state of a pin.

Args:
transport: The active Transport instance.
part: Part identifier (e.g. "uno").
pin: Pin name (e.g. "A2", "13").
"""

return await transport.request("pin:read", {"part": part, "pin": pin})


async def pin_listen(
transport: Transport, *, part: str, pin: str, listen: bool = True
) -> ResponseMessage:
"""Enable or disable listening for changes on a pin.

When listening is enabled, "pin:change" events will be emitted with the
pin state.

Args:
transport: The active Transport instance.
part: Part identifier.
pin: Pin name.
listen: True to start listening, False to stop.
"""

return await transport.request(
"pin:listen", {"part": part, "pin": pin, "listen": listen}
)
16 changes: 15 additions & 1 deletion src/wokwi_client/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: MIT

from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Iterable

from .event_queue import EventQueue
from .transport import Transport
Expand All @@ -14,3 +14,17 @@ async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]:
while True:
event_msg = await queue.get()
yield bytes(event_msg["payload"]["bytes"])


async def write_serial(transport: Transport, data: bytes | str | Iterable[int]) -> None:
"""Write data to the serial monitor.

Accepts bytes, str (encoded as utf-8), or an iterable of integer byte values.
"""
if isinstance(data, str):
payload = list(data.encode("utf-8"))
elif isinstance(data, bytes):
payload = list(data)
else:
payload = list(int(b) & 0xFF for b in data)
await transport.request("serial-monitor:write", {"bytes": payload})
Loading