@@ -69,7 +69,7 @@ class ResponseCandidateStream(
6969 #
7070 @property
7171 def subscription_msg_types (self ) -> Set [Type [Command ]]:
72- return {self ._response_msg_type }
72+ return {self .response_msg_type }
7373
7474 msg_queue_maxsize = 100
7575
@@ -87,7 +87,7 @@ def __init__(
8787 super ().__init__ (token )
8888 self ._peer = peer
8989 self .response_times = ResponseTimeTracker ()
90- self ._response_msg_type = response_msg_type
90+ self .response_msg_type = response_msg_type
9191
9292 async def payload_candidates (
9393 self ,
@@ -105,7 +105,7 @@ async def payload_candidates(
105105
106106 @property
107107 def response_msg_name (self ) -> str :
108- return self ._response_msg_type .__name__
108+ return self .response_msg_type .__name__
109109
110110 def complete_request (self , item_count : int ) -> None :
111111 self .pending_request = None
@@ -123,7 +123,7 @@ async def _run(self) -> None:
123123 if peer != self ._peer :
124124 self .logger .error ("Unexpected peer: %s expected: %s" , peer , self ._peer )
125125 continue
126- elif isinstance (cmd , self ._response_msg_type ):
126+ elif isinstance (cmd , self .response_msg_type ):
127127 await self ._handle_msg (cast (TResponsePayload , msg ))
128128 else :
129129 self .logger .warning ("Unexpected payload type: %s" , cmd .__class__ .__name__ )
@@ -176,8 +176,8 @@ def _request(self, request: BaseRequest[TRequestPayload]) -> None:
176176 def _is_pending (self ) -> bool :
177177 return self .pending_request is not None
178178
179- def get_stats (self ) -> str :
180- return '%s: %s' % (self .response_msg_name , self .response_times .get_stats ())
179+ def get_stats (self ) -> Tuple [ str , str ] :
180+ return (self .response_msg_name , self .response_times .get_stats ())
181181
182182
183183class ExchangeManager (Generic [TRequestPayload , TResponsePayload , TResult ]):
@@ -186,14 +186,16 @@ class ExchangeManager(Generic[TRequestPayload, TResponsePayload, TResult]):
186186 def __init__ (
187187 self ,
188188 peer : BasePeer ,
189+ listening_for : Type [Command ],
189190 cancel_token : CancelToken ) -> None :
190191 self ._peer = peer
191192 self ._cancel_token = cancel_token
193+ self ._response_command_type = listening_for
192194
193- async def launch_service (self , listening_for : Type [ Command ] ) -> None :
195+ async def launch_service (self ) -> None :
194196 self ._response_stream = ResponseCandidateStream (
195197 self ._peer ,
196- listening_for ,
198+ self . _response_command_type ,
197199 self ._cancel_token ,
198200 )
199201 self ._peer .run_daemon (self ._response_stream )
@@ -248,5 +250,8 @@ def service(self) -> BaseService:
248250 """
249251 return self ._response_stream
250252
251- def get_stats (self ) -> str :
252- return self ._response_stream .get_stats ()
253+ def get_stats (self ) -> Tuple [str , str ]:
254+ if self ._response_stream is None :
255+ return (self ._response_command_type .__name__ , 'Uninitialized' )
256+ else :
257+ return self ._response_stream .get_stats ()
0 commit comments