99import logging
1010import ssl
1111import warnings
12+ from contextlib import suppress
1213from unittest .mock import AsyncMock
1314from hashlib import blake2b
1415from typing import (
@@ -524,7 +525,7 @@ def __init__(
524525 shutdown_timer = 5 ,
525526 options : Optional [dict ] = None ,
526527 _log_raw_websockets : bool = False ,
527- retry_timeout : float = 60.0
528+ retry_timeout : float = 60.0 ,
528529 ):
529530 """
530531 Websocket manager object. Allows for the use of a single websocket connection by multiple
@@ -604,7 +605,6 @@ async def _cancel(self):
604605 )
605606
606607 async def connect (self , force = False ):
607- logger .debug ("Connecting." )
608608 now = await self .loop_time ()
609609 self .last_received = now
610610 self .last_sent = now
@@ -620,28 +620,24 @@ async def connect(self, force=False):
620620 self .ws = await asyncio .wait_for (
621621 connect (self .ws_url , ** self ._options ), timeout = 10.0
622622 )
623- logger .debug ("Connected." )
624623 if self ._send_recv_task is None or self ._send_recv_task .done ():
625624 self ._send_recv_task = asyncio .get_running_loop ().create_task (
626625 self ._handler (self .ws )
627626 )
628- logger .debug ("Recd task started." )
629627 self ._initialized = True
630628
631629 async def _handler (self , ws : ClientConnection ) -> None :
632630 recv_task = asyncio .create_task (self ._start_receiving (ws ))
633631 send_task = asyncio .create_task (self ._start_sending (ws ))
634- logger .debug ("Starting send/recv tasks." )
635632 done , pending = await asyncio .wait (
636633 [recv_task , send_task ],
637634 return_when = asyncio .FIRST_COMPLETED ,
638635 )
639- logger .debug (f"send/recv tasks done: { done } \n { pending } " )
640636 loop = asyncio .get_running_loop ()
641637 should_reconnect = False
642638 for task in pending :
643639 task .cancel ()
644- if isinstance (recv_task .exception (), asyncio .TimeoutError ):
640+ if isinstance (recv_task .result (), asyncio .TimeoutError ):
645641 # TODO check the logic here
646642 should_reconnect = True
647643 if should_reconnect is True :
@@ -680,15 +676,13 @@ async def _exit_with_timer(self):
680676 pass
681677
682678 async def shutdown (self ):
683- self ._is_closing = True
684679 try :
685680 await asyncio .wait_for (self ._cancel (), timeout = 10.0 )
686681 except asyncio .TimeoutError :
687682 pass
688683 self .ws = None
689684 self ._initialized = False
690685 self ._send_recv_task = None
691- self ._is_closing = False
692686
693687 async def _recv (self , recd : bytes ) -> None :
694688 if self ._log_raw_websockets :
@@ -728,7 +722,6 @@ async def _start_sending(self, ws) -> Exception:
728722 try :
729723 while True :
730724 to_send_ = await self ._sending .get ()
731- logger .debug (f"Pulled { to_send_ } from the queue" )
732725 send_id = to_send_ ["id" ]
733726 to_send = json .dumps (to_send_ )
734727 async with self ._lock :
@@ -747,10 +740,6 @@ async def _start_sending(self, ws) -> Exception:
747740 self ._received [i ].cancel ()
748741 return e
749742
750- async def add_subscription (self , subscription_id : str ) -> None :
751- logger .debug (f"Adding { subscription_id } to subscriptions." )
752- self ._received_subscriptions [subscription_id ] = asyncio .Queue ()
753-
754743 async def send (self , payload : dict ) -> str :
755744 """
756745 Sends a payload to the websocket connection.
@@ -762,7 +751,6 @@ async def send(self, payload: dict) -> str:
762751 id: the internal ID of the request (incremented int)
763752 """
764753 await self .max_subscriptions .acquire ()
765- logger .debug (f"Sending payload: { payload } " )
766754 async with self ._lock :
767755 original_id = get_next_id ()
768756 while original_id in self ._in_use_ids :
@@ -771,7 +759,6 @@ async def send(self, payload: dict) -> str:
771759 self ._received [original_id ] = asyncio .get_running_loop ().create_future ()
772760 to_send = {** payload , ** {"id" : original_id }}
773761 await self ._sending .put (to_send )
774- logger .debug ("767 queue put" )
775762 return original_id
776763
777764 async def unsubscribe (self , subscription_id : str ) -> None :
@@ -2374,7 +2361,6 @@ async def _make_rpc_request(
23742361 item_id = request_manager .overwrite_request (
23752362 item_id , response ["result" ]
23762363 )
2377- await ws .add_subscription (response ["result" ])
23782364 subscription_added = True
23792365 except KeyError :
23802366 raise SubstrateRequestException (str (response ))
0 commit comments