|
36 | 36 | from p2p import protocol |
37 | 37 | from p2p import eth |
38 | 38 | from p2p import les |
39 | | -from p2p.cancel_token import CancelToken |
| 39 | +from p2p.cancel_token import CancelToken, wait_with_token |
40 | 40 | from p2p.constants import MAX_REORG_DEPTH |
41 | 41 | from p2p.exceptions import NoEligiblePeers, OperationCancelled |
42 | 42 | from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber |
@@ -255,56 +255,6 @@ def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None: |
255 | 255 | "Got BlockHeaders from %d to %d", headers[0].block_number, headers[-1].block_number) |
256 | 256 | self._new_headers.put_nowait(headers) |
257 | 257 |
|
258 | | - async def _get_block_numbers_for_request( |
259 | | - self, block_number_or_hash: Union[int, bytes], max_headers: int, skip: int, |
260 | | - reverse: bool) -> List[BlockNumber]: |
261 | | - """ |
262 | | - Generates the block numbers requested, subject to local availability. |
263 | | - """ |
264 | | - block_number_or_hash = block_number_or_hash |
265 | | - if isinstance(block_number_or_hash, bytes): |
266 | | - header = await self.wait( |
267 | | - self.db.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)), |
268 | | - ) |
269 | | - block_number = header.block_number |
270 | | - elif isinstance(block_number_or_hash, int): |
271 | | - block_number = block_number_or_hash |
272 | | - else: |
273 | | - raise TypeError( |
274 | | - "Unexpected type for 'block_number_or_hash': %s", |
275 | | - type(block_number_or_hash), |
276 | | - ) |
277 | | - |
278 | | - limit = max(max_headers, eth.MAX_HEADERS_FETCH) |
279 | | - step = skip + 1 |
280 | | - if reverse: |
281 | | - low = max(0, block_number - limit) |
282 | | - high = block_number + 1 |
283 | | - block_numbers = reversed(range(low, high, step)) |
284 | | - else: |
285 | | - low = block_number |
286 | | - high = block_number + limit |
287 | | - block_numbers = iter(range(low, high, step)) # mypy thinks range isn't iterable |
288 | | - return list(block_numbers) |
289 | | - |
290 | | - async def _generate_available_headers( |
291 | | - self, |
292 | | - block_numbers: List[BlockNumber]) -> AsyncGenerator[BlockHeader, None]: |
293 | | - """ |
294 | | - Generates the headers requested, halting on the first header that is not locally available. |
295 | | - """ |
296 | | - for block_num in block_numbers: |
297 | | - try: |
298 | | - yield await self.wait( |
299 | | - self.db.coro_get_canonical_block_header_by_number(block_num) |
300 | | - ) |
301 | | - except HeaderNotFound: |
302 | | - self.logger.debug( |
303 | | - "Peer requested header number %s that is unavailable, stopping search.", |
304 | | - block_num, |
305 | | - ) |
306 | | - break |
307 | | - |
308 | 258 | @abstractmethod |
309 | 259 | async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command, |
310 | 260 | msg: protocol._DecodedMsgType) -> None: |
@@ -335,19 +285,9 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command, |
335 | 285 | async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> None: |
336 | 286 | self.logger.debug("Peer %s made header request: %s", peer, msg) |
337 | 287 | query = msg['query'] |
338 | | - try: |
339 | | - block_numbers = await self._get_block_numbers_for_request( |
340 | | - query.block_number_or_hash, query.max_headers, |
341 | | - query.skip, query.reverse) |
342 | | - except HeaderNotFound: |
343 | | - self.logger.debug( |
344 | | - "Peer %r requested starting header %r that is unavailable, returning nothing", |
345 | | - peer, |
346 | | - query.block_number_or_hash, |
347 | | - ) |
348 | | - block_numbers = [] |
349 | | - |
350 | | - headers = [header async for header in self._generate_available_headers(block_numbers)] |
| 288 | + headers = await lookup_headers( |
| 289 | + self.db, query.block_number_or_hash, query.max_headers, |
| 290 | + query.skip, query.reverse, self.logger, self.cancel_token) |
351 | 291 | peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id']) |
352 | 292 |
|
353 | 293 | async def _process_headers( |
@@ -598,19 +538,9 @@ async def _handle_get_block_headers( |
598 | 538 | header_request: Dict[str, Any]) -> None: |
599 | 539 | self.logger.debug("Peer %s made header request: %s", peer, header_request) |
600 | 540 |
|
601 | | - try: |
602 | | - block_numbers = await self._get_block_numbers_for_request( |
603 | | - header_request['block_number_or_hash'], header_request['max_headers'], |
604 | | - header_request['skip'], header_request['reverse']) |
605 | | - except HeaderNotFound: |
606 | | - self.logger.debug( |
607 | | - "Peer %r requested starting header %r that is unavailable, returning nothing", |
608 | | - peer, |
609 | | - header_request['block_number_or_hash'], |
610 | | - ) |
611 | | - block_numbers = [] |
612 | | - |
613 | | - headers = [header async for header in self._generate_available_headers(block_numbers)] |
| 541 | + headers = await lookup_headers( |
| 542 | + self.db, header_request['block_number_or_hash'], header_request['max_headers'], |
| 543 | + header_request['skip'], header_request['reverse'], self.logger, self.cancel_token) |
614 | 544 | peer.sub_proto.send_block_headers(headers) |
615 | 545 |
|
616 | 546 |
|
@@ -749,6 +679,83 @@ def _is_receipts_empty(header: BlockHeader) -> bool: |
749 | 679 | return header.receipt_root == BLANK_ROOT_HASH |
750 | 680 |
|
751 | 681 |
|
| 682 | +async def _get_block_numbers_for_request( |
| 683 | + headerdb: 'AsyncHeaderDB', block_number_or_hash: Union[int, bytes], max_headers: int, |
| 684 | + skip: int, reverse: bool, token: CancelToken) -> List[BlockNumber]: |
| 685 | + """ |
| 686 | + Generates the block numbers requested, subject to local availability. |
| 687 | + """ |
| 688 | + block_number_or_hash = block_number_or_hash |
| 689 | + if isinstance(block_number_or_hash, bytes): |
| 690 | + header = await wait_with_token( |
| 691 | + headerdb.coro_get_block_header_by_hash(cast(Hash32, block_number_or_hash)), |
| 692 | + token=token, |
| 693 | + ) |
| 694 | + block_number = header.block_number |
| 695 | + elif isinstance(block_number_or_hash, int): |
| 696 | + block_number = block_number_or_hash |
| 697 | + else: |
| 698 | + raise TypeError( |
| 699 | + "Unexpected type for 'block_number_or_hash': %s", |
| 700 | + type(block_number_or_hash), |
| 701 | + ) |
| 702 | + |
| 703 | + limit = max(max_headers, eth.MAX_HEADERS_FETCH) |
| 704 | + step = skip + 1 |
| 705 | + if reverse: |
| 706 | + low = max(0, block_number - limit) |
| 707 | + high = block_number + 1 |
| 708 | + block_numbers = reversed(range(low, high, step)) |
| 709 | + else: |
| 710 | + low = block_number |
| 711 | + high = block_number + limit |
| 712 | + block_numbers = iter(range(low, high, step)) # mypy thinks range isn't iterable |
| 713 | + return list(block_numbers) |
| 714 | + |
| 715 | + |
| 716 | +async def _generate_available_headers( |
| 717 | + headerdb: 'AsyncHeaderDB', |
| 718 | + block_numbers: List[BlockNumber], |
| 719 | + logger: logging.Logger, |
| 720 | + token: CancelToken) -> AsyncGenerator[BlockHeader, None]: |
| 721 | + """ |
| 722 | + Generates the headers requested, halting on the first header that is not locally available. |
| 723 | + """ |
| 724 | + for block_num in block_numbers: |
| 725 | + try: |
| 726 | + yield await wait_with_token( |
| 727 | + headerdb.coro_get_canonical_block_header_by_number(block_num), |
| 728 | + token=token |
| 729 | + ) |
| 730 | + except HeaderNotFound: |
| 731 | + logger.debug( |
| 732 | + "Peer requested header number %s that is unavailable, stopping search.", |
| 733 | + block_num, |
| 734 | + ) |
| 735 | + break |
| 736 | + |
| 737 | + |
| 738 | +async def lookup_headers( |
| 739 | + headerdb: 'AsyncHeaderDB', block_number_or_hash: Union[int, bytes], max_headers: int, |
| 740 | + skip: int, reverse: bool, logger: logging.Logger, token: CancelToken) -> List[BlockHeader]: |
| 741 | + """ |
| 742 | + Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items between |
| 743 | + each, in reverse order if :reverse: is True. |
| 744 | + """ |
| 745 | + try: |
| 746 | + block_numbers = await _get_block_numbers_for_request( |
| 747 | + headerdb, block_number_or_hash, max_headers, skip, reverse, token) |
| 748 | + except HeaderNotFound: |
| 749 | + logger.debug( |
| 750 | + "Peer requested starting header %r that is unavailable, returning nothing", |
| 751 | + block_number_or_hash) |
| 752 | + block_numbers = [] |
| 753 | + |
| 754 | + headers = [header async for header in _generate_available_headers( |
| 755 | + headerdb, block_numbers, logger, token)] |
| 756 | + return headers |
| 757 | + |
| 758 | + |
752 | 759 | def _test() -> None: |
753 | 760 | import argparse |
754 | 761 | import signal |
|
0 commit comments