@@ -692,20 +692,25 @@ async def _recv(self, recd: bytes) -> None:
692692 if "id" in response :
693693 async with self ._lock :
694694 self ._inflight .pop (response ["id" ])
695- self ._received [response ["id" ]].set_result (response )
696- self ._in_use_ids .remove (response ["id" ])
695+ with suppress (KeyError ):
696+ # These would be subscriptions that were unsubscribed
697+ self ._received [response ["id" ]].set_result (response )
698+ self ._in_use_ids .remove (response ["id" ])
697699 elif "params" in response :
698700 # TODO self._inflight won't work with subscriptions
699701 sub_id = response ["params" ]["subscription" ]
700- logger .debug (f"Adding { sub_id } to subscriptions." )
702+ if sub_id not in self ._received_subscriptions :
703+ self ._received_subscriptions [sub_id ] = asyncio .Queue ()
701704 await self ._received_subscriptions [sub_id ].put (response )
702705 else :
703706 raise KeyError (response )
704707
705708 async def _start_receiving (self , ws : ClientConnection ) -> Exception :
706709 try :
707710 while True :
708- recd = await asyncio .wait_for (ws .recv (decode = False ), timeout = self .retry_timeout )
711+ recd = await asyncio .wait_for (
712+ ws .recv (decode = False ), timeout = self .retry_timeout
713+ )
709714 await self ._recv (recd )
710715 except Exception as e :
711716 logger .exception ("Start receving exception" , exc_info = e )
@@ -774,7 +779,12 @@ async def unsubscribe(self, subscription_id: str) -> None:
774779 original_id = get_next_id ()
775780 del self ._received_subscriptions [subscription_id ]
776781
777- to_send = {"jsonrpc" : "2.0" , "method" : "author_unwatchExtrinsic" , "params" : [subscription_id ]}
782+ to_send = {
783+ "jsonrpc" : "2.0" ,
784+ "id" : original_id ,
785+ "method" : "author_unwatchExtrinsic" ,
786+ "params" : [subscription_id ],
787+ }
778788 await self ._sending .put (to_send )
779789
780790 async def retrieve (self , item_id : str ) -> Optional [dict ]:
0 commit comments