4040from p2p .cancel_token import CancellableMixin , CancelToken
4141from p2p .constants import MAX_REORG_DEPTH
4242from p2p .exceptions import NoEligiblePeers , OperationCancelled
43+ from p2p .p2p_proto import DisconnectReason
4344from p2p .peer import BasePeer , ETHPeer , LESPeer , PeerPool , PeerPoolSubscriber
4445from p2p .rlp import BlockBody
4546from p2p .service import BaseService
@@ -187,7 +188,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None:
187188 headers = await self ._fetch_missing_headers (peer , start_at )
188189 except TimeoutError :
189190 self .logger .warn ("Timeout waiting for header batch from %s, aborting sync" , peer )
190- await peer .cancel ( )
191+ await peer .disconnect ( DisconnectReason . timeout )
191192 break
192193
193194 if not headers :
@@ -289,6 +290,7 @@ async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) ->
289290 query = msg ['query' ]
290291 headers = await self ._handler .lookup_headers (
291292 query .block_number_or_hash , query .max_headers , query .skip , query .reverse )
293+ self .logger .trace ("Replying to %s with %d headers" , peer , len (headers ))
292294 peer .sub_proto .send_block_headers (headers , buffer_value = 0 , request_id = msg ['request_id' ])
293295
294296 async def _process_headers (
@@ -490,8 +492,28 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
490492 await self ._handle_new_block (peer , cast (Dict [str , Any ], msg ))
491493 elif isinstance (cmd , eth .GetBlockHeaders ):
492494 await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
495+ elif isinstance (cmd , eth .GetBlockBodies ):
496+ # Only serve up to eth.MAX_BODIES_FETCH items in every request.
497+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_BODIES_FETCH ]
498+ await self ._handler .handle_get_block_bodies (peer , block_hashes )
499+ elif isinstance (cmd , eth .GetReceipts ):
500+ # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
501+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_RECEIPTS_FETCH ]
502+ await self ._handler .handle_get_receipts (peer , block_hashes )
503+ elif isinstance (cmd , eth .GetNodeData ):
504+ # Only serve up to eth.MAX_STATE_FETCH items in every request.
505+ node_hashes = cast (List [Hash32 ], msg )[:eth .MAX_STATE_FETCH ]
506+ await self ._handler .handle_get_node_data (peer , node_hashes )
507+ elif isinstance (cmd , eth .Transactions ):
508+ # Transactions msgs are handled by our TxPool service.
509+ pass
510+ elif isinstance (cmd , eth .NodeData ):
511+ # When doing a chain sync we never send GetNodeData requests, so peers should not send
512+ # us NodeData msgs.
513+ self .logger .warn ("Unexpected NodeData msg from %s, disconnecting" , peer )
514+ await peer .disconnect (DisconnectReason .bad_protocol )
493515 else :
494- self .logger .debug ("Ignoring %s message from %s" , cmd , peer )
516+ self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
495517
496518 async def _handle_new_block (self , peer : ETHPeer , msg : Dict [str , Any ]) -> None :
497519 self ._sync_requests .put_nowait (peer )
@@ -515,7 +537,7 @@ async def _handle_block_receipts(self,
515537
516538 async def _handle_block_bodies (self ,
517539 peer : ETHPeer ,
518- bodies : List [eth . BlockBody ]) -> None :
540+ bodies : List [BlockBody ]) -> None :
519541 self .logger .debug ("Got Bodies for %d blocks from %s" , len (bodies ), peer )
520542 loop = asyncio .get_event_loop ()
521543 iterator = map (make_trie_root_and_nodes , [body .transactions for body in bodies ])
@@ -542,6 +564,7 @@ async def _handle_get_block_headers(
542564 headers = await self ._handler .lookup_headers (
543565 header_request ['block_number_or_hash' ], header_request ['max_headers' ],
544566 header_request ['skip' ], header_request ['reverse' ])
567+ self .logger .trace ("Replying to %s with %d headers" , peer , len (headers ))
545568 peer .sub_proto .send_block_headers (headers )
546569
547570
@@ -553,31 +576,11 @@ class RegularChainSyncer(FastChainSyncer):
553576 """
554577 _exit_on_sync_complete = False
555578
556- async def _handle_msg (self , peer : HeaderRequestingPeer , cmd : protocol .Command ,
557- msg : protocol ._DecodedMsgType ) -> None :
558- peer = cast (ETHPeer , peer )
559- if isinstance (cmd , eth .BlockHeaders ):
560- self ._handle_block_headers (tuple (cast (Tuple [BlockHeader , ...], msg )))
561- elif isinstance (cmd , eth .BlockBodies ):
562- await self ._handle_block_bodies (peer , list (cast (Tuple [eth .BlockBody ], msg )))
563- elif isinstance (cmd , eth .NewBlock ):
564- await self ._handle_new_block (peer , cast (Dict [str , Any ], msg ))
565- elif isinstance (cmd , eth .GetBlockHeaders ):
566- await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
567- elif isinstance (cmd , eth .GetBlockBodies ):
568- # Only serve up to eth.MAX_BODIES_FETCH items in every request.
569- block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_BODIES_FETCH ]
570- await self ._handler .handle_get_block_bodies (peer , cast (List [Hash32 ], msg ))
571- elif isinstance (cmd , eth .GetReceipts ):
572- # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573- block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_RECEIPTS_FETCH ]
574- await self ._handler .handle_get_receipts (peer , block_hashes )
575- elif isinstance (cmd , eth .GetNodeData ):
576- # Only serve up to eth.MAX_STATE_FETCH items in every request.
577- node_hashes = cast (List [Hash32 ], msg )[:eth .MAX_STATE_FETCH ]
578- await self ._handler .handle_get_node_data (peer , node_hashes )
579- else :
580- self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
579+ async def _handle_block_receipts (
580+ self , peer : ETHPeer , receipts_by_block : List [List [eth .Receipt ]]) -> None :
581+ # When doing a regular sync we never request receipts.
582+ self .logger .warn ("Unexpected BlockReceipts msg from %s, disconnecting" , peer )
583+ await peer .disconnect (DisconnectReason .bad_protocol )
581584
582585 async def _process_headers (
583586 self , peer : HeaderRequestingPeer , headers : Tuple [BlockHeader , ...]) -> int :
@@ -599,7 +602,7 @@ async def _process_headers(
599602 transactions : List [BaseTransaction ] = []
600603 uncles : List [BlockHeader ] = []
601604 else :
602- body = cast (eth . BlockBody , downloaded_parts [_body_key (header )])
605+ body = cast (BlockBody , downloaded_parts [_body_key (header )])
603606 tx_class = block_class .get_transaction_class ()
604607 transactions = [tx_class .from_base_transaction (tx )
605608 for tx in body .transactions ]
@@ -624,6 +627,7 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
624627 self .cancel_token = token
625628
626629 async def handle_get_block_bodies (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
630+ self .logger .trace ("%s requested bodies for %d blocks" , peer , len (block_hashes ))
627631 chaindb = cast ('AsyncChainDB' , self .db )
628632 bodies = []
629633 for block_hash in block_hashes :
@@ -636,9 +640,11 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32
636640 chaindb .coro_get_block_transactions (header , BaseTransactionFields ))
637641 uncles = await self .wait (chaindb .coro_get_block_uncles (header .uncles_hash ))
638642 bodies .append (BlockBody (transactions , uncles ))
643+ self .logger .trace ("Replying to %s with %d block bodies" , peer , len (bodies ))
639644 peer .sub_proto .send_block_bodies (bodies )
640645
641646 async def handle_get_receipts (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
647+ self .logger .trace ("%s requested receipts for %d blocks" , peer , len (block_hashes ))
642648 chaindb = cast ('AsyncChainDB' , self .db )
643649 receipts = []
644650 for block_hash in block_hashes :
@@ -650,9 +656,11 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -
650656 continue
651657 block_receipts = await self .wait (chaindb .coro_get_receipts (header , Receipt ))
652658 receipts .append (block_receipts )
659+ self .logger .trace ("Replying to %s with receipts for %d blocks" , peer , len (receipts ))
653660 peer .sub_proto .send_receipts (receipts )
654661
655662 async def handle_get_node_data (self , peer : ETHPeer , node_hashes : List [Hash32 ]) -> None :
663+ self .logger .trace ("%s requested %d trie nodes" , peer , len (node_hashes ))
656664 chaindb = cast ('AsyncChainDB' , self .db )
657665 nodes = []
658666 for node_hash in node_hashes :
@@ -662,6 +670,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -
662670 self .logger .debug ("%s asked for a trie node we don't have: %s" , peer , node_hash )
663671 continue
664672 nodes .append (node )
673+ self .logger .trace ("Replying to %s with %d trie nodes" , peer , len (nodes ))
665674 peer .sub_proto .send_node_data (nodes )
666675
667676 async def lookup_headers (self , block_number_or_hash : Union [int , bytes ], max_headers : int ,
@@ -731,7 +740,7 @@ async def _generate_available_headers(
731740
732741
733742class DownloadedBlockPart (NamedTuple ):
734- part : Union [eth . BlockBody , List [Receipt ]]
743+ part : Union [BlockBody , List [Receipt ]]
735744 unique_key : Union [bytes , Tuple [bytes , bytes ]]
736745
737746
0 commit comments