|
1 | | -from abc import abstractmethod |
2 | | -from contextlib import contextmanager |
3 | | -from operator import attrgetter |
4 | 1 | from typing import ( |
5 | 2 | AsyncIterator, |
6 | | - Iterator, |
7 | | - Optional, |
8 | 3 | Tuple, |
9 | | - Type, |
10 | 4 | ) |
11 | 5 |
|
12 | 6 | from cancel_token import ( |
|
43 | 37 |
|
44 | 38 | from trinity.chains.base import BaseAsyncChain |
45 | 39 | from trinity.db.header import AsyncHeaderDB |
46 | | -from trinity.protocol.common.monitors import BaseChainTipMonitor |
47 | | -from trinity.protocol.common.peer import BaseChainPeer, BaseChainPeerPool |
48 | | -from trinity.protocol.eth.peer import ETHPeer |
49 | | -from trinity.sync.common.events import SyncingRequest, SyncingResponse |
50 | | -from trinity.utils.datastructures import TaskQueue |
| 40 | +from trinity.protocol.common.peer import ( |
| 41 | + BaseChainPeer, |
| 42 | +) |
51 | 43 |
|
52 | 44 | from .types import SyncProgress |
53 | 45 |
|
54 | 46 |
|
class BaseHeaderChainSyncer(BaseService):
    """
    Sync with the Ethereum network by fetching/storing block headers.

    Here, the run() method will execute the sync loop until our local head is the same as the one
    with the highest TD announced by any of our peers.
    """
    # We'll only sync if we are connected to at least min_peers_to_sync.
    min_peers_to_sync = 1
    # Headers fetched from the current syncing peer, buffered for downstream
    # consumers; sized in __init__ (see max_pending_headers comment there).
    header_queue: TaskQueue[BlockHeader]

    def __init__(self,
                 chain: BaseAsyncChain,
                 db: AsyncHeaderDB,
                 peer_pool: BaseChainPeerPool,
                 token: CancelToken = None) -> None:
        super().__init__(token)
        self.chain = chain
        self.db = db
        self.peer_pool = peer_pool
        # Non-None only while a header sync with a single peer is in progress
        # (see _get_peer_header_syncer / _syncing).
        self._peer_header_syncer: 'PeerHeaderSyncer' = None
        # Target hash recorded when the last per-peer sync ended, so
        # get_target_header_hash() still answers between syncs.
        self._last_target_header_hash: Hash32 = None
        self._tip_monitor = self.tip_monitor_class(peer_pool, token=self.cancel_token)

        # pending queue size should be big enough to avoid starving the processing consumers, but
        # small enough to avoid wasteful over-requests before post-processing can happen
        max_pending_headers = ETHPeer.max_headers_fetch * 8
        self.header_queue = TaskQueue(max_pending_headers, attrgetter('block_number'))

    def get_target_header_hash(self) -> Hash32:
        """
        Return the header hash the current (or most recent) sync is targeting.

        Raises ``ValidationError`` if no sync has ever started, so there is no
        target to report.
        """
        if self._peer_header_syncer is None and self._last_target_header_hash is None:
            raise ValidationError("Cannot check the target hash before a sync has run")
        elif self._peer_header_syncer is not None:
            # A sync is active: ask the live per-peer syncer directly.
            return self._peer_header_syncer.get_target_header_hash()
        else:
            # No active sync: fall back to the target of the last completed one.
            return self._last_target_header_hash

    @property
    @abstractmethod
    def tip_monitor_class(self) -> Type[BaseChainTipMonitor]:
        """Concrete subclasses supply the tip-monitor type to instantiate."""
        pass

    async def _run(self) -> None:
        """
        Main service loop: watch the tip monitor and launch a sync task for
        each peer it reports as having the highest TD.
        """
        self.run_daemon(self._tip_monitor)
        # The event bus may be absent (e.g. in tests); only serve status
        # requests when it is available.
        if self.peer_pool.event_bus is not None:
            self.run_daemon_task(self.handle_sync_status_requests())
        try:
            async for highest_td_peer in self._tip_monitor.wait_tip_info():
                self.run_task(self.sync(highest_td_peer))
        except OperationCancelled:
            # In the case of a fast sync, we return once the sync is completed, and our
            # caller must then run the StateDownloader.
            return
        else:
            self.logger.debug("chain tip monitor stopped returning tip info to %s", self)

    @property
    def _syncing(self) -> bool:
        # True exactly while a per-peer header sync is in progress.
        return self._peer_header_syncer is not None

    @contextmanager
    def _get_peer_header_syncer(self, peer: BaseChainPeer) -> Iterator['PeerHeaderSyncer']:
        """
        Run a ``PeerHeaderSyncer`` for *peer* for the duration of the block.

        Enforces that only one peer is synced from at a time; on exit, records
        the sync's target header hash and clears the active-syncer slot.
        """
        if self._syncing:
            raise ValidationError("Cannot sync headers from two peers at the same time")

        self._peer_header_syncer = PeerHeaderSyncer(
            self.chain,
            self.db,
            peer,
            self.cancel_token,
        )
        self.run_child_service(self._peer_header_syncer)
        try:
            yield self._peer_header_syncer
        except OperationCancelled:
            # The syncer was cancelled from outside; nothing more to stop.
            pass
        else:
            # Normal completion: shut the child service down ourselves.
            self._peer_header_syncer.cancel_nowait()
        finally:
            self.logger.info("Header Sync with %s ended", peer)
            # Preserve the target hash before dropping the syncer, so
            # get_target_header_hash() keeps working between syncs.
            self._last_target_header_hash = self._peer_header_syncer.get_target_header_hash()
            self._peer_header_syncer = None

    async def sync(self, peer: BaseChainPeer) -> None:
        """
        Sync headers from *peer*, feeding each fetched batch into
        ``header_queue`` for downstream processing.

        A no-op if a sync is already running or too few peers are connected.
        """
        if self._syncing:
            self.logger.debug(
                "Got a NewBlock or a new peer, but already syncing so doing nothing")
            return
        elif len(self.peer_pool) < self.min_peers_to_sync:
            self.logger.info(
                "Connected to less peers (%d) than the minimum (%d) required to sync, "
                "doing nothing", len(self.peer_pool), self.min_peers_to_sync)
            return

        with self._get_peer_header_syncer(peer) as syncer:
            async for header_batch in syncer.next_header_batch():
                # Skip headers already queued (e.g. re-announced by the peer).
                new_headers = tuple(h for h in header_batch if h not in self.header_queue)
                await self.wait(self.header_queue.add(new_headers))

    def get_sync_status(self) -> Tuple[bool, Optional[SyncProgress]]:
        """Return ``(is_syncing, progress)``; progress is None when idle."""
        if not self._syncing:
            return False, None
        return True, self._peer_header_syncer.sync_progress

    async def handle_sync_status_requests(self) -> None:
        """Answer ``SyncingRequest`` events on the bus with our sync status."""
        async for req in self.peer_pool.event_bus.stream(SyncingRequest):
            self.peer_pool.event_bus.broadcast(SyncingResponse(*self.get_sync_status()),
                                               req.broadcast_config())
165 | | - |
166 | 47 | class PeerHeaderSyncer(BaseService): |
167 | 48 | """ |
168 | 49 | Sync as many headers as possible with a given peer. |
|
0 commit comments