Skip to content

Commit 080cbc2

Browse files
committed
Trying some stuff
1 parent d1a8f80 commit 080cbc2

File tree

1 file changed

+55
-2
lines changed

1 file changed

+55
-2
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,8 @@ def __init__(
571571
self._log_raw_websockets = _log_raw_websockets
572572
self._in_use_ids = set()
573573
self._max_retries = max_retries
574+
self._last_activity = asyncio.Event()
575+
self._last_activity.set()
574576

575577
@property
576578
def state(self):
@@ -588,6 +590,50 @@ async def __aenter__(self):
588590
async def loop_time() -> float:
589591
return asyncio.get_running_loop().time()
590592

593+
async def _reset_activity_timer(self):
594+
"""Reset the shared activity timeout"""
595+
self._last_activity.set()
596+
self._last_activity.clear()
597+
598+
async def _wait_with_activity_timeout(self, coro, timeout: float):
599+
"""
600+
Wait for a coroutine with a shared activity timeout.
601+
Returns the result or raises TimeoutError if no activity for timeout seconds.
602+
"""
603+
activity_task = asyncio.create_task(self._last_activity.wait())
604+
605+
# Handle both coroutines and tasks
606+
if isinstance(coro, asyncio.Task):
607+
main_task = coro
608+
else:
609+
main_task = asyncio.create_task(coro)
610+
611+
try:
612+
done, pending = await asyncio.wait(
613+
[main_task, activity_task],
614+
timeout=timeout,
615+
return_when=asyncio.FIRST_COMPLETED
616+
)
617+
618+
if not done: # Timeout occurred
619+
for task in pending:
620+
task.cancel()
621+
raise asyncio.TimeoutError()
622+
623+
# Check which completed
624+
if main_task in done:
625+
activity_task.cancel()
626+
return main_task.result()
627+
else: # activity_task completed (activity occurred elsewhere)
628+
# Recursively wait again with fresh timeout
629+
# main_task is already a Task, so pass it directly
630+
return await self._wait_with_activity_timeout(main_task, timeout)
631+
632+
except asyncio.CancelledError:
633+
main_task.cancel()
634+
activity_task.cancel()
635+
raise
636+
591637
async def _cancel(self):
592638
try:
593639
self._send_recv_task.cancel()
@@ -657,6 +703,8 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
657703
await self._sending.put(to_send)
658704
if is_retry:
659705
# Otherwise the connection was just closed due to no activity, which should not count against retries
706+
if self._attempts >= self._max_retries:
707+
return TimeoutError("Max retries exceeded.")
660708
logger.info(
661709
f"Timeout occurred. Reconnecting. Attempt {self._attempts} of {self._max_retries}"
662710
)
@@ -728,13 +776,16 @@ async def _recv(self, recd: bytes) -> None:
728776
async def _start_receiving(self, ws: ClientConnection) -> Exception:
729777
try:
730778
while True:
731-
recd = await asyncio.wait_for(
732-
ws.recv(decode=False), timeout=self.retry_timeout
779+
recd = await self._wait_with_activity_timeout(
780+
ws.recv(decode=False),
781+
self.retry_timeout
733782
)
783+
await self._reset_activity_timer()
734784
# reset the counter once we successfully receive something back
735785
self._attempts = 0
736786
await self._recv(recd)
737787
except Exception as e:
788+
logger.exception("Maybe timeout? 738", exc_info=e)
738789
if isinstance(e, ssl.SSLError):
739790
e = ConnectionClosed
740791
if not isinstance(
@@ -764,7 +815,9 @@ async def _start_sending(self, ws) -> Exception:
764815
if self._log_raw_websockets:
765816
raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}")
766817
await ws.send(to_send)
818+
await self._reset_activity_timer()
767819
except Exception as e:
820+
logger.exception("Maybe timeout? 769", exc_info=e)
768821
if isinstance(e, ssl.SSLError):
769822
e = ConnectionClosed
770823
if not isinstance(

0 commit comments

Comments
 (0)