@@ -636,7 +636,7 @@ async def _handler(self, ws: ClientConnection) -> None:
636636 [recv_task , send_task ],
637637 return_when = asyncio .FIRST_COMPLETED ,
638638 )
639- logger .debug ("send/recv tasks done. " )
639+ logger .debug (f "send/recv tasks done: { done } \n { pending } " )
640640 loop = asyncio .get_running_loop ()
641641 should_reconnect = False
642642 for task in pending :
@@ -652,6 +652,10 @@ async def _handler(self, ws: ClientConnection) -> None:
652652 logger .info ("Timeout occurred. Reconnecting." )
653653 await self .connect (True )
654654 await self ._handler (ws = ws )
655+ elif isinstance (e := recv_task .result (), Exception ):
656+ return e
657+ elif isinstance (e := send_task .result (), Exception ):
658+ return e
655659
656660 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
657661 if not self .state != State .CONNECTING :
@@ -699,26 +703,25 @@ async def _recv(self, recd: bytes) -> None:
699703 elif "params" in response :
700704 # TODO self._inflight won't work with subscriptions
701705 sub_id = response ["params" ]["subscription" ]
706+ logger .debug (f"Adding { sub_id } to subscriptions." )
702707 await self ._received_subscriptions [sub_id ].put (response )
703708 else :
704709 raise KeyError (response )
705710
706711 async def _start_receiving (self , ws : ClientConnection ) -> Exception :
707712 try :
708713 while True :
709- if self ._inflight :
710- recd = await asyncio .wait_for (ws .recv (decode = False ), timeout = self .retry_timeout )
711- await self ._recv (recd )
712- else :
713- await asyncio .sleep (0.1 )
714+ recd = await asyncio .wait_for (ws .recv (decode = False ), timeout = self .retry_timeout )
715+ await self ._recv (recd )
714716 except Exception as e :
717+ logger .exception ("Start receving exception" , exc_info = e )
715718 if isinstance (e , ssl .SSLError ):
716719 e = ConnectionClosed
717720 for fut in self ._received .values ():
718721 if not fut .done ():
719722 fut .set_exception (e )
720723 fut .cancel ()
721- return
724+ return e
722725
723726 async def _start_sending (self , ws ) -> Exception :
724727 to_send = None
@@ -742,9 +745,10 @@ async def _start_sending(self, ws) -> Exception:
742745 for i in self ._received .keys ():
743746 self ._received [i ].set_exception (e )
744747 self ._received [i ].cancel ()
745- return
748+ return e
746749
747750 async def add_subscription (self , subscription_id : str ) -> None :
751+ logger .debug (f"Adding { subscription_id } to subscriptions." )
748752 self ._received_subscriptions [subscription_id ] = asyncio .Queue ()
749753
750754 async def send (self , payload : dict ) -> str :
@@ -770,6 +774,22 @@ async def send(self, payload: dict) -> str:
770774 logger .debug ("767 queue put" )
771775 return original_id
772776
777+ async def unsubscribe (self , subscription_id : str ) -> None :
778+ """
779+ Unwatches a watched extrinsic subscription.
780+
781+ Args:
782+ subscription_id: the internal ID of the subscription (typically a hex string)
783+ """
784+ async with self ._lock :
785+ original_id = get_next_id ()
786+ while original_id in self ._in_use_ids :
787+ original_id = get_next_id ()
788+ del self ._received_subscriptions [subscription_id ]
789+
790+ to_send = {"jsonrpc" : "2.0" , "method" : "author_unwatchExtrinsic" , "params" : [subscription_id ]}
791+ await self ._sending .put (to_send )
792+
773793 async def retrieve (self , item_id : str ) -> Optional [dict ]:
774794 """
775795 Retrieves a single item from received responses dict queue
@@ -789,9 +809,11 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
789809 else :
790810 try :
791811 return self ._received_subscriptions [item_id ].get_nowait ()
792- # TODO make sure to delete during unsubscribe
793812 except asyncio .QueueEmpty :
794813 pass
814+ if self ._send_recv_task .done ():
815+ if isinstance (e := self ._send_recv_task .result (), Exception ):
816+ raise e
795817 await asyncio .sleep (0.1 )
796818 return None
797819
@@ -3776,10 +3798,8 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]:
37763798 }
37773799
37783800 if "finalized" in message_result and wait_for_finalization :
3779- # Created as a task because we don't actually care about the result
3780- self ._forgettable_task = asyncio .create_task (
3781- self .rpc_request ("author_unwatchExtrinsic" , [subscription_id ])
3782- )
3801+ async with self .ws as ws :
3802+ await ws .unsubscribe (subscription_id )
37833803 return {
37843804 "block_hash" : message_result ["finalized" ],
37853805 "extrinsic_hash" : "0x{}" .format (extrinsic .extrinsic_hash .hex ()),
@@ -3790,10 +3810,8 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]:
37903810 and wait_for_inclusion
37913811 and not wait_for_finalization
37923812 ):
3793- # Created as a task because we don't actually care about the result
3794- self ._forgettable_task = asyncio .create_task (
3795- self .rpc_request ("author_unwatchExtrinsic" , [subscription_id ])
3796- )
3813+ async with self .ws as ws :
3814+ await ws .unsubscribe (subscription_id )
37973815 return {
37983816 "block_hash" : message_result ["inblock" ],
37993817 "extrinsic_hash" : "0x{}" .format (extrinsic .extrinsic_hash .hex ()),
0 commit comments