2020 Dict ,
2121 Iterator ,
2222 List ,
23+ NamedTuple ,
2324 Set ,
2425 TYPE_CHECKING ,
2526 Tuple ,
@@ -209,7 +210,7 @@ async def send_sub_proto_handshake(self) -> None:
209210
210211 @abstractmethod
211212 async def process_sub_proto_handshake (
212- self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
213+ self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
213214 raise NotImplementedError ("Must be implemented by subclasses" )
214215
215216 @contextlib .contextmanager
@@ -365,7 +366,7 @@ async def _run(self) -> None:
365366 self .logger .debug ("%s disconnected: %s" , self , e )
366367 return
367368
368- async def read_msg (self ) -> Tuple [protocol .Command , protocol ._DecodedMsgType ]:
369+ async def read_msg (self ) -> Tuple [protocol .Command , protocol .PayloadType ]:
369370 header_data = await self .read (HEADER_LEN + MAC_LEN )
370371 header = self .decrypt_header (header_data )
371372 frame_size = self .get_frame_size (header )
@@ -392,7 +393,7 @@ async def read_msg(self) -> Tuple[protocol.Command, protocol._DecodedMsgType]:
392393 self .received_msgs [cmd ] += 1
393394 return cmd , decoded_msg
394395
395- def handle_p2p_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
396+ def handle_p2p_msg (self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
396397 """Handle the base protocol (P2P) messages."""
397398 if isinstance (cmd , Disconnect ):
398399 msg = cast (Dict [str , Any ], msg )
@@ -406,12 +407,12 @@ def handle_p2p_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -
406407 else :
407408 raise UnexpectedMessage ("Unexpected msg: {} ({})" .format (cmd , msg ))
408409
409- def handle_sub_proto_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
410+ def handle_sub_proto_msg (self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
410411 cmd_type = type (cmd )
411412
412413 if self ._subscribers :
413414 was_added = tuple (
414- subscriber .add_msg ((self , cmd , msg ))
415+ subscriber .add_msg (PeerMessage (self , cmd , msg ))
415416 for subscriber
416417 in self ._subscribers
417418 )
@@ -424,14 +425,14 @@ def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgT
424425 else :
425426 self .logger .warn ("Peer %s has no subscribers, discarding %s msg" , self , cmd )
426427
427- def process_msg (self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
428+ def process_msg (self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
428429 if cmd .is_base_protocol :
429430 self .handle_p2p_msg (cmd , msg )
430431 else :
431432 self .handle_sub_proto_msg (cmd , msg )
432433
433434 async def process_p2p_handshake (
434- self , cmd : protocol .Command , msg : protocol ._DecodedMsgType ) -> None :
435+ self , cmd : protocol .Command , msg : protocol .PayloadType ) -> None :
435436 msg = cast (Dict [str , Any ], msg )
436437 if not isinstance (cmd , Hello ):
437438 await self .disconnect (DisconnectReason .bad_protocol )
@@ -478,7 +479,7 @@ def decrypt_header(self, data: bytes) -> bytes:
478479 self .ingress_mac .update (sxor (aes , header_ciphertext ))
479480 expected_header_mac = self .ingress_mac .digest ()[:HEADER_LEN ]
480481 if not bytes_eq (expected_header_mac , header_mac ):
481- raise DecryptionError ('Invalid header mac: expected %s , got %s ' .format (
482+ raise DecryptionError ('Invalid header mac: expected {} , got {} ' .format (
482483 expected_header_mac , header_mac ))
483484 return self .aes_dec .update (header_ciphertext )
484485
@@ -563,8 +564,14 @@ def __hash__(self) -> int:
563564 return hash (self .remote )
564565
565566
567+ class PeerMessage (NamedTuple ):
568+ peer : BasePeer
569+ command : protocol .Command
570+ payload : protocol .PayloadType
571+
572+
566573class PeerSubscriber (ABC ):
567- _msg_queue : 'asyncio.Queue[PEER_MSG_TYPE ]' = None
574+ _msg_queue : 'asyncio.Queue[PeerMessage ]' = None
568575
569576 @property
570577 @abstractmethod
@@ -609,7 +616,7 @@ def deregister_peer(self, peer: BasePeer) -> None:
609616 pass
610617
611618 @property
612- def msg_queue (self ) -> 'asyncio.Queue[PEER_MSG_TYPE ]' :
619+ def msg_queue (self ) -> 'asyncio.Queue[PeerMessage ]' :
613620 if self ._msg_queue is None :
614621 self ._msg_queue = asyncio .Queue (maxsize = self .msg_queue_maxsize )
615622 return self ._msg_queue
@@ -618,26 +625,29 @@ def msg_queue(self) -> 'asyncio.Queue[PEER_MSG_TYPE]':
618625 def queue_size (self ) -> int :
619626 return self .msg_queue .qsize ()
620627
621- def add_msg (self , msg : 'PEER_MSG_TYPE' ) -> bool :
628+ def add_msg (self , msg : PeerMessage ) -> bool :
622629 peer , cmd , _ = msg
623630
624631 if not self .is_subscription_command (type (cmd )):
625- self .logger .trace ( # type: ignore
626- "Discarding %s msg from %s; not subscribed to msg type; "
627- "subscriptions: %s" ,
628- cmd , peer , self .subscription_msg_types ,
629- )
632+ if hasattr (self , 'logger' ):
633+ self .logger .trace ( # type: ignore
634+ "Discarding %s msg from %s; not subscribed to msg type; "
635+ "subscriptions: %s" ,
636+ cmd , peer , self .subscription_msg_types ,
637+ )
630638 return False
631639
632640 try :
633- self .logger .trace ( # type: ignore
634- "Adding %s msg from %s to queue; queue_size=%d" , cmd , peer , self .queue_size )
641+ if hasattr (self , 'logger' ):
642+ self .logger .trace ( # type: ignore
643+ "Adding %s msg from %s to queue; queue_size=%d" , cmd , peer , self .queue_size )
635644 self .msg_queue .put_nowait (msg )
636645 return True
637646 except asyncio .queues .QueueFull :
638- self .logger .warn ( # type: ignore
639- "%s msg queue is full; discarding %s msg from %s" ,
640- self .__class__ .__name__ , cmd , peer )
647+ if hasattr (self , 'logger' ):
648+ self .logger .warn ( # type: ignore
649+ "%s msg queue is full; discarding %s msg from %s" ,
650+ self .__class__ .__name__ , cmd , peer )
641651 return False
642652
643653 @contextlib .contextmanager
@@ -663,7 +673,7 @@ class MsgBuffer(PeerSubscriber):
663673 subscription_msg_types = {protocol .Command }
664674
665675 @to_tuple
666- def get_messages (self ) -> Iterator ['PEER_MSG_TYPE' ]:
676+ def get_messages (self ) -> Iterator [PeerMessage ]:
667677 while not self .msg_queue .empty ():
668678 yield self .msg_queue .get_nowait ()
669679
@@ -740,7 +750,7 @@ async def start_peer(self, peer: BasePeer) -> None:
740750
741751 def _add_peer (self ,
742752 peer : BasePeer ,
743- msgs : Tuple [Tuple [protocol .Command , protocol ._DecodedMsgType ], ...]) -> None :
753+ msgs : Tuple [Tuple [protocol .Command , protocol .PayloadType ], ...]) -> None :
744754 """Add the given peer to the pool.
745755
746756 Appart from adding it to our list of connected nodes and adding each of our subscriber's
@@ -753,7 +763,7 @@ def _add_peer(self,
753763 subscriber .register_peer (peer )
754764 peer .add_subscriber (subscriber )
755765 for cmd , msg in msgs :
756- subscriber .add_msg ((peer , cmd , msg ))
766+ subscriber .add_msg (PeerMessage (peer , cmd , msg ))
757767
758768 async def _run (self ) -> None :
759769 # FIXME: PeerPool should probably no longer be a BaseService, but for now we're keeping it
@@ -1006,9 +1016,6 @@ def __init__(self,
10061016 self .genesis_hash = genesis_hash
10071017
10081018
1009- PEER_MSG_TYPE = Tuple [BasePeer , protocol .Command , protocol ._DecodedMsgType ]
1010-
1011-
10121019def _test () -> None :
10131020 """
10141021 Create a Peer instance connected to a local geth instance and log messages exchanged with it.
@@ -1066,8 +1073,8 @@ async def request_stuff() -> None:
10661073 hashes = tuple (header .hash for header in headers )
10671074 if peer_class == ETHPeer :
10681075 peer = cast (ETHPeer , peer )
1069- peer .sub_proto ._send_get_block_bodies (hashes )
1070- peer .sub_proto ._send_get_receipts (hashes )
1076+ peer .sub_proto .send_get_block_bodies (hashes )
1077+ peer .sub_proto .send_get_receipts (hashes )
10711078 else :
10721079 peer = cast (LESPeer , peer )
10731080 request_id = 1
0 commit comments