@@ -604,7 +604,7 @@ async def _cancel(self):
604604 )
605605
606606 async def connect (self , force = False ):
607- # TODO after connecting, move from _inflight to the queue
607+ logger . debug ( "Connecting." )
608608 now = await self .loop_time ()
609609 self .last_received = now
610610 self .last_sent = now
@@ -620,25 +620,30 @@ async def connect(self, force=False):
620620 self .ws = await asyncio .wait_for (
621621 connect (self .ws_url , ** self ._options ), timeout = 10.0
622622 )
623+ logger .debug ("Connected." )
623624 if self ._send_recv_task is None or self ._send_recv_task .done ():
624625 self ._send_recv_task = asyncio .get_running_loop ().create_task (
625626 self ._handler (self .ws )
626627 )
628+ logger .debug ("Recd task started." )
627629 self ._initialized = True
628630
629631 async def _handler (self , ws : ClientConnection ) -> None :
630632 recv_task = asyncio .create_task (self ._start_receiving (ws ))
631633 send_task = asyncio .create_task (self ._start_sending (ws ))
634+ logger .debug ("Starting send/recv tasks." )
632635 done , pending = await asyncio .wait (
633636 [recv_task , send_task ],
634637 return_when = asyncio .FIRST_COMPLETED ,
635638 )
639+ logger .debug ("send/recv tasks done." )
636640 loop = asyncio .get_running_loop ()
637641 should_reconnect = False
638642 for task in pending :
639643 task .cancel ()
640- if isinstance (task .exception (), asyncio .TimeoutError ):
641- should_reconnect = True
644+ if isinstance (recv_task .exception (), asyncio .TimeoutError ):
645+ # TODO check the logic here
646+ should_reconnect = True
642647 if should_reconnect is True :
643648 for original_id , payload in list (self ._inflight .items ()):
644649 self ._received [original_id ] = loop .create_future ()
@@ -648,7 +653,6 @@ async def _handler(self, ws: ClientConnection) -> None:
648653 await self .connect (True )
649654 await self ._handler (ws = ws )
650655
651-
652656 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
653657 if not self .state != State .CONNECTING :
654658 if self ._exit_task is not None :
@@ -705,6 +709,8 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception:
705709 if self ._inflight :
706710 recd = await asyncio .wait_for (ws .recv (decode = False ), timeout = self .retry_timeout )
707711 await self ._recv (recd )
712+ else :
713+ await asyncio .sleep (0.1 )
708714 except Exception as e :
709715 if isinstance (e , ssl .SSLError ):
710716 e = ConnectionClosed
@@ -719,6 +725,7 @@ async def _start_sending(self, ws) -> Exception:
719725 try :
720726 while True :
721727 to_send_ = await self ._sending .get ()
728+ logger .debug (f"Pulled { to_send_ } from the queue" )
722729 send_id = to_send_ ["id" ]
723730 to_send = json .dumps (to_send_ )
724731 async with self ._lock :
@@ -751,6 +758,7 @@ async def send(self, payload: dict) -> str:
751758 id: the internal ID of the request (incremented int)
752759 """
753760 await self .max_subscriptions .acquire ()
761+ logger .debug (f"Sending payload: { payload } " )
754762 async with self ._lock :
755763 original_id = get_next_id ()
756764 while original_id in self ._in_use_ids :
@@ -759,6 +767,7 @@ async def send(self, payload: dict) -> str:
759767 self ._received [original_id ] = asyncio .get_running_loop ().create_future ()
760768 to_send = {** payload , ** {"id" : original_id }}
761769 await self ._sending .put (to_send )
770+ logger .debug ("767 queue put" )
762771 return original_id
763772
764773 async def retrieve (self , item_id : str ) -> Optional [dict ]:
@@ -2320,7 +2329,6 @@ async def _make_rpc_request(
23202329 # TODO use that to determine when it's completed. But how would this work with subscriptions?
23212330
23222331 subscription_added = False
2323- should_retry = False
23242332
23252333 async with self .ws as ws :
23262334 for payload in payloads :
@@ -2333,71 +2341,40 @@ async def _make_rpc_request(
23332341 item_id not in request_manager .responses
23342342 or asyncio .iscoroutinefunction (result_handler )
23352343 ):
2336- try :
2337- if response := await ws .retrieve (item_id ):
2338- if (
2339- asyncio .iscoroutinefunction (result_handler )
2340- and not subscription_added
2341- ):
2342- # handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2343- # with {subscription_id : payload_id}
2344- try :
2345- item_id = request_manager .overwrite_request (
2346- item_id , response ["result" ]
2347- )
2348- await ws .add_subscription (response ["result" ])
2349- subscription_added = True
2350- except KeyError :
2351- raise SubstrateRequestException (str (response ))
2352- (
2353- decoded_response ,
2354- complete ,
2355- ) = await self ._process_response (
2356- response ,
2357- item_id ,
2358- value_scale_type ,
2359- storage_item ,
2360- result_handler ,
2361- runtime = runtime ,
2362- force_legacy_decode = force_legacy_decode ,
2363- )
2344+ if response := await ws .retrieve (item_id ):
2345+ if (
2346+ asyncio .iscoroutinefunction (result_handler )
2347+ and not subscription_added
2348+ ):
2349+ # handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2350+ # with {subscription_id : payload_id}
2351+ try :
2352+ item_id = request_manager .overwrite_request (
2353+ item_id , response ["result" ]
2354+ )
2355+ await ws .add_subscription (response ["result" ])
2356+ subscription_added = True
2357+ except KeyError :
2358+ raise SubstrateRequestException (str (response ))
2359+ (
2360+ decoded_response ,
2361+ complete ,
2362+ ) = await self ._process_response (
2363+ response ,
2364+ item_id ,
2365+ value_scale_type ,
2366+ storage_item ,
2367+ result_handler ,
2368+ runtime = runtime ,
2369+ force_legacy_decode = force_legacy_decode ,
2370+ )
23642371
2365- request_manager .add_response (
2366- item_id , decoded_response , complete
2367- )
2368- except ConnectionClosed :
2369- should_retry = True
2372+ request_manager .add_response (
2373+ item_id , decoded_response , complete
2374+ )
23702375
23712376 if request_manager .is_complete :
23722377 break
2373- # TODO I sometimes get timeouts immediately. Why?
2374- if should_retry or (
2375- (current_time := await ws .loop_time ()) - ws .last_received
2376- >= self .retry_timeout
2377- and current_time - ws .last_sent >= self .retry_timeout
2378- ):
2379- # TODO this retry logic should really live inside the Websocket
2380- if attempt >= self .max_retries :
2381- logger .error (
2382- f"Timed out waiting for RPC requests { attempt } times. Exiting."
2383- )
2384- raise MaxRetriesExceeded ("Max retries reached." )
2385- else :
2386- self .ws .last_received = await ws .loop_time ()
2387- await self .ws .connect (force = True )
2388- logger .warning (
2389- f"Timed out waiting for RPC requests. "
2390- f"Retrying attempt { attempt + 1 } of { self .max_retries } "
2391- )
2392- return await self ._make_rpc_request (
2393- payloads = payloads ,
2394- value_scale_type = value_scale_type ,
2395- storage_item = storage_item ,
2396- result_handler = result_handler ,
2397- attempt = attempt + 1 ,
2398- runtime = runtime ,
2399- force_legacy_decode = force_legacy_decode ,
2400- )
24012378
24022379 return request_manager .get_results ()
24032380
0 commit comments