Skip to content

Commit 93c29b0

Browse files
committed
fix(client_sync): improve event loop initialization and background task management
1 parent 7f84826 commit 93c29b0

File tree

2 files changed

+77
-62
lines changed

2 files changed

+77
-62
lines changed

src/wokwi_client/client_sync.py

Lines changed: 72 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -44,29 +44,32 @@ class WokwiClientSync:
4444
last_pause_nanos: int # this proxy resolves via __getattr__
4545

4646
def __init__(self, token: str, server: str | None = None):
47-
# Create a fresh event loop + thread (daemon so it won't prevent process exit).
47+
# Create a new event loop for the background thread
4848
self._loop = asyncio.new_event_loop()
49+
# Event to signal that the event loop is running
50+
self._loop_started_event = threading.Event()
51+
# Start background thread running the event loop
4952
self._thread = threading.Thread(
5053
target=self._run_loop, args=(self._loop,), daemon=True, name="wokwi-sync-loop"
5154
)
5255
self._thread.start()
53-
54-
# Underlying async client
56+
# **Wait until loop is fully started before proceeding** (prevents race conditions)
57+
if not self._loop_started_event.wait(timeout=5.0): # timeout to avoid deadlock
58+
raise RuntimeError("WokwiClientSync event loop failed to start")
59+
# Initialize underlying async client on the running loop
5560
self._async_client = WokwiClient(token, server)
56-
57-
# Mirror library version for quick access
61+
# Mirror version attribute for quick access
5862
self.version = self._async_client.version
59-
60-
# Track background tasks created via run_coroutine_threadsafe (serial monitors)
63+
# Track background monitor tasks (futures) for cancellation on exit
6164
self._bg_futures: set[Future[Any]] = set()
62-
63-
# Idempotent disconnect guard
65+
# Flag to avoid double-closing
6466
self._closed = False
6567

66-
@staticmethod
67-
def _run_loop(loop: asyncio.AbstractEventLoop) -> None:
68-
"""Background thread loop runner."""
68+
def _run_loop(self, loop: asyncio.AbstractEventLoop) -> None:
69+
"""Target function for the background thread: runs the asyncio event loop."""
6970
asyncio.set_event_loop(loop)
71+
# Signal that the loop is now running and ready to accept tasks
72+
loop.call_soon(self._loop_started_event.set)
7073
loop.run_forever()
7174

7275
# ----- Internal helpers -------------------------------------------------
@@ -75,8 +78,11 @@ def _submit(self, coro: Coroutine[Any, Any, T]) -> Future[T]:
7578
return asyncio.run_coroutine_threadsafe(coro, self._loop)
7679

7780
def _call(self, coro: Coroutine[Any, Any, T]) -> T:
78-
"""Submit a coroutine to the loop and block until it completes (or raises)."""
79-
return self._submit(coro).result()
81+
"""Submit a coroutine to the background loop and wait for result."""
82+
if self._closed:
83+
raise RuntimeError("Cannot call methods on a closed WokwiClientSync")
84+
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
85+
return future.result() # Block until the coroutine completes or raises
8086

8187
def _add_bg_future(self, fut: Future[Any]) -> None:
8288
"""Track a background future so we can cancel & drain on shutdown."""
@@ -96,37 +102,35 @@ def connect(self) -> dict[str, Any]:
96102
return self._call(self._async_client.connect())
97103

98104
def disconnect(self) -> None:
99-
"""Disconnect and stop the background loop.
100-
101-
Order matters:
102-
1) Cancel and drain background serial-monitor futures.
103-
2) Disconnect the underlying transport.
104-
3) Stop the loop and join the thread.
105-
Safe to call multiple times.
106-
"""
107105
if self._closed:
108106
return
109-
self._closed = True
110107

111108
# (1) Cancel + drain monitors
112109
for fut in list(self._bg_futures):
113110
fut.cancel()
114111
for fut in list(self._bg_futures):
115112
with contextlib.suppress(FutureTimeoutError, Exception):
116-
# Give each monitor a short window to handle cancellation cleanly.
117113
fut.result(timeout=1.0)
118114
self._bg_futures.discard(fut)
119115

120116
# (2) Disconnect transport
121117
with contextlib.suppress(Exception):
122-
self._call(self._async_client._transport.close())
118+
fut = asyncio.run_coroutine_threadsafe(self._async_client.disconnect(), self._loop)
119+
fut.result(timeout=2.0)
123120

124121
# (3) Stop loop / join thread
125122
if self._loop.is_running():
126123
self._loop.call_soon_threadsafe(self._loop.stop)
127124
if self._thread.is_alive():
128125
self._thread.join(timeout=5.0)
129126

127+
# (4) Close loop
128+
with contextlib.suppress(Exception):
129+
self._loop.close()
130+
131+
# (5) Mark closed at the very end
132+
self._closed = True
133+
130134
# ----- Serial monitoring ------------------------------------------------
131135
def serial_monitor(self, callback: Callable[[bytes], Any]) -> None:
132136
"""
@@ -138,17 +142,25 @@ def serial_monitor(self, callback: Callable[[bytes], Any]) -> None:
138142
"""
139143

140144
async def _runner() -> None:
141-
async for line in monitor_lines(self._async_client._transport):
142-
try:
143-
maybe_awaitable = callback(line)
144-
if inspect.isawaitable(maybe_awaitable):
145-
await maybe_awaitable
146-
except Exception:
147-
# Keep the monitor alive even if the callback throws.
148-
pass
149-
150-
fut = self._submit(_runner())
151-
self._add_bg_future(fut)
145+
try:
146+
# **Prepare to receive serial events before enabling monitor**
147+
# (monitor_lines will subscribe to serial events internally)
148+
async for line in monitor_lines(self._async_client._transport):
149+
try:
150+
result = callback(line) # invoke callback with the raw bytes line
151+
if inspect.isawaitable(result):
152+
await result # await if callback is async
153+
except Exception:
154+
# Swallow exceptions from callback to keep monitor alive
155+
pass
156+
finally:
157+
# Remove this task’s future from the set when done
158+
self._bg_futures.discard(task_future)
159+
160+
# Schedule the serial monitor runner on the event loop:
161+
task_future = asyncio.run_coroutine_threadsafe(_runner(), self._loop)
162+
self._bg_futures.add(task_future)
163+
# (No return value; monitoring happens in background)
152164

153165
def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace") -> None:
154166
"""
@@ -160,34 +172,32 @@ def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "replace")
160172
"""
161173

162174
async def _runner() -> None:
163-
async for line in monitor_lines(self._async_client._transport):
164-
try:
165-
if decode_utf8:
166-
try:
167-
print(line.decode("utf-8", errors=errors), end="", flush=True)
168-
except UnicodeDecodeError:
175+
try:
176+
# **Subscribe to serial events before reading output**
177+
async for line in monitor_lines(self._async_client._transport):
178+
try:
179+
if decode_utf8:
180+
# Decode bytes to string (handle errors per parameter)
181+
text = line.decode("utf-8", errors=errors)
182+
print(text, end="", flush=True)
183+
else:
184+
# Print raw bytes
169185
print(line, end="", flush=True)
170-
else:
171-
print(line, end="", flush=True)
172-
except Exception:
173-
# Keep the monitor alive even if printing raises intermittently.
174-
pass
186+
except Exception:
187+
# Swallow print errors to keep stream alive
188+
pass
189+
finally:
190+
self._bg_futures.discard(task_future)
175191

176-
fut = self._submit(_runner())
177-
self._add_bg_future(fut)
192+
task_future = asyncio.run_coroutine_threadsafe(_runner(), self._loop)
193+
self._bg_futures.add(task_future)
194+
# (No return; printing continues in background)
178195

179196
def stop_serial_monitors(self) -> None:
180-
"""
181-
Cancel and drain all running serial monitors without disconnecting.
182-
183-
Useful if you want to stop printing but keep the connection alive.
184-
"""
197+
"""Stop all active serial monitor background tasks."""
185198
for fut in list(self._bg_futures):
186199
fut.cancel()
187-
for fut in list(self._bg_futures):
188-
with contextlib.suppress(FutureTimeoutError, Exception):
189-
fut.result(timeout=1.0)
190-
self._bg_futures.discard(fut)
200+
self._bg_futures.clear()
191201

192202
# ----- Dynamic method wrapping -----------------------------------------
193203
def __getattr__(self, name: str) -> Any:
@@ -197,16 +207,17 @@ def __getattr__(self, name: str) -> Any:
197207
If the attribute on `WokwiClient` is a coroutine function, return a
198208
sync wrapper that blocks until the coroutine completes.
199209
"""
200-
# Explicit methods above (serial monitors) take precedence.
210+
# Explicit methods (like serial_monitor functions above) take precedence over __getattr__
201211
attr = getattr(self._async_client, name)
202212
if callable(attr):
213+
# Get the function object from WokwiClient class (unbound) to check if coroutine
203214
func = getattr(WokwiClient, name, None)
204215
if func is not None and inspect.iscoroutinefunction(func):
205-
216+
# Wrap coroutine method to run in background loop
206217
def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
207218
return self._call(attr(*args, **kwargs))
208219

209220
sync_wrapper.__name__ = name
210-
sync_wrapper.__doc__ = func.__doc__
221+
sync_wrapper.__doc__ = getattr(func, "__doc__", "")
211222
return sync_wrapper
212223
return attr

src/wokwi_client/serial.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010

1111

1212
async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]:
13-
await transport.request("serial-monitor:listen", {})
13+
"""
14+
Monitor the serial output lines.
15+
"""
16+
# Create the queue/listener before enabling the monitor to catch all data
1417
with EventQueue(transport, "serial-monitor:data") as queue:
18+
await transport.request("serial-monitor:listen", {})
1519
while True:
1620
event_msg = await queue.get()
1721
yield bytes(event_msg["payload"]["bytes"])

0 commit comments

Comments
 (0)