@@ -191,6 +191,7 @@ def __init__(self,
191191 self ._subscribers : List [PeerSubscriber ] = []
192192 self .start_time = datetime .datetime .now ()
193193 self .received_msgs : Dict [protocol .Command , int ] = collections .defaultdict (int )
194+ self .booted = asyncio .Event ()
194195
195196 self .egress_mac = egress_mac
196197 self .ingress_mac = ingress_mac
@@ -344,7 +345,84 @@ def is_closing(self) -> bool:
344345 async def _cleanup (self ) -> None :
345346 self .close ()
346347
348+ async def boot (self ) -> None :
349+ if not self .events .started .is_set ():
350+ raise RuntimeError ("Cannot boot a Peer which has not been started." )
351+
352+ try :
353+ await self ._boot ()
354+ except OperationCancelled :
355+ # If a cancellation happens during boot we suppress it here and
356+ # simply exit without setting the `booted` event.
357+ return
358+ else :
359+ self .booted .set ()
360+
361+ async def _boot (self ) -> None :
362+ try :
363+ await self .ensure_same_side_on_dao_fork ()
364+ except DAOForkCheckFailure as err :
365+ self .logger .debug ("DAO fork check with %s failed: %s" , self , err )
366+ await self .disconnect (DisconnectReason .useless_peer )
367+ raise
368+ except Exception as err :
369+ self .logger .exception ('ERROR BOOTING' )
370+ raise
371+
372+ vm_configuration : Tuple [Tuple [int , Type [BaseVM ]], ...] = None
373+
374+ async def ensure_same_side_on_dao_fork (self ) -> None :
375+ """Ensure we're on the same side of the DAO fork
376+
377+ In order to do that we have to request the DAO fork block and its parent, but while we
378+ wait for that we may receive other messages from the peer, which are returned so that they
379+ can be re-added to our subscribers' queues when the peer is finally added to the pool.
380+ """
381+ for start_block , vm_class in self .vm_configuration :
382+ if not issubclass (vm_class , HomesteadVM ):
383+ continue
384+ elif not vm_class .support_dao_fork :
385+ break
386+ elif start_block > vm_class .dao_fork_block_number :
387+ # VM comes after the fork, so stop checking
388+ break
389+
390+ start_block = vm_class .dao_fork_block_number - 1
391+
392+ try :
393+ headers = await self .requests .get_block_headers ( # type: ignore
394+ start_block ,
395+ max_headers = 2 ,
396+ reverse = False ,
397+ timeout = CHAIN_SPLIT_CHECK_TIMEOUT ,
398+ )
399+
400+ except (TimeoutError , PeerConnectionLost ) as err :
401+ raise DAOForkCheckFailure (
402+ f"Timed out waiting for DAO fork header from { self } : { err } "
403+ ) from err
404+ except ValidationError as err :
405+ raise DAOForkCheckFailure (
406+ f"Invalid header response during DAO fork check: { err } "
407+ ) from err
408+
409+ if len (headers ) != 2 :
410+ raise DAOForkCheckFailure (
411+ f"Peer { self } failed to return DAO fork check headers"
412+ )
413+ else :
414+ parent , header = headers
415+
416+ try :
417+ vm_class .validate_header (header , parent , check_seal = True )
418+ except ValidationError as err :
419+ raise DAOForkCheckFailure (f"Peer failed DAO fork check validation: { err } " )
420+
347421 async def _run (self ) -> None :
422+ # The `boot` process is run in the background to allow the `run` loop
423+ # to continue so that all of the Peer APIs can be used within the
424+ # `boot` task.
425+ self .run_task (self .boot ())
348426 while self .is_operational :
349427 try :
350428 cmd , msg = await self .read_msg ()
@@ -742,25 +820,26 @@ def unsubscribe(self, subscriber: PeerSubscriber) -> None:
742820 peer .remove_subscriber (subscriber )
743821
744822 async def start_peer (self , peer : BasePeer ) -> None :
823+ # TODO: temporary hack until all of this EVM stuff can be fully
824+ # removed from the BasePeer and PeerPool classes.
825+ peer .vm_configuration = self .vm_configuration
826+
745827 self .run_child_service (peer )
746828 await self .wait (peer .events .started .wait (), timeout = 1 )
747829 try :
748- # Although connect() may seem like a more appropriate place to perform the DAO fork
749- # check, we do it here because we want to perform it for incoming peer connections as
750- # well.
751830 with peer .collect_sub_proto_messages () as buffer :
752- await self .ensure_same_side_on_dao_fork (peer )
753- except DAOForkCheckFailure as err :
754- self .logger .debug ("DAO fork check with %s failed: %s" , peer , err )
755- await peer .disconnect (DisconnectReason .useless_peer )
831+ # TODO: update to use a more generic timeout
832+ await self .wait (peer .booted .wait (), timeout = CHAIN_SPLIT_CHECK_TIMEOUT )
833+ except TimeoutError as err :
834+ self .logger .debug ('Timout waiting for peer to boot: %s' , err )
835+ await peer .disconnect (DisconnectReason .timeout )
756836 return
757837 else :
758- msgs = tuple ((cmd , msg ) for _ , cmd , msg in buffer .get_messages ())
759- self ._add_peer (peer , msgs )
838+ self ._add_peer (peer , buffer .get_messages ())
760839
761840 def _add_peer (self ,
762841 peer : BasePeer ,
763- msgs : Tuple [Tuple [ protocol . Command , protocol . PayloadType ] , ...]) -> None :
842+ msgs : Tuple [PeerMessage , ...]) -> None :
764843 """Add the given peer to the pool.
765844
766845 Appart from adding it to our list of connected nodes and adding each of our subscriber's
@@ -772,8 +851,8 @@ def _add_peer(self,
772851 for subscriber in self ._subscribers :
773852 subscriber .register_peer (peer )
774853 peer .add_subscriber (subscriber )
775- for cmd , msg in msgs :
776- subscriber .add_msg (PeerMessage ( peer , cmd , msg ) )
854+ for msg in msgs :
855+ subscriber .add_msg (msg )
777856
778857 async def _run (self ) -> None :
779858 # FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it
@@ -848,54 +927,6 @@ async def connect_to_nodes(self, nodes: Iterator[Node]) -> None:
848927 if peer is not None :
849928 await self .start_peer (peer )
850929
851- async def ensure_same_side_on_dao_fork (
852- self , peer : BasePeer ) -> None :
853- """Ensure we're on the same side of the DAO fork as the given peer.
854-
855- In order to do that we have to request the DAO fork block and its parent, but while we
856- wait for that we may receive other messages from the peer, which are returned so that they
857- can be re-added to our subscribers' queues when the peer is finally added to the pool.
858- """
859- for start_block , vm_class in self .vm_configuration :
860- if not issubclass (vm_class , HomesteadVM ):
861- continue
862- elif not vm_class .support_dao_fork :
863- break
864- elif start_block > vm_class .dao_fork_block_number :
865- # VM comes after the fork, so stop checking
866- break
867-
868- start_block = vm_class .dao_fork_block_number - 1
869-
870- try :
871- headers = await peer .requests .get_block_headers ( # type: ignore
872- start_block ,
873- max_headers = 2 ,
874- reverse = False ,
875- timeout = CHAIN_SPLIT_CHECK_TIMEOUT ,
876- )
877-
878- except (TimeoutError , PeerConnectionLost ) as err :
879- raise DAOForkCheckFailure (
880- f"Timed out waiting for DAO fork header from { peer } : { err } "
881- ) from err
882- except ValidationError as err :
883- raise DAOForkCheckFailure (
884- f"Invalid header response during DAO fork check: { err } "
885- ) from err
886-
887- if len (headers ) != 2 :
888- raise DAOForkCheckFailure (
889- f"Peer { peer } failed to return DAO fork check headers"
890- )
891- else :
892- parent , header = headers
893-
894- try :
895- vm_class .validate_header (header , parent , check_seal = True )
896- except ValidationError as err :
897- raise DAOForkCheckFailure (f"Peer failed DAO fork check validation: { err } " )
898-
899930 def _peer_finished (self , peer : BaseService ) -> None :
900931 """Remove the given peer from our list of connected nodes.
901932 This is passed as a callback to be called when a peer finishes.
0 commit comments