@@ -664,30 +664,28 @@ async def shutdown(self):
664664 self ._is_closing = False
665665
666666 async def _recv (self , recd ) -> None :
667- try :
668- if self ._log_raw_websockets :
669- raw_websocket_logger .debug (f"WEBSOCKET_RECEIVE> { recd } " )
670- response = json .loads (recd )
671- self .last_received = await self .loop_time ()
672- if "id" in response :
673- self ._received [response ["id" ]].set_result (response )
674- self ._in_use_ids .remove (response ["id" ])
675- elif "params" in response :
676- self ._received [response ["params" ]["subscription" ]].set_result (response )
677- else :
678- raise KeyError (response )
679- except ssl .SSLError :
680- raise ConnectionClosed
681- except (ConnectionClosed , KeyError ):
682- raise
667+ if self ._log_raw_websockets :
668+ raw_websocket_logger .debug (f"WEBSOCKET_RECEIVE> { recd } " )
669+ response = json .loads (recd )
670+ self .last_received = await self .loop_time ()
671+ if "id" in response :
672+ self ._received [response ["id" ]].set_result (response )
673+ self ._in_use_ids .remove (response ["id" ])
674+ elif "params" in response :
675+ self ._received [response ["params" ]["subscription" ]].set_result (response )
676+ else :
677+ raise KeyError (response )
683678
684679 async def _start_receiving (self , ws : ClientConnection ) -> Exception :
685680 try :
686681 async for recd in ws :
687682 await self ._recv (recd )
688683 except Exception as e :
684+ if isinstance (e , ssl .SSLError ):
685+ e = ConnectionClosed
689686 for i in self ._received .keys ():
690687 self ._received [i ].set_exception (e )
688+ self ._received [i ].cancel ()
691689 return
692690
693691 async def _start_sending (self , ws ) -> Exception :
@@ -702,9 +700,11 @@ async def _start_sending(self, ws) -> Exception:
702700 except Exception as e :
703701 if to_send is not None :
704702 self ._received [to_send ["id" ]].set_exception (e )
703+ self ._received [to_send ["id" ]].cancel ()
705704 else :
706705 for i in self ._received .keys ():
707706 self ._received [i ].set_exception (e )
707+ self ._received [i ].cancel ()
708708 return
709709
710710 async def send (self , payload : dict ) -> str :
@@ -2270,6 +2270,7 @@ async def _make_rpc_request(
22702270 request_manager = RequestManager (payloads )
22712271
22722272 subscription_added = False
2273+ should_retry = False
22732274
22742275 async with self .ws as ws :
22752276 for payload in payloads :
@@ -2282,37 +2283,43 @@ async def _make_rpc_request(
22822283 item_id not in request_manager .responses
22832284 or asyncio .iscoroutinefunction (result_handler )
22842285 ):
2285- if response := await ws .retrieve (item_id ):
2286- if (
2287- asyncio .iscoroutinefunction (result_handler )
2288- and not subscription_added
2289- ):
2290- # handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2291- # with {subscription_id : payload_id}
2292- try :
2293- item_id = request_manager .overwrite_request (
2294- item_id , response ["result" ]
2295- )
2296- subscription_added = True
2297- except KeyError :
2298- raise SubstrateRequestException (str (response ))
2299- decoded_response , complete = await self ._process_response (
2300- response ,
2301- item_id ,
2302- value_scale_type ,
2303- storage_item ,
2304- result_handler ,
2305- runtime = runtime ,
2306- force_legacy_decode = force_legacy_decode ,
2307- )
2286+ try :
2287+ if response := await ws .retrieve (item_id ):
2288+ if (
2289+ asyncio .iscoroutinefunction (result_handler )
2290+ and not subscription_added
2291+ ):
2292+ # handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2293+ # with {subscription_id : payload_id}
2294+ try :
2295+ item_id = request_manager .overwrite_request (
2296+ item_id , response ["result" ]
2297+ )
2298+ subscription_added = True
2299+ except KeyError :
2300+ raise SubstrateRequestException (str (response ))
2301+ (
2302+ decoded_response ,
2303+ complete ,
2304+ ) = await self ._process_response (
2305+ response ,
2306+ item_id ,
2307+ value_scale_type ,
2308+ storage_item ,
2309+ result_handler ,
2310+ runtime = runtime ,
2311+ force_legacy_decode = force_legacy_decode ,
2312+ )
23082313
2309- request_manager .add_response (
2310- item_id , decoded_response , complete
2311- )
2314+ request_manager .add_response (
2315+ item_id , decoded_response , complete
2316+ )
2317+ except ConnectionClosed :
2318+ should_retry = True
23122319
23132320 if request_manager .is_complete :
23142321 break
2315- if (
2322+ if should_retry or (
23162323 (current_time := await ws .loop_time ()) - ws .last_received
23172324 >= self .retry_timeout
23182325 and current_time - ws .last_sent >= self .retry_timeout
0 commit comments