@@ -564,55 +564,17 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
564564 elif isinstance (cmd , eth .GetBlockHeaders ):
565565 await self ._handle_get_block_headers (peer , cast (Dict [str , Any ], msg ))
566566 elif isinstance (cmd , eth .GetBlockBodies ):
567- await self ._handle_get_block_bodies (peer , cast (List [Hash32 ], msg ))
567+ await handle_get_block_bodies (
568+ self .db , peer , cast (List [Hash32 ], msg ), self .logger , self .cancel_token )
568569 elif isinstance (cmd , eth .GetReceipts ):
569- await self ._handle_get_receipts (peer , cast (List [Hash32 ], msg ))
570+ await handle_get_receipts (
571+ self .db , peer , cast (List [Hash32 ], msg ), self .logger , self .cancel_token )
570572 elif isinstance (cmd , eth .GetNodeData ):
571- await self ._handle_get_node_data (peer , cast (List [Hash32 ], msg ))
573+ await handle_get_node_data (
574+ self .db , peer , cast (List [Hash32 ], msg ), self .logger , self .cancel_token )
572575 else :
573576 self .logger .debug ("%s msg not handled yet, need to be implemented" , cmd )
574577
575- async def _handle_get_block_bodies (self , peer : ETHPeer , msg : List [Hash32 ]) -> None :
576- bodies = []
577- # Only serve up to eth.MAX_BODIES_FETCH items in every request.
578- hashes = msg [:eth .MAX_BODIES_FETCH ]
579- for block_hash in hashes :
580- try :
581- header = await self .wait (self .db .coro_get_block_header_by_hash (block_hash ))
582- except HeaderNotFound :
583- self .logger .debug ("%s asked for block we don't have: %s" , peer , block_hash )
584- continue
585- transactions = await self .wait (
586- self .db .coro_get_block_transactions (header , BaseTransactionFields ))
587- uncles = await self .wait (self .db .coro_get_block_uncles (header .uncles_hash ))
588- bodies .append (BlockBody (transactions , uncles ))
589- peer .sub_proto .send_block_bodies (bodies )
590-
591- async def _handle_get_receipts (self , peer : ETHPeer , msg : List [Hash32 ]) -> None :
592- receipts = []
593- # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
594- hashes = msg [:eth .MAX_RECEIPTS_FETCH ]
595- for block_hash in hashes :
596- try :
597- header = await self .wait (self .db .coro_get_block_header_by_hash (block_hash ))
598- except HeaderNotFound :
599- self .logger .debug (
600- "%s asked receipts for block we don't have: %s" , peer , block_hash )
601- continue
602- receipts .append (await self .wait (self .db .coro_get_receipts (header , Receipt )))
603- peer .sub_proto .send_receipts (receipts )
604-
605- async def _handle_get_node_data (self , peer : ETHPeer , msg : List [Hash32 ]) -> None :
606- nodes = []
607- for node_hash in msg :
608- try :
609- node = await self .db .coro_get (node_hash )
610- except KeyError :
611- self .logger .debug ("%s asked for a trie node we don't have: %s" , peer , node_hash )
612- continue
613- nodes .append (node )
614- peer .sub_proto .send_node_data (nodes )
615-
616578 async def _process_headers (
617579 self , peer : HeaderRequestingPeer , headers : Tuple [BlockHeader , ...]) -> int :
618580 target_td = await self ._calculate_td (headers )
@@ -756,6 +718,67 @@ async def lookup_headers(
756718 return headers
757719
758720
721+ async def handle_get_block_bodies (
722+ chaindb : 'AsyncChainDB' , peer : ETHPeer , block_hashes : List [Hash32 ],
723+ logger : logging .Logger , token : CancelToken ) -> None :
724+ bodies = []
725+ # Only serve up to eth.MAX_BODIES_FETCH items in every request.
726+ for block_hash in block_hashes [:eth .MAX_BODIES_FETCH ]:
727+ try :
728+ header = await wait_with_token (
729+ chaindb .coro_get_block_header_by_hash (block_hash ),
730+ token = token )
731+ except HeaderNotFound :
732+ logger .debug ("%s asked for block we don't have: %s" , peer , block_hash )
733+ continue
734+ transactions = await wait_with_token (
735+ chaindb .coro_get_block_transactions (header , BaseTransactionFields ),
736+ token = token )
737+ uncles = await wait_with_token (
738+ chaindb .coro_get_block_uncles (header .uncles_hash ),
739+ token = token )
740+ bodies .append (BlockBody (transactions , uncles ))
741+ peer .sub_proto .send_block_bodies (bodies )
742+
743+
744+ async def handle_get_receipts (
745+ chaindb : 'AsyncChainDB' , peer : ETHPeer , block_hashes : List [Hash32 ],
746+ logger : logging .Logger , token : CancelToken ) -> None :
747+ receipts = []
748+ # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request.
749+ for block_hash in block_hashes [:eth .MAX_RECEIPTS_FETCH ]:
750+ try :
751+ header = await wait_with_token (
752+ chaindb .coro_get_block_header_by_hash (block_hash ),
753+ token = token )
754+ except HeaderNotFound :
755+ logger .debug (
756+ "%s asked receipts for block we don't have: %s" , peer , block_hash )
757+ continue
758+ block_receipts = await wait_with_token (
759+ chaindb .coro_get_receipts (header , Receipt ),
760+ token = token )
761+ receipts .append (block_receipts )
762+ peer .sub_proto .send_receipts (receipts )
763+
764+
765+ async def handle_get_node_data (
766+ chaindb : 'AsyncChainDB' , peer : ETHPeer , node_hashes : List [Hash32 ],
767+ logger : logging .Logger , token : CancelToken ) -> None :
768+ nodes = []
769+ # Only serve up to eth.MAX_STATE_FETCH items in every request.
770+ for node_hash in node_hashes [:eth .MAX_STATE_FETCH ]:
771+ try :
772+ node = await wait_with_token (
773+ chaindb .coro_get (node_hash ),
774+ token = token )
775+ except KeyError :
776+ logger .debug ("%s asked for a trie node we don't have: %s" , peer , node_hash )
777+ continue
778+ nodes .append (node )
779+ peer .sub_proto .send_node_data (nodes )
780+
781+
759782def _test () -> None :
760783 import argparse
761784 import signal
0 commit comments