3737from p2p import protocol
3838from p2p import eth
3939from p2p import les
40- from p2p .cancel_token import CancelToken , wait_with_token
40+ from p2p .cancel_token import CancellableMixin , CancelToken
4141from p2p .constants import MAX_REORG_DEPTH
4242from p2p .exceptions import NoEligiblePeers , OperationCancelled
4343from p2p .peer import BasePeer , ETHPeer , LESPeer , PeerPool , PeerPoolSubscriber
@@ -565,11 +565,17 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
565565 elif isinstance (cmd , eth .GetBlockHeaders ):
566566 await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
567567 elif isinstance (cmd , eth .GetBlockBodies ):
568+ # Only serve up to eth.MAX_BODIES_FETCH items in every request.
569+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_BODIES_FETCH ]
568570 await self ._handler .handle_get_block_bodies (peer , cast (List [Hash32 ], msg ))
569571 elif isinstance (cmd , eth .GetReceipts ):
570- await self ._handler .handle_get_receipts (peer , cast (List [Hash32 ], msg ))
572+ # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
573+ block_hashes = cast (List [Hash32 ], msg )[:eth .MAX_RECEIPTS_FETCH ]
574+ await self ._handler .handle_get_receipts (peer , block_hashes )
571575 elif isinstance (cmd , eth .GetNodeData ):
572- await self ._handler .handle_get_node_data (peer , cast (List [Hash32 ], msg ))
576+ # Only serve up to eth.MAX_STATE_FETCH items in every request.
577+ node_hashes = cast (List [Hash32 ], msg )[:eth .MAX_STATE_FETCH ]
578+ await self ._handler .handle_get_node_data (peer , node_hashes )
573579 else :
574580 self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
575581
@@ -610,7 +616,7 @@ async def _process_headers(
610616 return head .block_number
611617
612618
613- class PeerRequestHandler :
619+ class PeerRequestHandler ( CancellableMixin ) :
614620
615621 def __init__ (self , db : 'AsyncHeaderDB' , logger : TraceLogger , token : CancelToken ) -> None :
616622 self .db = db
@@ -620,52 +626,38 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken)
620626 async def handle_get_block_bodies (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
621627 chaindb = cast ('AsyncChainDB' , self .db )
622628 bodies = []
623- # Only serve up to eth.MAX_BODIES_FETCH items in every request.
624- for block_hash in block_hashes [:eth .MAX_BODIES_FETCH ]:
629+ for block_hash in block_hashes :
625630 try :
626- header = await wait_with_token (
627- chaindb .coro_get_block_header_by_hash (block_hash ),
628- token = self .cancel_token )
631+ header = await self .wait (chaindb .coro_get_block_header_by_hash (block_hash ))
629632 except HeaderNotFound :
630633 self .logger .debug ("%s asked for block we don't have: %s" , peer , block_hash )
631634 continue
632- transactions = await wait_with_token (
633- chaindb .coro_get_block_transactions (header , BaseTransactionFields ),
634- token = self .cancel_token )
635- uncles = await wait_with_token (
636- chaindb .coro_get_block_uncles (header .uncles_hash ),
637- token = self .cancel_token )
635+ transactions = await self .wait (
636+ chaindb .coro_get_block_transactions (header , BaseTransactionFields ))
637+ uncles = await self .wait (chaindb .coro_get_block_uncles (header .uncles_hash ))
638638 bodies .append (BlockBody (transactions , uncles ))
639639 peer .sub_proto .send_block_bodies (bodies )
640640
641641 async def handle_get_receipts (self , peer : ETHPeer , block_hashes : List [Hash32 ]) -> None :
642642 chaindb = cast ('AsyncChainDB' , self .db )
643643 receipts = []
644- # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
645- for block_hash in block_hashes [:eth .MAX_RECEIPTS_FETCH ]:
644+ for block_hash in block_hashes :
646645 try :
647- header = await wait_with_token (
648- chaindb .coro_get_block_header_by_hash (block_hash ),
649- token = self .cancel_token )
646+ header = await self .wait (chaindb .coro_get_block_header_by_hash (block_hash ))
650647 except HeaderNotFound :
651648 self .logger .debug (
652649 "%s asked receipts for block we don't have: %s" , peer , block_hash )
653650 continue
654- block_receipts = await wait_with_token (
655- chaindb .coro_get_receipts (header , Receipt ),
656- token = self .cancel_token )
651+ block_receipts = await self .wait (chaindb .coro_get_receipts (header , Receipt ))
657652 receipts .append (block_receipts )
658653 peer .sub_proto .send_receipts (receipts )
659654
660655 async def handle_get_node_data (self , peer : ETHPeer , node_hashes : List [Hash32 ]) -> None :
661656 chaindb = cast ('AsyncChainDB' , self .db )
662657 nodes = []
663- # Only serve up to eth.MAX_STATE_FETCH items in every request.
664- for node_hash in node_hashes [:eth .MAX_STATE_FETCH ]:
658+ for node_hash in node_hashes :
665659 try :
666- node = await wait_with_token (
667- chaindb .coro_get (node_hash ),
668- token = self .cancel_token )
660+ node = await self .wait (chaindb .coro_get (node_hash ))
669661 except KeyError :
670662 self .logger .debug ("%s asked for a trie node we don't have: %s" , peer , node_hash )
671663 continue
@@ -698,10 +690,8 @@ async def _get_block_numbers_for_request(
698690 """
699691 block_number_or_hash = block_number_or_hash
700692 if isinstance (block_number_or_hash , bytes ):
701- header = await wait_with_token (
702- self .db .coro_get_block_header_by_hash (cast (Hash32 , block_number_or_hash )),
703- token = self .cancel_token ,
704- )
693+ header = await self .wait (
694+ self .db .coro_get_block_header_by_hash (cast (Hash32 , block_number_or_hash )))
705695 block_number = header .block_number
706696 elif isinstance (block_number_or_hash , int ):
707697 block_number = block_number_or_hash
@@ -730,10 +720,8 @@ async def _generate_available_headers(
730720 """
731721 for block_num in block_numbers :
732722 try :
733- yield await wait_with_token (
734- self .db .coro_get_canonical_block_header_by_number (block_num ),
735- token = self .cancel_token
736- )
723+ yield await self .wait (
724+ self .db .coro_get_canonical_block_header_by_number (block_num ))
737725 except HeaderNotFound :
738726 self .logger .debug (
739727 "Peer requested header number %s that is unavailable, stopping search." ,
0 commit comments