|
17 | 17 | ValidationError, |
18 | 18 | ) |
19 | 19 |
|
| 20 | +from p2p.exceptions import PeerConnectionLost |
20 | 21 | from p2p.peer import BasePeer, PeerSubscriber |
21 | 22 | from p2p.protocol import ( |
22 | 23 | BaseRequest, |
@@ -134,7 +135,7 @@ async def _run(self) -> None: |
134 | 135 | async def _handle_msg(self, msg: TResponsePayload) -> None: |
135 | 136 | if self.pending_request is None: |
136 | 137 | self.logger.debug( |
137 | | - "Got unexpected %s payload from %", self.response_msg_name, self._peer |
| 138 | + "Got unexpected %s payload from %s", self.response_msg_name, self._peer |
138 | 139 | ) |
139 | 140 | return |
140 | 141 |
|
@@ -179,15 +180,24 @@ def _request(self, request: BaseRequest[TRequestPayload]) -> None: |
179 | 180 | def _is_pending(self) -> bool: |
180 | 181 | return self.pending_request is not None |
181 | 182 |
|
| 183 | + async def _cleanup(self) -> None: |
| 184 | + if self.pending_request is not None: |
| 185 | + self.logger.debug("Stream shutting down, raising an exception on the pending request") |
| 186 | + _, future = self.pending_request |
| 187 | + future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone")) |
| 188 | + |
182 | 189 | def deregister_peer(self, peer: BasePeer) -> None: |
183 | 190 | if self.pending_request is not None: |
184 | | - self.logger.debug("Peer disconnected, trigger a timeout on the pending request") |
| 191 | + self.logger.debug("Peer disconnected, raising an exception on the pending request") |
185 | 192 | _, future = self.pending_request |
186 | | - future.set_exception(TimeoutError("Peer disconnected, simulating inevitable timeout")) |
| 193 | + future.set_exception(PeerConnectionLost("Pending request can't complete: peer is gone")) |
187 | 194 |
|
188 | 195 | def get_stats(self) -> Tuple[str, str]: |
189 | 196 | return (self.response_msg_name, self.response_times.get_stats()) |
190 | 197 |
|
| 198 | + def __repr__(self) -> str: |
| 199 | + return f'<ResponseCandidateStream({self._peer!s}, {self.response_msg_type!r})>' |
| 200 | + |
191 | 201 |
|
192 | 202 | class ExchangeManager(Generic[TRequestPayload, TResponsePayload, TResult]): |
193 | 203 | _response_stream: ResponseCandidateStream[TRequestPayload, TResponsePayload] = None |
|
0 commit comments