|
1 | 1 | import asyncio |
2 | 2 | from abc import abstractmethod |
| 3 | +from operator import attrgetter |
3 | 4 | from typing import ( |
4 | 5 | AsyncGenerator, |
5 | | - Set, |
6 | 6 | Tuple, |
7 | 7 | Union, |
8 | 8 | cast, |
@@ -81,7 +81,7 @@ def __init__(self, |
81 | 81 | # pending queue size should be big enough to avoid starving the processing consumers, but |
82 | 82 | # small enough to avoid wasteful over-requests before post-processing can happen |
83 | 83 | max_pending_headers = ETHPeer.max_headers_fetch * 8 |
84 | | - self.header_queue = TaskQueue(max_pending_headers, lambda header: header.block_number) |
| 84 | + self.header_queue = TaskQueue(max_pending_headers, attrgetter('block_number')) |
85 | 85 |
|
86 | 86 | @property |
87 | 87 | def msg_queue_maxsize(self) -> int: |
@@ -168,7 +168,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None: |
168 | 168 | return |
169 | 169 |
|
170 | 170 | self.logger.info("Starting sync with %s", peer) |
171 | | - last_received_header = None |
| 171 | + last_received_header: BlockHeader = None |
172 | 172 | # When we start the sync with a peer, we always request up to MAX_REORG_DEPTH extra |
173 | 173 | # headers before our current head's number, in case there were chain reorgs since the last |
174 | 174 | # time _sync() was called. All of the extra headers that are already present in our DB |
@@ -238,10 +238,21 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None: |
238 | 238 | # Setting the latest header hash for the peer, before queuing header processing tasks |
239 | 239 | self._target_header_hash = peer.head_hash |
240 | 240 |
|
241 | | - await self.header_queue.add(headers) |
| 241 | + unrequested_headers = tuple(h for h in headers if h not in self.header_queue) |
| 242 | + await self.header_queue.add(unrequested_headers) |
242 | 243 | last_received_header = headers[-1] |
243 | 244 | start_at = last_received_header.block_number + 1 |
244 | 245 |
|
| 246 | + # erase any pending tasks, to restart on next _sync() run |
| 247 | + try: |
| 248 | + batch_id, pending_tasks = self.header_queue.get_nowait() |
| 249 | + except asyncio.QueueFull: |
| 250 | + # get_nowait() raising QueueFull (not QueueEmpty) is what is treated as "nothing pending" here; continue |
| 251 | + pass |
| 252 | + else: |
| 253 | + # fully remove pending tasks from queue |
| 254 | + self.header_queue.complete(batch_id, pending_tasks) |
| 255 | + |
245 | 256 | async def _fetch_missing_headers( |
246 | 257 | self, peer: HeaderRequestingPeer, start_at: int) -> Tuple[BlockHeader, ...]: |
247 | 258 | """Fetch a batch of headers starting at start_at and return the ones we're missing.""" |
|
0 commit comments