11import asyncio
2+ from functools import (
3+ partial ,
4+ )
25from typing import (
36 Any ,
47 Callable ,
2427)
2528
2629from trie import HexaryTrie
30+ from trie .exceptions import BadTrieProof
2731
2832from evm .exceptions import (
2933 BlockNotFound ,
3539
3640from p2p .exceptions import (
3741 BadLESResponse ,
42+ NoConnectedPeers ,
43+ NoEligiblePeers ,
3844)
3945from p2p .cancel_token import CancelToken
4046from p2p import protocol
41- from p2p .constants import REPLY_TIMEOUT
47+ from p2p .constants import (
48+ COMPLETION_TIMEOUT ,
49+ MAX_REORG_DEPTH ,
50+ MAX_REQUEST_ATTEMPTS ,
51+ REPLY_TIMEOUT ,
52+ )
53+ from p2p .p2p_proto import (
54+ DisconnectReason ,
55+ )
4256from p2p .peer import (
4357 LESPeer ,
4458 PeerPool ,
4761from p2p .rlp import BlockBody
4862from p2p .service import (
4963 BaseService ,
64+ service_timeout ,
5065)
5166from p2p .utils import gen_request_id
5267
@@ -103,11 +118,22 @@ def callback(r: protocol._DecodedMsgType) -> None:
103118 return cast (Dict [str , Any ], reply )
104119
105120 @alru_cache (maxsize = 1024 , cache_exceptions = False )
121+ @service_timeout (COMPLETION_TIMEOUT )
106122 async def get_block_header_by_hash (self , block_hash : Hash32 ) -> BlockHeader :
107- peer = cast (LESPeer , self .peer_pool .highest_td_peer )
108- return await self ._get_block_header_by_hash (peer , block_hash )
123+ """
124+ :param block_hash: hash of the header to retrieve
125+
126+ :return: header returned by peer
127+
128+ :raise NoEligiblePeers: if no peers are available to fulfill the request
129+ :raise TimeoutError: if an individual request or the overall process times out
130+ """
131+ return await self ._retry_on_bad_response (
132+ partial (self ._get_block_header_by_hash , block_hash )
133+ )
109134
110135 @alru_cache (maxsize = 1024 , cache_exceptions = False )
136+ @service_timeout (COMPLETION_TIMEOUT )
111137 async def get_block_body_by_hash (self , block_hash : Hash32 ) -> BlockBody :
112138 peer = cast (LESPeer , self .peer_pool .highest_td_peer )
113139 self .logger .debug ("Fetching block %s from %s" , encode_hex (block_hash ), peer )
@@ -121,6 +147,7 @@ async def get_block_body_by_hash(self, block_hash: Hash32) -> BlockBody:
121147 # TODO add a get_receipts() method to BaseChain API, and dispatch to this, as needed
122148
123149 @alru_cache (maxsize = 1024 , cache_exceptions = False )
150+ @service_timeout (COMPLETION_TIMEOUT )
124151 async def get_receipts (self , block_hash : Hash32 ) -> List [Receipt ]:
125152 peer = cast (LESPeer , self .peer_pool .highest_td_peer )
126153 self .logger .debug ("Fetching %s receipts from %s" , encode_hex (block_hash ), peer )
@@ -135,25 +162,158 @@ async def get_receipts(self, block_hash: Hash32) -> List[Receipt]:
135162 # request accounts and code (and storage?)
136163
137164 @alru_cache (maxsize = 1024 , cache_exceptions = False )
165+ @service_timeout (COMPLETION_TIMEOUT )
138166 async def get_account (self , block_hash : Hash32 , address : Address ) -> Account :
139- peer = cast (LESPeer , self .peer_pool .highest_td_peer )
167+ return await self ._retry_on_bad_response (
168+ partial (self ._get_account_from_peer , block_hash , address )
169+ )
170+
171+ async def _get_account_from_peer (
172+ self ,
173+ block_hash : Hash32 ,
174+ address : Address ,
175+ peer : LESPeer ) -> Account :
140176 key = keccak (address )
141177 proof = await self ._get_proof (peer , block_hash , account_key = b'' , key = key )
142- header = await self ._get_block_header_by_hash (peer , block_hash )
143- rlp_account = HexaryTrie .get_from_proof (header .state_root , key , proof )
178+ header = await self ._get_block_header_by_hash (block_hash , peer )
179+ try :
180+ rlp_account = HexaryTrie .get_from_proof (header .state_root , key , proof )
181+ except BadTrieProof as exc :
182+ raise BadLESResponse ("Peer %s returned an invalid proof for account %s at block %s" % (
183+ peer ,
184+ encode_hex (address ),
185+ encode_hex (block_hash ),
186+ )) from exc
144187 return rlp .decode (rlp_account , sedes = Account )
145188
146189 @alru_cache (maxsize = 1024 , cache_exceptions = False )
147- async def get_contract_code (self , block_hash : Hash32 , key : bytes ) -> bytes :
148- peer = cast (LESPeer , self .peer_pool .highest_td_peer )
190+ @service_timeout (COMPLETION_TIMEOUT )
191+ async def get_contract_code (self , block_hash : Hash32 , address : Address ) -> bytes :
192+ """
193+ :param block_hash: find code as of the block with block_hash
194+ :param address: which contract to look up
195+
196+ :return: bytecode of the contract, ``b''`` if no code is set
197+
198+ :raise NoEligiblePeers: if no peers are available to fulfill the request
199+ :raise TimeoutError: if an individual request or the overall process times out
200+ """
201+ # get account for later verification, and
202+ # to confirm that our highest total difficulty peer has the info
203+ try :
204+ account = await self .get_account (block_hash , address )
205+ except HeaderNotFound as exc :
206+ raise NoEligiblePeers ("Our best peer does not have header %s" % block_hash ) from exc
207+
208+ code_hash = account .code_hash
209+
210+ return await self ._retry_on_bad_response (
211+ partial (self ._get_contract_code_from_peer , block_hash , address , code_hash )
212+ )
213+
214+ async def _get_contract_code_from_peer (
215+ self ,
216+ block_hash : Hash32 ,
217+ address : Address ,
218+ code_hash : Hash32 ,
219+ peer : LESPeer ) -> bytes :
220+ """
221+ A single attempt to get the contract code from the given peer
222+
223+ :raise BadLESResponse: if the peer replies with contract code that does not match the
224+ account's code hash
225+ """
226+ # request contract code
149227 request_id = gen_request_id ()
150- peer .sub_proto .send_get_contract_code (block_hash , key , request_id )
228+ peer .sub_proto .send_get_contract_code (block_hash , keccak ( address ) , request_id )
151229 reply = await self ._wait_for_reply (request_id )
230+
152231 if not reply ['codes' ]:
153- return b''
154- return reply ['codes' ][0 ]
232+ bytecode = b''
233+ else :
234+ bytecode = reply ['codes' ][0 ]
235+
236+ # validate bytecode against a proven account
237+ if code_hash == keccak (bytecode ):
238+ return bytecode
239+ elif bytecode == b'' :
240+ await self ._raise_for_empty_code (block_hash , address , code_hash , peer )
241+ # The following is added for mypy linting:
242+ raise RuntimeError ("Unreachable, _raise_for_empty_code must raise its own exception" )
243+ else :
244+ # a bad-acting peer sent an invalid non-empty bytecode
245+ raise BadLESResponse ("Peer %s sent code %s that did not match hash %s in account %s" % (
246+ peer ,
247+ encode_hex (bytecode ),
248+ encode_hex (code_hash ),
249+ encode_hex (address ),
250+ ))
251+
252+ async def _raise_for_empty_code (
253+ self ,
254+ block_hash : Hash32 ,
255+ address : Address ,
256+ code_hash : Hash32 ,
257+ peer : LESPeer ) -> None :
258+ """
259+ A peer might return b'' if it doesn't have the block at the requested header,
260+ or it might maliciously return b'' when the code is non-empty. This method tries to tell the
261+ difference.
262+
263+ This method MUST raise an exception, it's trying to determine the appropriate one.
264+
265+ :raise BadLESResponse: if peer seems to be maliciously responding with invalid empty code
266+ :raise NoEligiblePeers: if peer might simply not have the code available
267+ """
268+ try :
269+ header = await self ._get_block_header_by_hash (block_hash , peer )
270+ except HeaderNotFound :
271+ # We presume that the current peer is the best peer. Because
272+ # our best peer doesn't have the header we want, there are no eligible peers.
273+ raise NoEligiblePeers ("Our best peer does not have the header %s" % block_hash )
155274
156- async def _get_block_header_by_hash (self , peer : LESPeer , block_hash : Hash32 ) -> BlockHeader :
275+ head_number = peer .head_info .block_number
276+ if head_number - header .block_number > MAX_REORG_DEPTH :
277+ # The peer claims to be far ahead of the header we requested
278+ if self .headerdb .get_canonical_block_hash (header .block_number ) == block_hash :
279+ # Our node believes that the header at the reference hash is canonical,
280+ # so treat the peer as malicious
281+ raise BadLESResponse (
282+ "Peer %s sent empty code that did not match hash %s in account %s" % (
283+ peer ,
284+ encode_hex (code_hash ),
285+ encode_hex (address ),
286+ )
287+ )
288+ else :
289+ # our header isn't canonical, so treat the empty response as missing data
290+ raise NoEligiblePeers (
291+ "Our best peer does not have the non-canonical header %s" % block_hash
292+ )
293+ elif head_number - header .block_number < 0 :
294+ # The peer claims to be behind the header we requested, but somehow served it to us.
295+ # Odd, it might be a race condition. Treat as if there are no eligible peers for now.
296+ raise NoEligiblePeers ("Our best peer's head does include header %s" % block_hash )
297+ else :
298+ # The peer is ahead of the current block header, but only by a bit. It might be on
299+ # an uncle, or we might be. So we can't tell the difference between missing and
300+ # malicious. We don't want to aggressively drop this peer, so treat the code as missing.
301+ raise NoEligiblePeers (
302+ "Peer %s claims to be ahead of %s, but returned empty code with hash %s. "
303+ "It is on number %d, maybe an uncle. Retry with an older block hash." % (
304+ peer ,
305+ header ,
306+ code_hash ,
307+ head_number ,
308+ )
309+ )
310+
311+ async def _get_block_header_by_hash (self , block_hash : Hash32 , peer : LESPeer ) -> BlockHeader :
312+ """
313+ A single attempt to get the block header from the given peer.
314+
315+ :raise BadLESResponse: if the peer replies with a header that has a different hash
316+ """
157317 self .logger .debug ("Fetching header %s from %s" , encode_hex (block_hash ), peer )
158318 request_id = gen_request_id ()
159319 max_headers = 1
@@ -178,3 +338,27 @@ async def _get_proof(self,
178338 peer .sub_proto .send_get_proof (block_hash , account_key , key , from_level , request_id )
179339 reply = await self ._wait_for_reply (request_id )
180340 return reply ['proof' ]
341+
342+ async def _retry_on_bad_response (self , make_request_to_peer : Callable [[LESPeer ], Any ]) -> Any :
343+ """
344+ Make a call to a peer. If it behaves badly, drop it and retry with a different peer.
345+
346+ :param make_request_to_peer: an abstract call to a peer that may raise a BadLESResponse
347+
348+ :raise NoEligiblePeers: if no peers are available to fulfill the request
349+ :raise TimeoutError: if an individual request or the overall process times out
350+ """
351+ for _ in range (MAX_REQUEST_ATTEMPTS ):
352+ try :
353+ peer = cast (LESPeer , self .peer_pool .highest_td_peer )
354+ except NoConnectedPeers as exc :
355+ raise NoEligiblePeers () from exc
356+
357+ try :
358+ return await make_request_to_peer (peer )
359+ except BadLESResponse as exc :
360+ self .logger .warn ("Disconnecting from peer, because: %s" , exc )
361+ await peer .disconnect (DisconnectReason .subprotocol_error )
362+ # reattempt after removing this peer from our pool
363+
364+ raise TimeoutError ("Could not complete peer request in %d attempts" % MAX_REQUEST_ATTEMPTS )
0 commit comments