Skip to content

Commit 6615373

Browse files
authored
Beacon sync maint update (#3520)
* Move thread switch nano-sleep into `importBlock()` function why That way it becomes easier to capture internal states for trace/replay. Previously, the nano-sleep thread switch was stated after `importBlock()` as a separate directive. So there is no functional change for the production environment. * Move async idle times from `runDaemon()` and `runPeer()` to scheduler why The sleep function extends the minimum idle wait imposed by the scheduler if there is nothing to do. By moving it there, the `runPeer()` directive becomes more environment capture friendly for trace/replace. The functionality of the production system is not affected. * Dissolve generic async functions as templates why This change is inspired by trace/replay considerations. For the production system, this change reduces the number of *floating* async closures which are unnecessary and reduces the `chronos` overhead. These *floating* async closures appear when writing nested functions (rather than a big one) where some leaf functions call async closures as it is common with network communication. * Move out most metrics gauge handling to dedicated source files why Reduces latency when updated right with the state/counter changes. todo Move some gauges to `FC` module
1 parent 35b4db5 commit 6615373

18 files changed

+924
-814
lines changed

execution_chain/core/chain/header_chain_cache.nim

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,8 @@
6060
{.push raises:[].}
6161

6262
import
63+
pkg/[chronicles, metrics, stew/endians2],
6364
pkg/eth/[common, rlp],
64-
pkg/results,
65-
pkg/chronicles,
66-
pkg/stew/endians2,
6765
"../.."/[common, db/core_db, db/storage_types],
6866
../../db/[kvt, kvt_cf],
6967
../../db/kvt/[kvt_utils, kvt_tx_frame],
@@ -73,6 +71,12 @@ import
7371
logScope:
7472
topics = "hc-cache"
7573

74+
declareGauge nec_sync_dangling, "" &
75+
"Least block number for header chain already fetched"
76+
77+
declareGauge nec_sync_consensus_head, "" &
78+
"Block number of latest consensus head"
79+
7680
type
7781
HccDbInfo = object
7882
## For database table storage and clean up
@@ -318,13 +322,15 @@ proc headUpdateFromCL(hc: HeaderChainRef; h: Header; f: Hash32) =
318322
headHash: h.computeBlockHash())
319323

320324
hc.kvt.putHeader(h)
325+
metrics.set(nec_sync_dangling, h.number.int64)
321326

322327
# Inform client app about that a new session has started.
323328
hc.notify()
324329
hc.chain.pendingFCU = f
325330

326331
# For logging and metrics
327332
hc.session.consHeadNum = h.number
333+
metrics.set(nec_sync_consensus_head, h.number.int64)
328334

329335
# ------------------------------------------------------------------------------
330336
# Public constructor/destructor et al.
@@ -349,9 +355,10 @@ proc clear*(hc: HeaderChainRef) =
349355
## accepted session (via `accept()`) or a mere notified session (via `notify`
350356
## call back argument from `start()`.)
351357
##
352-
hc.session.reset # clear session state object
353-
hc.persistClear() # clear database
354-
358+
hc.session.reset # clear session state object
359+
hc.persistClear() # clear database
360+
metrics.set(nec_sync_consensus_head, 0) # clear metrics register
361+
metrics.set(nec_sync_dangling, 0) # ditto
355362

356363
proc stop*(hc: HeaderChainRef) =
357364
## Stop updating the client cache. Will automatically be called by the
@@ -543,6 +550,7 @@ proc put*(
543550

544551
# Set new antecedent `ante` and save to disk (if any)
545552
hc.session.ante = rev[revTopInx]
553+
metrics.set(nec_sync_dangling, hc.session.ante.number.int64)
546554

547555
# Save updates. persist to DB
548556
hc.persistInfo()
@@ -582,6 +590,7 @@ proc commit*(hc: HeaderChainRef): Result[void,string] =
582590
if hc.chain.hashToBlock.hasKey(hc.chain.pendingFCU):
583591
hc.session.ante = fin
584592
hc.session.mode = locked # update internal state
593+
metrics.set(nec_sync_dangling, fin.number.int64)
585594
return ok()
586595

587596
# Impossible situation!

execution_chain/sync/beacon.nim

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ proc runSetup(ctx: BeaconCtxRef): bool =
3535
proc runRelease(ctx: BeaconCtxRef) =
3636
worker.release(ctx, "RunRelease")
3737

38-
proc runDaemon(ctx: BeaconCtxRef) {.async: (raises: []).} =
39-
await worker.runDaemon(ctx, "RunDaemon")
38+
proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
39+
return worker.runDaemon(ctx, "RunDaemon")
4040

4141
proc runTicker(ctx: BeaconCtxRef) =
4242
worker.runTicker(ctx, "RunTicker")
@@ -50,8 +50,8 @@ proc runStop(buddy: BeaconBuddyRef) =
5050
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
5151
worker.runPool(buddy, last, laps, "RunPool")
5252

53-
proc runPeer(buddy: BeaconBuddyRef) {.async: (raises: []).} =
54-
await worker.runPeer(buddy, "RunPeer")
53+
proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
54+
return worker.runPeer(buddy, "RunPeer")
5555

5656
# ------------------------------------------------------------------------------
5757
# Public functions

execution_chain/sync/beacon/worker.nim

Lines changed: 78 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,13 @@ import
2424
# Private functions
2525
# ------------------------------------------------------------------------------
2626

27-
proc napUnlessSomethingToCollect(
28-
buddy: BeaconBuddyRef;
29-
): Future[bool] {.async: (raises: []).} =
30-
## When idle, save cpu cycles waiting for something to do.
31-
if buddy.ctx.hibernate or # not activated yet?
32-
not (buddy.headersCollectOk() or # something on TODO list
33-
buddy.blocksCollectOk()):
34-
try:
35-
await sleepAsync workerIdleWaitInterval
36-
except CancelledError:
37-
buddy.ctrl.stopped = true
38-
return true
39-
else:
40-
# Returning `false` => no need to check for shutdown
27+
proc somethingToCollect(buddy: BeaconBuddyRef): bool =
28+
if buddy.ctx.hibernate: # not activated yet?
4129
return false
30+
if buddy.headersCollectOk() or # something on TODO list
31+
buddy.blocksCollectOk():
32+
return true
33+
false
4234

4335
# ------------------------------------------------------------------------------
4436
# Public start/stop and admin functions
@@ -107,35 +99,39 @@ proc runTicker*(ctx: BeaconCtxRef; info: static[string]) =
10799
ctx.updateTicker()
108100

109101

110-
proc runDaemon*(
111-
ctx: BeaconCtxRef;
112-
info: static[string];
113-
) {.async: (raises: []).} =
102+
template runDaemon*(ctx: BeaconCtxRef; info: static[string]): Duration =
103+
## Async/template
104+
##
114105
## Global background job that will be re-started as long as the variable
115106
## `ctx.daemon` is set `true` which corresponds to `ctx.hibernating` set
116107
## to false.
117108
##
118109
## On a fresh start, the flag `ctx.daemon` will not be set `true` before the
119110
## first usable request from the CL (via RPC) stumbles in.
120111
##
121-
# Check for a possible header layout and body request changes
122-
ctx.updateSyncState info
123-
if ctx.hibernate:
124-
return
112+
## The template returns a suggested idle time for after this task.
113+
##
114+
var bodyRc = chronos.nanoseconds(0)
115+
block body:
116+
# Check for a possible header layout and body request changes
117+
ctx.updateSyncState info
118+
if ctx.hibernate:
119+
break body # return
120+
121+
# Execute staged block records.
122+
if ctx.blocksUnstageOk():
125123

126-
# Execute staged block records.
127-
if ctx.blocksUnstageOk():
124+
# Import bodies from the `staged` queue.
125+
discard ctx.blocksUnstage info # async/template
128126

129-
# Import bodies from the `staged` queue.
130-
discard await ctx.blocksUnstage info
127+
if not ctx.daemon or # Implied by external sync shutdown?
128+
ctx.poolMode: # Oops, re-org needed?
129+
break body # return
131130

132-
if not ctx.daemon or # Implied by external sync shutdown?
133-
ctx.poolMode: # Oops, re-org needed?
134-
return
131+
# # At the end of the cycle, leave time to trigger refill headers/blocks
132+
bodyRc = daemonWaitInterval
135133

136-
# At the end of the cycle, leave time to trigger refill headers/blocks
137-
try: await sleepAsync daemonWaitInterval
138-
except CancelledError: discard
134+
bodyRc
139135

140136

141137
proc runPool*(
@@ -163,47 +159,59 @@ proc runPool*(
163159
true # stop
164160

165161

166-
proc runPeer*(
167-
buddy: BeaconBuddyRef;
168-
info: static[string];
169-
) {.async: (raises: []).} =
162+
template runPeer*(buddy: BeaconBuddyRef; info: static[string]): Duration =
163+
## Async/template
164+
##
170165
## This peer worker method is repeatedly invoked (exactly one per peer) while
171166
## the `buddy.ctrl.poolMode` flag is set `false`.
172167
##
173-
if not await buddy.napUnlessSomethingToCollect():
174-
175-
# Download and process headers and blocks
176-
while buddy.headersCollectOk():
177-
178-
# Collect headers and either stash them on the header chain cache
179-
# directly, or stage on the header queue to get them serialised and
180-
# stashed, later.
181-
await buddy.headersCollect info
182-
183-
# Store serialised headers from the `staged` queue onto the header
184-
# chain cache.
185-
if not buddy.headersUnstage info:
186-
# Need to proceed with another peer (e.g. gap between queue and
187-
# header chain cache.)
188-
break
189-
190-
# End `while()`
191-
192-
# Fetch bodies and combine them with headers to blocks to be staged. These
193-
# staged blocks are then excuted by the daemon process (no `peer` needed.)
194-
while buddy.blocksCollectOk():
195-
196-
# Collect bodies and either import them via `FC` module, or stage on
197-
# the blocks queue to get them serialised and imported, later.
198-
await buddy.blocksCollect info
199-
200-
# Import bodies from the `staged` queue.
201-
if not await buddy.blocksUnstage info:
202-
# Need to proceed with another peer (e.g. gap between top imported
203-
# block and blocks queue.)
204-
break
205-
206-
# End `while()`
168+
## The template returns a suggested idle time for after this task.
169+
##
170+
var bodyRc = chronos.nanoseconds(0)
171+
block body:
172+
if buddy.somethingToCollect():
173+
174+
# Download and process headers and blocks
175+
while buddy.headersCollectOk():
176+
177+
# Collect headers and either stash them on the header chain cache
178+
# directly, or stage on the header queue to get them serialised and
179+
# stashed, later.
180+
buddy.headersCollect info # async/template
181+
182+
# Store serialised headers from the `staged` queue onto the header
183+
# chain cache.
184+
if not buddy.headersUnstage info:
185+
# Need to proceed with another peer (e.g. gap between queue and
186+
# header chain cache.)
187+
bodyRc = workerIdleWaitInterval
188+
break body
189+
190+
# End `while()`
191+
192+
# Fetch bodies and combine them with headers to blocks to be staged.
193+
# These staged blocks are then excuted by the daemon process (no `peer`
194+
# needed.)
195+
while buddy.blocksCollectOk():
196+
197+
# Collect bodies and either import them via `FC` module, or stage on
198+
# the blocks queue to get them serialised and imported, later.
199+
buddy.blocksCollect info # async/template
200+
201+
# Import bodies from the `staged` queue.
202+
if not buddy.blocksUnstage info: # async/template
203+
# Need to proceed with another peer (e.g. gap between top imported
204+
# block and blocks queue.)
205+
bodyRc = workerIdleWaitInterval
206+
break body
207+
208+
# End `while()`
209+
210+
# Idle sleep unless there is something to do
211+
if not buddy.somethingToCollect():
212+
bodyRc = workerIdleWaitInterval
213+
214+
bodyRc
207215

208216
# ------------------------------------------------------------------------------
209217
# End

0 commit comments

Comments
 (0)