3232from evm .rlp .headers import BlockHeader
3333from evm .rlp .receipts import Receipt
3434from evm .rlp .transactions import BaseTransaction , BaseTransactionFields
35+ from evm .utils .logging import TraceLogger
3536
3637from p2p import protocol
3738from p2p import eth
3839from p2p import les
39- from p2p .cancel_token import CancelToken
40+ from p2p .cancel_token import CancellableMixin , CancelToken
4041from p2p .constants import MAX_REORG_DEPTH
4142from p2p .exceptions import NoEligiblePeers , OperationCancelled
4243from p2p .peer import BasePeer , ETHPeer , LESPeer , PeerPool , PeerPoolSubscriber
@@ -79,6 +80,7 @@ def __init__(self,
7980 self .chain = chain
8081 self .db = db
8182 self .peer_pool = peer_pool
83+ self ._handler = PeerRequestHandler (self .db , self .logger , self .cancel_token )
8284 self ._syncing = False
8385 self ._sync_complete = asyncio .Event ()
8486 self ._sync_requests : asyncio .Queue [HeaderRequestingPeer ] = asyncio .Queue ()
@@ -255,56 +257,6 @@ def _handle_block_headers(self, headers: Tuple[BlockHeader, ...]) -> None:
255257 "Got BlockHeaders from %d to %d" , headers [0 ].block_number , headers [- 1 ].block_number )
256258 self ._new_headers .put_nowait (headers )
257259
258- async def _get_block_numbers_for_request (
259- self , block_number_or_hash : Union [int , bytes ], max_headers : int , skip : int ,
260- reverse : bool ) -> List [BlockNumber ]:
261- """
262- Generates the block numbers requested, subject to local availability.
263- """
264- block_number_or_hash = block_number_or_hash
265- if isinstance (block_number_or_hash , bytes ):
266- header = await self .wait (
267- self .db .coro_get_block_header_by_hash (cast (Hash32 , block_number_or_hash )),
268- )
269- block_number = header .block_number
270- elif isinstance (block_number_or_hash , int ):
271- block_number = block_number_or_hash
272- else :
273- raise TypeError (
274- "Unexpected type for 'block_number_or_hash': %s" ,
275- type (block_number_or_hash ),
276- )
277-
278- limit = max (max_headers , eth .MAX_HEADERS_FETCH )
279- step = skip + 1
280- if reverse :
281- low = max (0 , block_number - limit )
282- high = block_number + 1
283- block_numbers = reversed (range (low , high , step ))
284- else :
285- low = block_number
286- high = block_number + limit
287- block_numbers = iter (range (low , high , step )) # mypy thinks range isn't iterable
288- return list (block_numbers )
289-
290- async def _generate_available_headers (
291- self ,
292- block_numbers : List [BlockNumber ]) -> AsyncGenerator [BlockHeader , None ]:
293- """
294- Generates the headers requested, halting on the first header that is not locally available.
295- """
296- for block_num in block_numbers :
297- try :
298- yield await self .wait (
299- self .db .coro_get_canonical_block_header_by_number (block_num )
300- )
301- except HeaderNotFound :
302- self .logger .debug (
303- "Peer requested header number %s that is unavailable, stopping search." ,
304- block_num ,
305- )
306- break
307-
308260 @abstractmethod
309261 async def _handle_msg (self , peer : HeaderRequestingPeer , cmd : protocol .Command ,
310262 msg : protocol ._DecodedMsgType ) -> None :
@@ -335,19 +287,8 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
335287 async def _handle_get_block_headers (self , peer : LESPeer , msg : Dict [str , Any ]) -> None :
336288 self .logger .debug ("Peer %s made header request: %s" , peer , msg )
337289 query = msg ['query' ]
338- try :
339- block_numbers = await self ._get_block_numbers_for_request (
340- query .block_number_or_hash , query .max_headers ,
341- query .skip , query .reverse )
342- except HeaderNotFound :
343- self .logger .debug (
344- "Peer %r requested starting header %r that is unavailable, returning nothing" ,
345- peer ,
346- query .block_number_or_hash ,
347- )
348- block_numbers = []
349-
350- headers = [header async for header in self ._generate_available_headers (block_numbers )]
290+ headers = await self ._handler .lookup_headers (
291+ query .block_number_or_hash , query .max_headers , query .skip , query .reverse )
351292 peer .sub_proto .send_block_headers (headers , buffer_value = 0 , request_id = msg ['request_id' ])
352293
353294 async def _process_headers (
@@ -598,19 +539,9 @@ async def _handle_get_block_headers(
598539 header_request : Dict [str , Any ]) -> None :
599540 self .logger .debug ("Peer %s made header request: %s" , peer , header_request )
600541
601- try :
602- block_numbers = await self ._get_block_numbers_for_request (
603- header_request ['block_number_or_hash' ], header_request ['max_headers' ],
604- header_request ['skip' ], header_request ['reverse' ])
605- except HeaderNotFound :
606- self .logger .debug (
607- "Peer %r requested starting header %r that is unavailable, returning nothing" ,
608- peer ,
609- header_request ['block_number_or_hash' ],
610- )
611- block_numbers = []
612-
613- headers = [header async for header in self ._generate_available_headers (block_numbers )]
542+ headers = await self ._handler .lookup_headers (
543+ header_request ['block_number_or_hash' ], header_request ['max_headers' ],
544+ header_request ['skip' ], header_request ['reverse' ])
614545 peer .sub_proto .send_block_headers (headers )
615546
616547
@@ -634,55 +565,20 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
634565 elif isinstance (cmd , eth .GetBlockHeaders ):
635566 await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
636567 elif isinstance (cmd , eth .GetBlockBodies ):
637- await self ._handle_get_block_bodies (peer , cast (List [Hash32 ], msg ))
568+ # Only serve up to eth.MAX_BODIES_FETCH items in every request.
569+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_BODIES_FETCH ]
570+ await self ._handler .handle_get_block_bodies (peer , cast (List [Hash32 ], msg ))
638571 elif isinstance (cmd , eth .GetReceipts ):
639- await self ._handle_get_receipts (peer , cast (List [Hash32 ], msg ))
572+ # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_RECEIPTS_FETCH ]
574+ await self ._handler .handle_get_receipts (peer , block_hashes )
640575 elif isinstance (cmd , eth .GetNodeData ):
641- await self ._handle_get_node_data (peer , cast (List [Hash32 ], msg ))
576+ # Only serve up to eth.MAX_STATE_FETCH items in every request.
577+ node_hashes = cast (List [Hash32 ], msg )[:eth .MAX_STATE_FETCH ]
578+ await self ._handler .handle_get_node_data (peer , node_hashes )
642579 else :
643580 self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
644581
645- async def _handle_get_block_bodies (self , peer : ETHPeer , msg : List [Hash32 ]) -> None :
646- bodies = []
647- # Only serve up to eth.MAX_BODIES_FETCH items in every request.
648- hashes = msg [:eth .MAX_BODIES_FETCH ]
649- for block_hash in hashes :
650- try :
651- header = await self .wait (self .db .coro_get_block_header_by_hash (block_hash ))
652- except HeaderNotFound :
653- self .logger .debug ("%s asked for block we don't have: %s" , peer , block_hash )
654- continue
655- transactions = await self .wait (
656- self .db .coro_get_block_transactions (header , BaseTransactionFields ))
657- uncles = await self .wait (self .db .coro_get_block_uncles (header .uncles_hash ))
658- bodies .append (BlockBody (transactions , uncles ))
659- peer .sub_proto .send_block_bodies (bodies )
660-
661- async def _handle_get_receipts (self , peer : ETHPeer , msg : List [Hash32 ]) -> None :
662- receipts = []
663- # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
664- hashes = msg [:eth .MAX_RECEIPTS_FETCH ]
665- for block_hash in hashes :
666- try :
667- header = await self .wait (self .db .coro_get_block_header_by_hash (block_hash ))
668- except HeaderNotFound :
669- self .logger .debug (
670- "%s asked receipts for block we don't have: %s" , peer , block_hash )
671- continue
672- receipts .append (await self .wait (self .db .coro_get_receipts (header , Receipt )))
673- peer .sub_proto .send_receipts (receipts )
674-
675- async def _handle_get_node_data (self , peer : ETHPeer , msg : List [Hash32 ]) -> None :
676- nodes = []
677- for node_hash in msg :
678- try :
679- node = await self .db .coro_get (node_hash )
680- except KeyError :
681- self .logger .debug ("%s asked for a trie node we don't have: %s" , peer , node_hash )
682- continue
683- nodes .append (node )
684- peer .sub_proto .send_node_data (nodes )
685-
686582 async def _process_headers (
687583 self , peer : HeaderRequestingPeer , headers : Tuple [BlockHeader , ...]) -> int :
688584 target_td = await self ._calculate_td (headers )
@@ -720,6 +616,120 @@ async def _process_headers(
720616 return head .block_number
721617
722618
619+ class PeerRequestHandler (CancellableMixin ):
620+
621+ def __init__ (self , db : 'AsyncHeaderDB' , logger : TraceLogger , token : CancelToken ) -> None :
622+ self .db = db
623+ self .logger = logger
624+ self .cancel_token = token
625+
626+ async def handle_get_block_bodies (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
627+ chaindb = cast ('AsyncChainDB' , self .db )
628+ bodies = []
629+ for block_hash in block_hashes :
630+ try :
631+ header = await self .wait (chaindb .coro_get_block_header_by_hash (block_hash ))
632+ except HeaderNotFound :
633+ self .logger .debug ("%s asked for block we don't have: %s" , peer , block_hash )
634+ continue
635+ transactions = await self .wait (
636+ chaindb .coro_get_block_transactions (header , BaseTransactionFields ))
637+ uncles = await self .wait (chaindb .coro_get_block_uncles (header .uncles_hash ))
638+ bodies .append (BlockBody (transactions , uncles ))
639+ peer .sub_proto .send_block_bodies (bodies )
640+
641+ async def handle_get_receipts (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
642+ chaindb = cast ('AsyncChainDB' , self .db )
643+ receipts = []
644+ for block_hash in block_hashes :
645+ try :
646+ header = await self .wait (chaindb .coro_get_block_header_by_hash (block_hash ))
647+ except HeaderNotFound :
648+ self .logger .debug (
649+ "%s asked receipts for block we don't have: %s" , peer , block_hash )
650+ continue
651+ block_receipts = await self .wait (chaindb .coro_get_receipts (header , Receipt ))
652+ receipts .append (block_receipts )
653+ peer .sub_proto .send_receipts (receipts )
654+
655+ async def handle_get_node_data (self , peer : ETHPeer , node_hashes : List [Hash32 ]) -> None :
656+ chaindb = cast ('AsyncChainDB' , self .db )
657+ nodes = []
658+ for node_hash in node_hashes :
659+ try :
660+ node = await self .wait (chaindb .coro_get (node_hash ))
661+ except KeyError :
662+ self .logger .debug ("%s asked for a trie node we don't have: %s" , peer , node_hash )
663+ continue
664+ nodes .append (node )
665+ peer .sub_proto .send_node_data (nodes )
666+
667+ async def lookup_headers (self , block_number_or_hash : Union [int , bytes ], max_headers : int ,
668+ skip : int , reverse : bool ) -> List [BlockHeader ]:
669+ """
670+ Lookup :max_headers: headers starting at :block_number_or_hash:, skipping :skip: items
671+ between each, in reverse order if :reverse: is True.
672+ """
673+ try :
674+ block_numbers = await self ._get_block_numbers_for_request (
675+ block_number_or_hash , max_headers , skip , reverse )
676+ except HeaderNotFound :
677+ self .logger .debug (
678+ "Peer requested starting header %r that is unavailable, returning nothing" ,
679+ block_number_or_hash )
680+ block_numbers = []
681+
682+ headers = [header async for header in self ._generate_available_headers (block_numbers )]
683+ return headers
684+
685+ async def _get_block_numbers_for_request (
686+ self , block_number_or_hash : Union [int , bytes ], max_headers : int ,
687+ skip : int , reverse : bool ) -> List [BlockNumber ]:
688+ """
689+ Generates the block numbers requested, subject to local availability.
690+ """
691+ block_number_or_hash = block_number_or_hash
692+ if isinstance (block_number_or_hash , bytes ):
693+ header = await self .wait (
694+ self .db .coro_get_block_header_by_hash (cast (Hash32 , block_number_or_hash )))
695+ block_number = header .block_number
696+ elif isinstance (block_number_or_hash , int ):
697+ block_number = block_number_or_hash
698+ else :
699+ raise TypeError (
700+ "Unexpected type for 'block_number_or_hash': %s" ,
701+ type (block_number_or_hash ),
702+ )
703+
704+ limit = max (max_headers , eth .MAX_HEADERS_FETCH )
705+ step = skip + 1
706+ if reverse :
707+ low = max (0 , block_number - limit )
708+ high = block_number + 1
709+ block_numbers = reversed (range (low , high , step ))
710+ else :
711+ low = block_number
712+ high = block_number + limit
713+ block_numbers = iter (range (low , high , step )) # mypy thinks range isn't iterable
714+ return list (block_numbers )
715+
716+ async def _generate_available_headers (
717+ self , block_numbers : List [BlockNumber ]) -> AsyncGenerator [BlockHeader , None ]:
718+ """
719+ Generates the headers requested, halting on the first header that is not locally available.
720+ """
721+ for block_num in block_numbers :
722+ try :
723+ yield await self .wait (
724+ self .db .coro_get_canonical_block_header_by_number (block_num ))
725+ except HeaderNotFound :
726+ self .logger .debug (
727+ "Peer requested header number %s that is unavailable, stopping search." ,
728+ block_num ,
729+ )
730+ break
731+
732+
723733class DownloadedBlockPart (NamedTuple ):
724734 part : Union [eth .BlockBody , List [Receipt ]]
725735 unique_key : Union [bytes , Tuple [bytes , bytes ]]
@@ -815,9 +825,14 @@ async def exit_on_sigint() -> None:
815825 await syncer .cancel ()
816826 loop .stop ()
817827
828+ async def run () -> None :
829+ await syncer .run ()
830+ syncer .logger .info ("run() finished, exiting" )
831+ sigint_received .set ()
832+
818833 # loop.set_debug(True)
819834 asyncio .ensure_future (exit_on_sigint ())
820- asyncio .ensure_future (syncer . run ())
835+ asyncio .ensure_future (run ())
821836 loop .run_forever ()
822837 loop .close ()
823838
0 commit comments