@@ -289,6 +289,7 @@ async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) ->
289289 query = msg ['query' ]
290290 headers = await self ._handler .lookup_headers (
291291 query .block_number_or_hash , query .max_headers , query .skip , query .reverse )
292+ self .logger .trace ("Replying to %s with %d headers" , peer , len (headers ))
292293 peer .sub_proto .send_block_headers (headers , buffer_value = 0 , request_id = msg ['request_id' ])
293294
294295 async def _process_headers (
@@ -490,8 +491,27 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
490491 await self ._handle_new_block (peer , cast (Dict [str , Any ], msg ))
491492 elif isinstance (cmd , eth .GetBlockHeaders ):
492493 await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
494+ elif isinstance (cmd , eth .GetBlockBodies ):
495+ # Only serve up to eth.MAX_BODIES_FETCH items in every request.
496+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_BODIES_FETCH ]
497+ await self ._handler .handle_get_block_bodies (peer , block_hashes )
498+ elif isinstance (cmd , eth .GetReceipts ):
499+ # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
500+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_RECEIPTS_FETCH ]
501+ await self ._handler .handle_get_receipts (peer , block_hashes )
502+ elif isinstance (cmd , eth .GetNodeData ):
503+ # Only serve up to eth.MAX_STATE_FETCH items in every request.
504+ node_hashes = cast (List [Hash32 ], msg )[:eth .MAX_STATE_FETCH ]
505+ await self ._handler .handle_get_node_data (peer , node_hashes )
506+ elif isinstance (cmd , eth .Transactions ):
507+ # Transactions msgs are handled by our TxPool service.
508+ pass
509+ elif isinstance (cmd , eth .NodeData ):
510+ # When doing a chain sync we never send GetNodeData requests, so peers should not send
511+ # us NodeData msgs.
512+ self .logger .warn ("Unexpected NodeData msg from %s" , peer )
493513 else :
494- self .logger .debug ("Ignoring %s message from %s" , cmd , peer )
514+ self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
495515
496516 async def _handle_new_block (self , peer : ETHPeer , msg : Dict [str , Any ]) -> None :
497517 self ._sync_requests .put_nowait (peer )
@@ -515,7 +535,7 @@ async def _handle_block_receipts(self,
515535
516536 async def _handle_block_bodies (self ,
517537 peer : ETHPeer ,
518- bodies : List [eth . BlockBody ]) -> None :
538+ bodies : List [BlockBody ]) -> None :
519539 self .logger .debug ("Got Bodies for %d blocks from %s" , len (bodies ), peer )
520540 loop = asyncio .get_event_loop ()
521541 iterator = map (make_trie_root_and_nodes , [body .transactions for body in bodies ])
@@ -542,6 +562,7 @@ async def _handle_get_block_headers(
542562 headers = await self ._handler .lookup_headers (
543563 header_request ['block_number_or_hash' ], header_request ['max_headers' ],
544564 header_request ['skip' ], header_request ['reverse' ])
565+ self .logger .trace ("Replying to %s with %d headers" , peer , len (headers ))
545566 peer .sub_proto .send_block_headers (headers )
546567
547568
@@ -553,31 +574,10 @@ class RegularChainSyncer(FastChainSyncer):
553574 """
554575 _exit_on_sync_complete = False
555576
556- async def _handle_msg (self , peer : HeaderRequestingPeer , cmd : protocol .Command ,
557- msg : protocol ._DecodedMsgType ) -> None :
558- peer = cast (ETHPeer , peer )
559- if isinstance (cmd , eth .BlockHeaders ):
560- self ._handle_block_headers (tuple (cast (Tuple [BlockHeader , ...], msg )))
561- elif isinstance (cmd , eth .BlockBodies ):
562- await self ._handle_block_bodies (peer , list (cast (Tuple [eth .BlockBody ], msg )))
563- elif isinstance (cmd , eth .NewBlock ):
564- await self ._handle_new_block (peer , cast (Dict [str , Any ], msg ))
565- elif isinstance (cmd , eth .GetBlockHeaders ):
566- await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
567- elif isinstance (cmd , eth .GetBlockBodies ):
568- # Only serve up to eth.MAX_BODIES_FETCH items in every request.
569- block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_BODIES_FETCH ]
570- await self ._handler .handle_get_block_bodies (peer , cast (List [Hash32 ], msg ))
571- elif isinstance (cmd , eth .GetReceipts ):
572- # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573- block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_RECEIPTS_FETCH ]
574- await self ._handler .handle_get_receipts (peer , block_hashes )
575- elif isinstance (cmd , eth .GetNodeData ):
576- # Only serve up to eth.MAX_STATE_FETCH items in every request.
577- node_hashes = cast (List [Hash32 ], msg )[:eth .MAX_STATE_FETCH ]
578- await self ._handler .handle_get_node_data (peer , node_hashes )
579- else :
580- self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
577+ async def _handle_block_receipts (
578+ self , peer : ETHPeer , receipts_by_block : List [List [eth .Receipt ]]) -> None :
579+ # When doing a regular sync we never request receipts.
580+ self .logger .warn ("Unexpected BlockReceipts msg from %s" , peer )
581581
582582 async def _process_headers (
583583 self , peer : HeaderRequestingPeer , headers : Tuple [BlockHeader , ...]) -> int :
@@ -599,7 +599,7 @@ async def _process_headers(
599599 transactions : List [BaseTransaction ] = []
600600 uncles : List [BlockHeader ] = []
601601 else :
602- body = cast (eth . BlockBody , downloaded_parts [_body_key (header )])
602+ body = cast (BlockBody , downloaded_parts [_body_key (header )])
603603 tx_class = block_class .get_transaction_class ()
604604 transactions = [tx_class .from_base_transaction (tx )
605605 for tx in body .transactions ]
@@ -624,6 +624,7 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
624624 self .cancel_token = token
625625
626626 async def handle_get_block_bodies (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
627+ self .logger .trace ("%s requested bodies for %d blocks" , peer , len (block_hashes ))
627628 chaindb = cast ('AsyncChainDB' , self .db )
628629 bodies = []
629630 for block_hash in block_hashes :
@@ -636,9 +637,11 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32
636637 chaindb .coro_get_block_transactions (header , BaseTransactionFields ))
637638 uncles = await self .wait (chaindb .coro_get_block_uncles (header .uncles_hash ))
638639 bodies .append (BlockBody (transactions , uncles ))
640+ self .logger .trace ("Replying to %s with %d block bodies" , peer , len (bodies ))
639641 peer .sub_proto .send_block_bodies (bodies )
640642
641643 async def handle_get_receipts (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
644+ self .logger .trace ("%s requested receipts for %d blocks" , peer , len (block_hashes ))
642645 chaindb = cast ('AsyncChainDB' , self .db )
643646 receipts = []
644647 for block_hash in block_hashes :
@@ -650,9 +653,11 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -
650653 continue
651654 block_receipts = await self .wait (chaindb .coro_get_receipts (header , Receipt ))
652655 receipts .append (block_receipts )
656+ self .logger .trace ("Replying to %s with receipts for %d blocks" , peer , len (receipts ))
653657 peer .sub_proto .send_receipts (receipts )
654658
655659 async def handle_get_node_data (self , peer : ETHPeer , node_hashes : List [Hash32 ]) -> None :
660+ self .logger .trace ("%s requested %d trie nodes" , peer , len (node_hashes ))
656661 chaindb = cast ('AsyncChainDB' , self .db )
657662 nodes = []
658663 for node_hash in node_hashes :
@@ -662,6 +667,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
662667 self .logger .debug ("%s asked for a trie node we don't have: %s" , peer , node_hash )
663668 continue
664669 nodes .append (node )
670+ self .logger .trace ("Replying to %s with %d trie nodes" , peer , len (nodes ))
665671 peer .sub_proto .send_node_data (nodes )
666672
667673 async def lookup_headers (self , block_number_or_hash : Union [int , bytes ], max_headers : int ,
@@ -731,7 +737,7 @@ async def _generate_available_headers(
731737
732738
733739class DownloadedBlockPart (NamedTuple ):
734- part : Union [eth . BlockBody , List [Receipt ]]
740+ part : Union [BlockBody , List [Receipt ]]
735741 unique_key : Union [bytes , Tuple [bytes , bytes ]]
736742
737743
0 commit comments