33 PriorityQueue ,
44)
55from concurrent .futures import CancelledError
6+ import datetime
67import enum
78from functools import (
89 partial ,
1112from typing import (
1213 Dict ,
1314 List ,
15+ NamedTuple ,
1416 Set ,
1517 Tuple ,
1618 Type ,
6264 OrderedTaskPreparation ,
6365 TaskQueue ,
6466)
67+ from trinity .utils .ema import EMA
68+ from trinity .utils .humanize import humanize_elapsed
6569from trinity .utils .timer import Timer
6670
6771# (ReceiptBundle, (Receipt, (root_hash, receipt_trie_data))
@@ -338,6 +342,71 @@ class BlockPersistPrereqs(enum.Enum):
338342 StoreReceipts = enum .auto ()
339343
340344
345+ class ChainSyncStats (NamedTuple ):
346+ prev_head : BlockHeader
347+ latest_head : BlockHeader
348+
349+ elapsed : float
350+
351+ num_blocks : int
352+ blocks_per_second : float
353+
354+ num_transactions : int
355+ transactions_per_second : float
356+
357+
358+ class ChainSyncPerformanceTracker :
359+ def __init__ (self , head : BlockHeader ) -> None :
360+ # The `head` from the previous time we reported stats
361+ self .prev_head = head
362+ # The latest `head` we have synced
363+ self .latest_head = head
364+
365+ # A `Timer` object to report elapsed time between reports
366+ self .timer = Timer ()
367+
368+ # EMA of the blocks per second
369+ self .blocks_per_second_ema = EMA (initial_value = 0 , smoothing_factor = 0.05 )
370+
371+ # EMA of the transactions per second
372+ self .transactions_per_second_ema = EMA (initial_value = 0 , smoothing_factor = 0.05 )
373+
374+ # Number of transactions processed
375+ self .num_transactions = 0
376+
377+ def record_transactions (self , count : int ) -> None :
378+ self .num_transactions += count
379+
380+ def set_latest_head (self , head : BlockHeader ) -> None :
381+ self .latest_head = head
382+
383+ def report (self ) -> ChainSyncStats :
384+ elapsed = self .timer .pop_elapsed ()
385+
386+ num_blocks = self .latest_head .block_number - self .prev_head .block_number
387+ blocks_per_second = num_blocks / elapsed
388+ transactions_per_second = self .num_transactions / elapsed
389+
390+ self .blocks_per_second_ema .update (blocks_per_second )
391+ self .transactions_per_second_ema .update (transactions_per_second )
392+
393+ stats = ChainSyncStats (
394+ prev_head = self .prev_head ,
395+ latest_head = self .latest_head ,
396+ elapsed = elapsed ,
397+ num_blocks = num_blocks ,
398+ blocks_per_second = self .blocks_per_second_ema .value ,
399+ num_transactions = self .num_transactions ,
400+ transactions_per_second = self .transactions_per_second_ema .value ,
401+ )
402+
403+ # reset the counters
404+ self .num_transactions = 0
405+ self .prev_head = self .latest_head
406+
407+ return stats
408+
409+
341410class FastChainSyncer (BaseBodyChainSyncer ):
342411 """
343412 Sync with the Ethereum network by fetching block headers/bodies and storing them in our DB.
@@ -374,6 +443,8 @@ def __init__(self,
374443
375444 async def _run (self ) -> None :
376445 head = await self .wait (self .db .coro_get_canonical_head ())
446+ self .tracker = ChainSyncPerformanceTracker (head )
447+
377448 self ._block_persist_tracker .set_finished_dependency (head )
378449 self .run_daemon_task (self ._launch_prerequisite_tasks ())
379450 self .run_daemon_task (self ._assign_receipt_download_to_peers ())
@@ -445,9 +516,6 @@ async def _launch_prerequisite_tasks(self) -> None:
445516 self .header_queue .complete (batch_id , headers )
446517
447518 async def _display_stats (self ) -> None :
448- last_head = await self .wait (self .db .coro_get_canonical_head ())
449- timer = Timer ()
450-
451519 while self .is_operational :
452520 await self .sleep (5 )
453521 self .logger .debug (
@@ -459,16 +527,29 @@ async def _display_stats(self) -> None:
459527 )],
460528 )
461529
462- head = await self .wait (self .db .coro_get_canonical_head ())
463- if head == last_head :
464- continue
465- else :
466- block_num_change = head .block_number - last_head .block_number
467- last_head = head
468-
469- self .logger .info (
470- "Advanced by %d blocks in %0.1f seconds, new head: %s" ,
471- block_num_change , timer .pop_elapsed (), head )
530+ stats = self .tracker .report ()
531+ utcnow = int (datetime .datetime .utcnow ().timestamp ())
532+ head_age = utcnow - stats .latest_head .timestamp
533+ self .logger .info (
534+ (
535+ "blks=%-4d "
536+ "txs=%-5d "
537+ "bps=%-3d "
538+ "tps=%-4d "
539+ "elapsed=%0.1f "
540+ "head=#%d (%s\u2026 %s) "
541+ "age=%s"
542+ ),
543+ stats .num_blocks ,
544+ stats .num_transactions ,
545+ stats .blocks_per_second ,
546+ stats .transactions_per_second ,
547+ stats .elapsed ,
548+ stats .latest_head .block_number ,
549+ stats .latest_head .hex_hash [2 :6 ],
550+ stats .latest_head .hex_hash [- 4 :],
551+ humanize_elapsed (head_age ),
552+ )
472553
473554 async def _persist_ready_blocks (self ) -> None :
474555 """
@@ -514,8 +595,12 @@ async def _persist_blocks(self, headers: Tuple[BlockHeader, ...]) -> None:
514595 tx_class = block_class .get_transaction_class ()
515596 transactions = [tx_class .from_base_transaction (tx ) for tx in body .transactions ]
516597
598+ # record progress in the tracker
599+ self .tracker .record_transactions (len (transactions ))
600+
517601 block = block_class (header , transactions , uncles )
518602 await self .wait (self .db .coro_persist_block (block ))
603+ self .tracker .set_latest_head (header )
519604
520605 async def _assign_receipt_download_to_peers (self ) -> None :
521606 """
0 commit comments