44from typing import (
55 AsyncIterator ,
66 Iterator ,
7+ Optional ,
78 Tuple ,
89 Type ,
910)
1011
11- from cancel_token import CancelToken , OperationCancelled
12+ from cancel_token import (
13+ CancelToken ,
14+ OperationCancelled ,
15+ )
1216
1317from eth .constants import GENESIS_BLOCK_NUMBER
1418from eth .chains import AsyncChain
2327 encode_hex ,
2428 ValidationError ,
2529)
26- from eth .rlp .headers import BlockHeader
30+ from eth .rlp .headers import (
31+ BlockHeader ,
32+ )
33+
34+ from p2p .constants import (
35+ MAX_REORG_DEPTH ,
36+ SEAL_CHECK_RANDOM_SAMPLE_RATE ,
37+ )
38+ from p2p .p2p_proto import (
39+ DisconnectReason ,
40+ )
41+ from p2p .service import (
42+ BaseService ,
43+ )
2744
28- from p2p .constants import MAX_REORG_DEPTH , SEAL_CHECK_RANDOM_SAMPLE_RATE
29- from p2p .p2p_proto import DisconnectReason
30- from p2p .service import BaseService
45+ from trinity .db .header import (
46+ AsyncHeaderDB ,
47+ )
48+ from trinity .protocol .common .monitors import (
49+ BaseChainTipMonitor ,
50+ )
51+ from trinity .protocol .common .peer import (
52+ BaseChainPeer ,
53+ BaseChainPeerPool ,
54+ )
55+ from trinity .protocol .eth .peer import (
56+ ETHPeer ,
57+ )
58+ from trinity .sync .common .events import (
59+ SyncingRequest ,
60+ SyncingResponse ,
61+ )
62+ from trinity .utils .datastructures import (
63+ TaskQueue ,
64+ )
3165
32- from trinity .db .header import AsyncHeaderDB
33- from trinity .protocol .common .monitors import BaseChainTipMonitor
34- from trinity .protocol .common .peer import BaseChainPeer , BaseChainPeerPool
35- from trinity .protocol .eth .peer import ETHPeer
36- from trinity .utils .datastructures import TaskQueue
66+ from .types import SyncProgress
3767
3868
3969class BaseHeaderChainSyncer (BaseService ):
@@ -81,6 +111,8 @@ def tip_monitor_class(self) -> Type[BaseChainTipMonitor]:
81111
82112 async def _run (self ) -> None :
83113 self .run_daemon (self ._tip_monitor )
114+ if self .peer_pool .event_bus is not None :
115+ self .run_daemon_task (self .handle_sync_status_requests ())
84116 try :
85117 async for highest_td_peer in self ._tip_monitor .wait_tip_info ():
86118 self .run_task (self .sync (highest_td_peer ))
@@ -134,6 +166,16 @@ async def sync(self, peer: BaseChainPeer) -> None:
134166 new_headers = tuple (h for h in header_batch if h not in self .header_queue )
135167 await self .wait (self .header_queue .add (new_headers ))
136168
169+ def get_sync_status (self ) -> Tuple [bool , Optional [SyncProgress ]]:
170+ if not self ._syncing :
171+ return False , None
172+ return True , self ._peer_header_syncer .sync_progress
173+
174+ async def handle_sync_status_requests (self ) -> None :
175+ async for req in self .peer_pool .event_bus .stream (SyncingRequest ):
176+ self .peer_pool .event_bus .broadcast (SyncingResponse (* self .get_sync_status ()),
177+ req .broadcast_config ())
178+
137179
138180class PeerHeaderSyncer (BaseService ):
139181 """
@@ -152,6 +194,7 @@ def __init__(self,
152194 super ().__init__ (token )
153195 self .chain = chain
154196 self .db = db
197+ self .sync_progress : SyncProgress = None
155198 self ._peer = peer
156199 self ._target_header_hash = peer .head_hash
157200
@@ -183,7 +226,7 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
183226 self .logger .debug (
184227 "%s announced Head TD %d, which is higher than ours (%d), starting sync" ,
185228 peer , peer .head_td , head_td )
186-
229+ self . sync_progress = SyncProgress ( head . block_number , head . block_number , peer . head_number )
187230 self .logger .info ("Starting sync with %s" , peer )
188231 last_received_header : BlockHeader = None
189232 # When we start the sync with a peer, we always request up to MAX_REORG_DEPTH extra
@@ -306,6 +349,9 @@ async def next_header_batch(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
306349
307350 yield headers
308351 last_received_header = headers [- 1 ]
352+ self .sync_progress = self .sync_progress .update_current_block (
353+ last_received_header .block_number ,
354+ )
309355 start_at = last_received_header .block_number + 1
310356
311357 async def _request_headers (
0 commit comments