diff --git a/src/wokwi_client/client.py b/src/wokwi_client/client.py index 1c72573..03cf878 100644 --- a/src/wokwi_client/client.py +++ b/src/wokwi_client/client.py @@ -6,19 +6,18 @@ from pathlib import Path from typing import Any, Optional, Union, cast -from wokwi_client.exceptions import ProtocolError -from wokwi_client.framebuffer import ( - read_framebuffer_png_bytes, - save_framebuffer_png, -) - from .__version__ import get_version from .constants import DEFAULT_WS_URL from .control import set_control from .event_queue import EventQueue +from .exceptions import ProtocolError from .file_ops import download, upload, upload_file -from .pins import gpio_list, pin_listen, pin_read -from .protocol_types import EventMessage, ResponseMessage +from .framebuffer import ( + read_framebuffer_png_bytes, + save_framebuffer_png, +) +from .pins import PinReadMessage, gpio_list, pin_listen, pin_read +from .protocol_types import EventMessage from .serial import monitor_lines, write_serial from .simulation import pause, restart, resume, start from .transport import Transport @@ -65,33 +64,25 @@ async def disconnect(self) -> None: """ await self._transport.close() - async def upload(self, name: str, content: bytes) -> ResponseMessage: + async def upload(self, name: str, content: bytes) -> None: """ Upload a file to the simulator from bytes content. Args: name: The name to use for the uploaded file. content: The file content as bytes. - - Returns: - The response message from the server. """ - return await upload(self._transport, name, content) + await upload(self._transport, name, content) - async def upload_file( - self, filename: str, local_path: Optional[Path] = None - ) -> ResponseMessage: + async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> None: """ Upload a local file to the simulator. Args: filename: The name to use for the uploaded file. local_path: Optional path to the local file. If not provided, uses filename as the path. - - Returns: - The response message from the server. """ - return await upload_file(self._transport, filename, local_path) + await upload_file(self._transport, filename, local_path) async def download(self, name: str) -> bytes: """ @@ -127,7 +118,7 @@ async def start_simulation( elf: Optional[str] = None, pause: bool = False, chips: list[str] = [], - ) -> ResponseMessage: + ) -> None: """ Start a new simulation with the given parameters. @@ -150,11 +141,8 @@ async def start_simulation( elf: The ELF file filename (optional). pause: Whether to start the simulation paused (default: False). chips: List of custom chips to load into the simulation (default: empty list). - - Returns: - The response message from the server. """ - return await start( + await start( self._transport, firmware=firmware, elf=elf, @@ -162,26 +150,20 @@ async def start_simulation( chips=chips, ) - async def pause_simulation(self) -> ResponseMessage: + async def pause_simulation(self) -> None: """ Pause the running simulation. - - Returns: - The response message from the server. """ - return await pause(self._transport) + await pause(self._transport) - async def resume_simulation(self, pause_after: Optional[int] = None) -> ResponseMessage: + async def resume_simulation(self, pause_after: Optional[int] = None) -> None: """ Resume the simulation, optionally pausing after a given number of nanoseconds. Args: pause_after: Number of nanoseconds to run before pausing again (optional). - - Returns: - The response message from the server. """ - return await resume(self._transport, pause_after) + await resume(self._transport, pause_after) async def wait_until_simulation_time(self, seconds: float) -> None: """ @@ -197,17 +179,14 @@ async def wait_until_simulation_time(self, seconds: float) -> None: await resume(self._transport, int(remaining_nanos)) await self._pause_queue.get() - async def restart_simulation(self, pause: bool = False) -> ResponseMessage: + async def restart_simulation(self, pause: bool = False) -> None: """ Restart the simulation, optionally starting paused. Args: pause: Whether to start the simulation paused (default: False). - - Returns: - The response message from the server. """ - return await restart(self._transport, pause) + await restart(self._transport, pause) async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None: """ @@ -235,16 +214,17 @@ async def serial_write(self, data: Union[bytes, str, list[int]]) -> None: def _on_pause(self, event: EventMessage) -> None: self.last_pause_nanos = int(event["nanos"]) - async def read_pin(self, part: str, pin: str) -> ResponseMessage: + async def read_pin(self, part: str, pin: str) -> PinReadMessage: """Read the current state of a pin. Args: part: The part id (e.g. "uno"). pin: The pin name (e.g. "A2"). """ - return await pin_read(self._transport, part=part, pin=pin) + pin_data = await pin_read(self._transport, part=part, pin=pin) + return cast(PinReadMessage, pin_data["result"]) - async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage: + async def listen_pin(self, part: str, pin: str, listen: bool = True) -> None: """Start or stop listening for changes on a pin. When enabled, "pin:change" events will be delivered via the transport's @@ -255,7 +235,7 @@ async def listen_pin(self, part: str, pin: str, listen: bool = True) -> Response pin: The pin name. listen: True to start listening, False to stop. """ - return await pin_listen(self._transport, part=part, pin=pin, listen=listen) + await pin_listen(self._transport, part=part, pin=pin, listen=listen) async def gpio_list(self) -> list[str]: """Get a list of all GPIO pins available in the simulation. @@ -269,9 +249,7 @@ async def gpio_list(self) -> list[str]: raise ProtocolError("Malformed gpio:list response: expected result.pins: list[str]") return cast(list[str], pins_val) - async def set_control( - self, part: str, control: str, value: Union[int, bool, float] - ) -> ResponseMessage: + async def set_control(self, part: str, control: str, value: Union[int, bool, float]) -> None: """Set a control value (e.g. simulate button press). Args: @@ -279,7 +257,7 @@ async def set_control( control: Control name (e.g. "pressed"). value: Control value to set (float). """ - return await set_control(self._transport, part=part, control=control, value=value) + await set_control(self._transport, part=part, control=control, value=value) async def read_framebuffer_png_bytes(self, id: str) -> bytes: """Return the current framebuffer as PNG bytes.""" diff --git a/src/wokwi_client/pins.py b/src/wokwi_client/pins.py index 87d96a9..aa4c79b 100644 --- a/src/wokwi_client/pins.py +++ b/src/wokwi_client/pins.py @@ -11,10 +11,30 @@ # # SPDX-License-Identifier: MIT -from .protocol_types import ResponseMessage +from typing import TypedDict, Union + +from wokwi_client.protocol_types import ResponseMessage + from .transport import Transport +class PinReadMessage(TypedDict): + pin: str + direction: str + value: Union[float, int, bool] + pullUp: bool + pullDown: bool + + +class PinListenEvent(TypedDict): + part: str + pin: str + direction: str + value: Union[float, int, bool] + pullUp: bool + pullDown: bool + + async def pin_read(transport: Transport, *, part: str, pin: str) -> ResponseMessage: """Read the state of a pin. @@ -27,9 +47,7 @@ async def pin_read(transport: Transport, *, part: str, pin: str) -> ResponseMess return await transport.request("pin:read", {"part": part, "pin": pin}) -async def pin_listen( - transport: Transport, *, part: str, pin: str, listen: bool = True -) -> ResponseMessage: +async def pin_listen(transport: Transport, *, part: str, pin: str, listen: bool = True) -> None: """Enable or disable listening for changes on a pin. When listening is enabled, "pin:change" events will be emitted with the @@ -42,7 +60,7 @@ async def pin_listen( listen: True to start listening, False to stop. """ - return await transport.request("pin:listen", {"part": part, "pin": pin, "listen": listen}) + await transport.request("pin:listen", {"part": part, "pin": pin, "listen": listen}) async def gpio_list(transport: Transport) -> ResponseMessage: