Skip to content

Commit 60923e1

Browse files
committed
Seems to work
1 parent f805aaa commit 60923e1

File tree

1 file changed

+119
-52
lines changed

1 file changed

+119
-52
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 119 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import inspect
99
import logging
1010
import os
11+
import socket
1112
import ssl
1213
import warnings
1314
from contextlib import suppress
@@ -34,6 +35,7 @@
3435
ss58_encode,
3536
MultiAccountId,
3637
)
38+
from websockets import CloseCode
3739
from websockets.asyncio.client import connect, ClientConnection
3840
from websockets.exceptions import (
3941
ConnectionClosed,
@@ -592,8 +594,11 @@ async def loop_time() -> float:
592594

593595
async def _reset_activity_timer(self):
594596
"""Reset the shared activity timeout"""
595-
self._last_activity.set()
596-
self._last_activity.clear()
597+
# Create a NEW event instead of reusing the same one
598+
old_event = self._last_activity
599+
self._last_activity = asyncio.Event()
600+
self._last_activity.clear() # Start fresh
601+
old_event.set() # Wake up anyone waiting on the old event
597602

598603
async def _wait_with_activity_timeout(self, coro, timeout: float):
599604
"""
@@ -612,21 +617,28 @@ async def _wait_with_activity_timeout(self, coro, timeout: float):
612617
done, pending = await asyncio.wait(
613618
[main_task, activity_task],
614619
timeout=timeout,
615-
return_when=asyncio.FIRST_COMPLETED
620+
return_when=asyncio.FIRST_COMPLETED,
616621
)
617622

618623
if not done: # Timeout occurred
624+
logger.debug(f"Activity timeout after {timeout}s, no activity detected")
619625
for task in pending:
620626
task.cancel()
621-
raise asyncio.TimeoutError()
627+
raise TimeoutError()
622628

623629
# Check which completed
624630
if main_task in done:
625631
activity_task.cancel()
626-
return main_task.result()
632+
633+
# Check if the task raised an exception
634+
exc = main_task.exception()
635+
if exc is not None:
636+
raise exc
637+
else:
638+
return main_task.result()
627639
else: # activity_task completed (activity occurred elsewhere)
640+
logger.debug("Activity detected, resetting timeout")
628641
# Recursively wait again with fresh timeout
629-
# main_task is already a Task, so pass it directly
630642
return await self._wait_with_activity_timeout(main_task, timeout)
631643

632644
except asyncio.CancelledError:
@@ -636,46 +648,72 @@ async def _wait_with_activity_timeout(self, coro, timeout: float):
636648

637649
async def _cancel(self):
638650
try:
639-
self._send_recv_task.cancel()
640-
await self.ws.close()
641-
except (
642-
AttributeError,
643-
asyncio.CancelledError,
644-
WebSocketException,
645-
):
651+
logger.debug("Cancelling send/recv tasks")
652+
if self._send_recv_task is not None:
653+
self._send_recv_task.cancel()
654+
except asyncio.CancelledError:
646655
pass
647656
except Exception as e:
648657
logger.warning(
649658
f"{e} encountered while trying to close websocket connection."
650659
)
660+
try:
661+
logger.debug("Closing websocket connection")
662+
if self.ws is not None:
663+
await self.ws.close()
664+
except Exception as e:
665+
logger.warning(
666+
f"{e} encountered while trying to close websocket connection."
667+
)
651668

652669
async def connect(self, force=False):
653-
async with self._lock:
654-
logger.debug(f"Websocket connecting to {self.ws_url}")
655-
if self._sending is None or self._sending.empty():
656-
self._sending = asyncio.Queue()
657-
if self._exit_task:
658-
self._exit_task.cancel()
659-
logger.debug(f"self.state={self.state}")
660-
if self.state not in (State.OPEN, State.CONNECTING) or force:
670+
if not force:
671+
await self._lock.acquire()
672+
else:
673+
logger.debug("Proceeding without acquiring lock.")
674+
logger.debug(f"Websocket connecting to {self.ws_url}")
675+
if self._sending is None or self._sending.empty():
676+
self._sending = asyncio.Queue()
677+
if self._exit_task:
678+
self._exit_task.cancel()
679+
logger.debug(f"self.state={self.state}")
680+
if force and self.state == State.OPEN:
681+
logger.debug(f"Attempting to reconnect while already connected.")
682+
if self.ws is not None:
683+
self.ws.protocol.fail(CloseCode.SERVICE_RESTART)
684+
logger.debug(f"Open connection cancelled.")
685+
await asyncio.sleep(1)
686+
if self.state not in (State.OPEN, State.CONNECTING) or force:
687+
if not force:
661688
try:
689+
logger.debug("Attempting cancellation")
662690
await asyncio.wait_for(self._cancel(), timeout=10.0)
663691
except asyncio.TimeoutError:
664692
logger.debug(f"Timed out waiting for cancellation")
665693
pass
666-
logger.debug("Attempting connection")
694+
logger.debug("Attempting connection")
695+
try:
667696
connection = await asyncio.wait_for(
668697
connect(self.ws_url, **self._options), timeout=10.0
669698
)
670-
logger.debug("Connection established")
671-
self.ws = connection
672-
if self._send_recv_task is None or self._send_recv_task.done():
673-
self._send_recv_task = asyncio.get_running_loop().create_task(
674-
self._handler(self.ws)
675-
)
676-
logger.debug("Websocket handler attached.")
699+
except socket.gaierror:
700+
logger.debug(f"Hostname not known (this is just for testing")
701+
await asyncio.sleep(10)
702+
if self._lock.locked():
703+
self._lock.release()
704+
return await self.connect(force=force)
705+
logger.debug("Connection established")
706+
self.ws = connection
707+
if self._send_recv_task is None or self._send_recv_task.done():
708+
self._send_recv_task = asyncio.get_running_loop().create_task(
709+
self._handler(self.ws)
710+
)
711+
if self._lock.locked():
712+
self._lock.release()
713+
return None
677714

678715
async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
716+
logger.debug("WS handler attached")
679717
recv_task = asyncio.create_task(self._start_receiving(ws))
680718
send_task = asyncio.create_task(self._start_sending(ws))
681719
done, pending = await asyncio.wait(
@@ -685,38 +723,54 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
685723
loop = asyncio.get_running_loop()
686724
should_reconnect = False
687725
is_retry = False
726+
688727
for task in pending:
689728
task.cancel()
729+
690730
for task in done:
691731
task_res = task.result()
692-
if isinstance(
693-
task_res, (asyncio.TimeoutError, ConnectionClosed, TimeoutError)
694-
):
732+
733+
# If ConnectionClosedOK, graceful shutdown - don't reconnect
734+
if isinstance(task_res, websockets.exceptions.ConnectionClosedOK):
735+
logger.debug("Graceful shutdown detected, not reconnecting")
736+
return None # Clean exit
737+
738+
# Check for timeout/connection errors that should trigger reconnect
739+
if isinstance(task_res, (asyncio.TimeoutError, TimeoutError, ConnectionClosed)):
695740
should_reconnect = True
741+
logger.debug(f"Reconnection triggered by: {type(task_res).__name__}")
742+
696743
if isinstance(task_res, (asyncio.TimeoutError, TimeoutError)):
697744
self._attempts += 1
698745
is_retry = True
746+
699747
if should_reconnect is True:
700748
if len(self._received_subscriptions) > 0:
701749
return SubstrateRequestException(
702750
f"Unable to reconnect because there are currently open subscriptions."
703751
)
704-
for original_id, payload in list(self._inflight.items()):
705-
self._received[original_id] = loop.create_future()
706-
to_send = json.loads(payload)
707-
logger.debug(f"Resubmitting {to_send}")
708-
await self._sending.put(to_send)
752+
709753
if is_retry:
710-
# Otherwise the connection was just closed due to no activity, which should not count against retries
711754
if self._attempts >= self._max_retries:
755+
logger.error("Max retries exceeded.")
712756
return TimeoutError("Max retries exceeded.")
713757
logger.info(
714758
f"Timeout occurred. Reconnecting. Attempt {self._attempts} of {self._max_retries}"
715759
)
760+
761+
async with self._lock:
762+
for original_id in list(self._inflight.keys()):
763+
payload = self._inflight.pop(original_id)
764+
self._received[original_id] = loop.create_future()
765+
to_send = json.loads(payload)
766+
logger.debug(f"Resubmitting {to_send['id']}")
767+
await self._sending.put(to_send)
768+
769+
logger.debug("Attempting reconnection...")
716770
await self.connect(True)
717-
await self._handler(ws=self.ws)
718-
logger.debug(f"Current send queue size: {self._sending.qsize()}")
719-
return None
771+
logger.debug(f"Reconnected. Send queue size: {self._sending.qsize()}")
772+
# Recursively call handler
773+
return await self._handler(self.ws)
720774
elif isinstance(e := recv_task.result(), Exception):
721775
return e
722776
elif isinstance(e := send_task.result(), Exception):
@@ -753,6 +807,7 @@ async def _exit_with_timer(self):
753807
pass
754808

755809
async def shutdown(self):
810+
logger.debug("Shutdown requested")
756811
try:
757812
await asyncio.wait_for(self._cancel(), timeout=10.0)
758813
except asyncio.TimeoutError:
@@ -766,11 +821,16 @@ async def _recv(self, recd: bytes) -> None:
766821
response = json.loads(recd)
767822
if "id" in response:
768823
async with self._lock:
769-
self._inflight.pop(response["id"])
770-
with suppress(KeyError):
771-
# These would be subscriptions that were unsubscribed
824+
inflight_item = self._inflight.pop(response["id"], None)
825+
if inflight_item is not None:
826+
logger.debug(f"Popped {response['id']} from inflight")
827+
else:
828+
logger.debug(
829+
f"Received response for {response['id']} which is no longer inflight (likely reconnection)"
830+
)
831+
if self._received.get(response["id"]) is not None:
772832
self._received[response["id"]].set_result(response)
773-
self._in_use_ids.remove(response["id"])
833+
self._in_use_ids.discard(response["id"])
774834
elif "params" in response:
775835
sub_id = response["params"]["subscription"]
776836
if sub_id not in self._received_subscriptions:
@@ -780,39 +840,43 @@ async def _recv(self, recd: bytes) -> None:
780840
raise KeyError(response)
781841

782842
async def _start_receiving(self, ws: ClientConnection) -> Exception:
843+
logger.debug("Starting receiving task")
783844
try:
784845
while True:
785846
recd = await self._wait_with_activity_timeout(
786-
ws.recv(decode=False),
787-
self.retry_timeout
847+
ws.recv(decode=False), self.retry_timeout
788848
)
789849
await self._reset_activity_timer()
790850
# reset the counter once we successfully receive something back
791851
self._attempts = 0
792852
await self._recv(recd)
853+
except websockets.exceptions.ConnectionClosedOK as e:
854+
logger.debug("ConnectionClosedOK")
855+
return e
793856
except Exception as e:
794-
logger.exception("Maybe timeout? 738", exc_info=e)
857+
logger.exception("Receiving exception", exc_info=e)
795858
if isinstance(e, ssl.SSLError):
796859
e = ConnectionClosed
797860
if not isinstance(
798-
e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed)
861+
e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed)
799862
):
800863
logger.exception("Websocket receiving exception", exc_info=e)
801864
for fut in self._received.values():
802865
if not fut.done():
803866
fut.set_exception(e)
804867
fut.cancel()
805-
elif isinstance(e, websockets.exceptions.ConnectionClosedOK):
806-
logger.debug("Websocket connection closed.")
807868
else:
808-
logger.debug(f"Timeout occurred.")
869+
logger.debug(f"Timeout/ConnectionClosed occurred.")
809870
return e
810871

811872
async def _start_sending(self, ws) -> Exception:
873+
logger.debug("Starting sending task")
812874
to_send = None
813875
try:
814876
while True:
877+
logger.debug(f"_sending, {self._sending.qsize()}")
815878
to_send_ = await self._sending.get()
879+
logger.debug("Retrieved item from sending queue")
816880
self._sending.task_done()
817881
send_id = to_send_["id"]
818882
to_send = json.dumps(to_send_)
@@ -821,6 +885,7 @@ async def _start_sending(self, ws) -> Exception:
821885
if self._log_raw_websockets:
822886
raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}")
823887
await ws.send(to_send)
888+
logger.debug("Sent to websocket")
824889
await self._reset_activity_timer()
825890
except Exception as e:
826891
logger.exception("Maybe timeout? 769", exc_info=e)
@@ -2529,6 +2594,8 @@ async def _make_rpc_request(
25292594

25302595
if request_manager.is_complete:
25312596
break
2597+
else:
2598+
await asyncio.sleep(0.2)
25322599

25332600
return request_manager.get_results()
25342601

0 commit comments

Comments
 (0)