55import functools
66import logging
77import operator
8- import random
98import struct
109from abc import (
1110 ABC ,
2423 Set ,
2524 Tuple ,
2625 Type ,
27- TYPE_CHECKING ,
2826)
2927
3028import sha3
3937
4038from eth_utils import (
4139 to_tuple ,
42- ValidationError ,
4340)
4441
45- from eth_typing import BlockNumber , Hash32
4642
4743from eth_keys import datatypes
4844
4945from cancel_token import CancelToken , OperationCancelled
5046
51- from lahja import (
52- Endpoint ,
53- )
54-
55- from eth .constants import GENESIS_BLOCK_NUMBER
56- from eth .rlp .headers import BlockHeader
57- from eth .vm .base import BaseVM
58- from eth .vm .forks import HomesteadVM
47+ from lahja import Endpoint
5948
6049from p2p import auth
6150from p2p import protocol
6251from p2p .kademlia import Node
6352from p2p .exceptions import (
6453 BadAckMessage ,
65- DAOForkCheckFailure ,
6654 DecryptionError ,
6755 HandshakeFailure ,
6856 MalformedMessage ,
69- NoConnectedPeers ,
7057 NoMatchingPeerCapabilities ,
7158 PeerConnectionLost ,
7259 RemoteDisconnected ,
9178)
9279
9380from .constants import (
94- CHAIN_SPLIT_CHECK_TIMEOUT ,
9581 CONN_IDLE_TIMEOUT ,
9682 DEFAULT_MAX_PEERS ,
9783 DEFAULT_PEER_BOOT_TIMEOUT ,
10490 PeerCountResponse ,
10591)
10692
107- if TYPE_CHECKING :
108- from trinity .db .header import BaseAsyncHeaderDB # noqa: F401
109-
11093
11194async def handshake (remote : Node , factory : 'BasePeerFactory' ) -> 'BasePeer' :
11295 """Perform the auth and P2P handshakes with the given remote.
@@ -149,13 +132,6 @@ async def handshake(remote: Node, factory: 'BasePeerFactory') -> 'BasePeer':
149132 return peer
150133
151134
152- class ChainInfo (NamedTuple ):
153- block_number : BlockNumber
154- block_hash : Hash32
155- total_difficulty : int
156- genesis_hash : Hash32
157-
158-
159135class PeerConnection (NamedTuple ):
160136 reader : asyncio .StreamReader
161137 writer : asyncio .StreamWriter
@@ -165,80 +141,21 @@ class PeerConnection(NamedTuple):
165141 ingress_mac : sha3 .keccak_256
166142
167143
168- class PeerBootManager (BaseService ):
144+ class BasePeerBootManager (BaseService ):
145+ """
146+ The default boot manager does nothing, simply serving as a hook for other
147+ protocols which need to perform more complex boot check.
148+ """
169149 def __init__ (self , peer : 'BasePeer' ) -> None :
170150 super ().__init__ (peer .cancel_token )
171151 self .peer = peer
172152
173153 async def _run (self ) -> None :
174- try :
175- await self .ensure_same_side_on_dao_fork ()
176- except DAOForkCheckFailure as err :
177- self .logger .debug ("DAO fork check with %s failed: %s" , self .peer , err )
178- # If we `await` the `peer.disconnect` call, we end up with an
179- # OperationCancelled exception bubbling. This doesn't actually
180- # cause anything *bad* to happen, but it does cause the service to
181- # exit via exception rather than cleanly shutting down. By using
182- # `run_task`, this service finishes exiting prior to the
183- # cancellation.
184- self .run_daemon_task (self .peer .disconnect (DisconnectReason .useless_peer ))
185-
186- async def ensure_same_side_on_dao_fork (self ) -> None :
187- """Ensure we're on the same side of the DAO fork as the given peer.
188-
189- In order to do that we have to request the DAO fork block and its parent, but while we
190- wait for that we may receive other messages from the peer, which are returned so that they
191- can be re-added to our subscribers' queues when the peer is finally added to the pool.
192- """
193- for start_block , vm_class in self .peer .context .vm_configuration :
194- if not issubclass (vm_class , HomesteadVM ):
195- continue
196- elif not vm_class .support_dao_fork :
197- break
198- elif start_block > vm_class .dao_fork_block_number :
199- # VM comes after the fork, so stop checking
200- break
201-
202- start_block = vm_class .dao_fork_block_number - 1
203-
204- try :
205- headers = await self .peer .requests .get_block_headers ( # type: ignore
206- start_block ,
207- max_headers = 2 ,
208- reverse = False ,
209- timeout = CHAIN_SPLIT_CHECK_TIMEOUT ,
210- )
211-
212- except (TimeoutError , PeerConnectionLost ) as err :
213- raise DAOForkCheckFailure (
214- f"Timed out waiting for DAO fork header from { self .peer } : { err } "
215- ) from err
216- except ValidationError as err :
217- raise DAOForkCheckFailure (
218- f"Invalid header response during DAO fork check: { err } "
219- ) from err
220-
221- if len (headers ) != 2 :
222- raise DAOForkCheckFailure (
223- f"{ self .peer } failed to return DAO fork check headers"
224- )
225- else :
226- parent , header = headers
227-
228- try :
229- vm_class .validate_header (header , parent , check_seal = True )
230- except ValidationError as err :
231- raise DAOForkCheckFailure (f"{ self .peer } failed DAO fork check validation: { err } " )
154+ pass
232155
233156
234157class BasePeerContext :
235- def __init__ (self ,
236- headerdb : 'BaseAsyncHeaderDB' ,
237- network_id : int ,
238- vm_configuration : Tuple [Tuple [int , Type [BaseVM ]], ...]) -> None :
239- self .headerdb = headerdb
240- self .network_id = network_id
241- self .vm_configuration = vm_configuration
158+ pass
242159
243160
244161class BasePeer (BaseService ):
@@ -250,8 +167,6 @@ class BasePeer(BaseService):
250167 listen_port = 30303
251168 # Will be set upon the successful completion of a P2P handshake.
252169 sub_proto : protocol .Protocol = None
253- head_td : int = None
254- head_hash : Hash32 = None
255170
256171 def __init__ (self ,
257172 remote : Node ,
@@ -266,10 +181,6 @@ def __init__(self,
266181 # Any contextual information the peer may need.
267182 self .context = context
268183
269- self .headerdb = context .headerdb
270- self .network_id = context .network_id
271- self .vm_configuration = context .vm_configuration
272-
273184 # The `Node` that this peer is connected to
274185 self .remote = remote
275186
@@ -317,10 +228,10 @@ def get_extra_stats(self) -> List[str]:
317228 return []
318229
319230 @property
320- def boot_manager_class (self ) -> Type [PeerBootManager ]:
321- return PeerBootManager
231+ def boot_manager_class (self ) -> Type [BasePeerBootManager ]:
232+ return BasePeerBootManager
322233
323- def get_boot_manager (self ) -> PeerBootManager :
234+ def get_boot_manager (self ) -> BasePeerBootManager :
324235 return self .boot_manager_class (self )
325236
326237 @abstractmethod
@@ -402,24 +313,6 @@ async def do_p2p_handshake(self) -> None:
402313 )
403314 await self .process_p2p_handshake (cmd , msg )
404315
405- @property
406- async def genesis (self ) -> BlockHeader :
407- genesis_hash = await self .wait (
408- self .headerdb .coro_get_canonical_block_hash (BlockNumber (GENESIS_BLOCK_NUMBER )))
409- return await self .wait (self .headerdb .coro_get_block_header_by_hash (genesis_hash ))
410-
411- @property
412- async def _local_chain_info (self ) -> ChainInfo :
413- genesis = await self .genesis
414- head = await self .wait (self .headerdb .coro_get_canonical_head ())
415- total_difficulty = await self .headerdb .coro_get_score (head .hash )
416- return ChainInfo (
417- block_number = head .block_number ,
418- block_hash = head .hash ,
419- total_difficulty = total_difficulty ,
420- genesis_hash = genesis .hash ,
421- )
422-
423316 @property
424317 def capabilities (self ) -> List [Tuple [str , int ]]:
425318 return [(klass .name , klass .version ) for klass in self ._supported_sub_protocols ]
@@ -647,23 +540,34 @@ def send(self, header: bytes, body: bytes) -> None:
647540 return
648541 self .writer .write (self .encrypt (header , body ))
649542
650- async def disconnect (self , reason : DisconnectReason ) -> None :
651- """Send a disconnect msg to the remote node and stop this Peer.
652-
653- Also awaits for self.cancel() to ensure any pending tasks are cleaned up.
654-
655- :param reason: An item from the DisconnectReason enum.
656- """
543+ def _disconnect (self , reason : DisconnectReason ) -> None :
657544 if not isinstance (reason , DisconnectReason ):
658545 raise ValueError (
659546 f"Reason must be an item of DisconnectReason, got { reason } "
660547 )
661548 self .logger .debug ("Disconnecting from remote peer; reason: %s" , reason .name )
662549 self .base_protocol .send_disconnect (reason .value )
663550 self .close ()
551+
552+ async def disconnect (self , reason : DisconnectReason ) -> None :
553+ """Send a disconnect msg to the remote node and stop this Peer.
554+
555+ Also awaits for self.cancel() to ensure any pending tasks are cleaned up.
556+
557+ :param reason: An item from the DisconnectReason enum.
558+ """
559+ self ._disconnect (reason )
664560 if self .is_operational :
665561 await self .cancel ()
666562
563+ def disconnect_nowait (self , reason : DisconnectReason ) -> None :
564+ """
565+ Non-coroutine version of `disconnect`
566+ """
567+ self ._disconnect (reason )
568+ if self .is_operational :
569+ self .cancel_nowait ()
570+
667571 def select_sub_protocol (self , remote_capabilities : List [Tuple [bytes , int ]]
668572 ) -> protocol .Protocol :
669573 """Select the sub-protocol to use when talking to the remote.
@@ -1031,22 +935,6 @@ def _peer_finished(self, peer: BaseService) -> None:
1031935 def __aiter__ (self ) -> AsyncIterator [BasePeer ]:
1032936 return ConnectedPeersIterator (tuple (self .connected_nodes .values ()))
1033937
1034- @property
1035- def highest_td_peer (self ) -> BasePeer :
1036- peers = tuple (self .connected_nodes .values ())
1037- if not peers :
1038- raise NoConnectedPeers ()
1039- peers_by_td = groupby (operator .attrgetter ('head_td' ), peers )
1040- max_td = max (peers_by_td .keys ())
1041- return random .choice (peers_by_td [max_td ])
1042-
1043- def get_peers (self , min_td : int ) -> List [BasePeer ]:
1044- # TODO: Consider turning this into a method that returns an AsyncIterator, to make it
1045- # harder for callsites to get a list of peers while making blocking calls, as those peers
1046- # might disconnect in the meantime.
1047- peers = tuple (self .connected_nodes .values ())
1048- return [peer for peer in peers if peer .head_td >= min_td ]
1049-
1050938 async def _periodically_report_stats (self ) -> None :
1051939 while self .is_operational :
1052940 inbound_peers = len (
0 commit comments