Skip to content

Commit 929d5ba

Browse files
authored
Beacon sync maint update and code cosmetics (#3847)
* Make beacon sync `start()`/`stop()` more robust why Make it resilient against `stop()` without `start()`, etc. * Rename `Buddy` in object names to `Peer` or `SyncPeer` * Introduce beacon syncer stand-by mode why It is needed for holding back auto-magic activation. This allows to maintain some basic functionality but not doing anything. I will be useful with other sync protocols as `snap`. Still metrics are working. While `snap` is active, the beacon sync must hold back. It can resume when `snap` has terminated.
1 parent 774ad73 commit 929d5ba

21 files changed

+155
-119
lines changed

execution_chain/sync/beacon.nim

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
4242
proc runTicker(ctx: BeaconCtxRef) =
4343
worker.runTicker(ctx, "Ticker")
4444

45-
proc runStart(buddy: BeaconBuddyRef): bool =
45+
proc runStart(buddy: BeaconPeerRef): bool =
4646
worker.start(buddy, "Start")
4747

48-
proc runStop(buddy: BeaconBuddyRef) =
48+
proc runStop(buddy: BeaconPeerRef) =
4949
worker.stop(buddy, "Stop")
5050

51-
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
51+
proc runPool(buddy: BeaconPeerRef; last: bool; laps: int): bool =
5252
worker.runPool(buddy, last, laps, "SyncMode")
5353

54-
proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
54+
proc runPeer(buddy: BeaconPeerRef): Future[Duration] {.async: (raises: []).} =
5555
let rank = buddy.classifyForFetching()
5656
return worker.runPeer(buddy, rank, "Peer")
5757

@@ -75,6 +75,7 @@ proc config*(
7575
ethNode: EthereumNode;
7676
chain: ForkedChainRef;
7777
maxPeers: int;
78+
standByMode = false;
7879
) =
7980
## Complete `BeaconSyncRef` descriptor initialisation.
8081
##
@@ -84,6 +85,8 @@ proc config*(
8485
doAssert desc.ctx.isNil # This can only run once
8586
desc.initSync(ethNode, maxPeers)
8687
desc.ctx.pool.chain = chain
88+
if standByMode:
89+
desc.ctx.pool.syncState = SyncState.standByMode
8790

8891
if not desc.lazyConfigHook.isNil:
8992
desc.lazyConfigHook(desc)
@@ -99,13 +102,23 @@ proc configTarget*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =
99102
discard
100103
# false
101104

105+
106+
proc activate*(desc: BeaconSyncRef) =
107+
## Clear stand-by mode (if any)
108+
doAssert not desc.ctx.isNil
109+
if desc.ctx.pool.syncState == SyncState.standByMode:
110+
desc.ctx.pool.syncState = SyncState.idle
111+
102112
proc start*(desc: BeaconSyncRef): bool =
103113
doAssert not desc.ctx.isNil
104-
desc.startSync()
114+
if not desc.isRunning and desc.startSync():
115+
return true
116+
# false
105117

106118
proc stop*(desc: BeaconSyncRef) {.async.} =
107119
doAssert not desc.ctx.isNil
108-
await desc.stopSync()
120+
if desc.isRunning:
121+
await desc.stopSync()
109122

110123
# ------------------------------------------------------------------------------
111124
# End

execution_chain/sync/beacon/beacon_desc.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type
2121
BeaconSyncConfigHook* = proc(desc: BeaconSyncRef) {.gcsafe, raises: [].}
2222
## Conditional configuration request hook
2323

24-
BeaconSyncRef* = ref object of RunnerSyncRef[BeaconCtxData,BeaconBuddyData]
24+
BeaconSyncRef* = ref object of RunnerSyncRef[BeaconCtxData,BeaconPeerData]
2525
## Instance descriptor, extends scheduler object
2626
lazyConfigHook*: BeaconSyncConfigHook
2727

execution_chain/sync/beacon/worker.nim

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ proc release*(ctx: BeaconCtxRef; info: static[string]) =
3333
ctx.destroyServices()
3434

3535

36-
proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
36+
proc start*(buddy: BeaconPeerRef; info: static[string]): bool =
3737
## Initialise worker peer
3838
let
3939
peer = buddy.peer
@@ -43,20 +43,20 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
4343
if not ctx.hibernate: debug info & ": useless peer already tried", peer
4444
return false
4545

46-
if not buddy.startBuddy():
46+
if not buddy.startSyncPeer():
4747
if not ctx.hibernate: debug info & ": failed", peer
4848
return false
4949

5050
if not ctx.hibernate: debug info & ": new peer",
5151
peer, nSyncPeers=ctx.nSyncPeers()
5252
true
5353

54-
proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
54+
proc stop*(buddy: BeaconPeerRef; info: static[string]) =
5555
## Clean up this peer
5656
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
5757
thPut=buddy.only.thPutStats.toMeanVar.toStr,
5858
nSyncPeers=(buddy.ctx.nSyncPeers()-1), state=($buddy.syncState)
59-
buddy.stopBuddy()
59+
buddy.stopSyncPeer()
6060

6161
# ------------------------------------------------------------------------------
6262
# Public functions
@@ -105,7 +105,7 @@ template runDaemon*(ctx: BeaconCtxRef; info: static[string]): Duration =
105105

106106

107107
proc runPool*(
108-
buddy: BeaconBuddyRef;
108+
buddy: BeaconPeerRef;
109109
last: bool;
110110
laps: int;
111111
info: static[string];
@@ -130,7 +130,7 @@ proc runPool*(
130130

131131

132132
template runPeer*(
133-
buddy: BeaconBuddyRef;
133+
buddy: BeaconPeerRef;
134134
rank: PeerRanking;
135135
info: static[string];
136136
): Duration =
@@ -203,6 +203,10 @@ template runPeer*(
203203
bodyRc = workerIdleLongWaitInterval
204204
break body
205205

206+
elif buddy.ctx.pool.syncState == SyncState.standByMode:
207+
bodyRc = workerIdleLongWaitInterval
208+
break body
209+
206210
# Idle sleep unless there is something to do
207211
if not buddy.somethingToCollectOrUnstage():
208212
bodyRc = workerIdleWaitInterval

execution_chain/sync/beacon/worker/blocks.nim

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ proc toStrIfAvail(bn: BlockNumber; ctx: BeaconCtxRef): string =
3636
# Public functions
3737
# ------------------------------------------------------------------------------
3838

39-
func blocksCollectOk*(buddy: BeaconBuddyRef): bool =
39+
func blocksCollectOk*(buddy: BeaconPeerRef): bool =
4040
## Check whether body records can be fetched and imported or stored
4141
## on the `staged` queue.
4242
##
@@ -49,7 +49,7 @@ func blocksCollectOk*(buddy: BeaconBuddyRef): bool =
4949

5050

5151
template blocksCollect*(
52-
buddy: BeaconBuddyRef;
52+
buddy: BeaconPeerRef;
5353
info: static[string]) =
5454
## Async/template
5555
##
@@ -136,7 +136,7 @@ template blocksCollect*(
136136
if ctx.subState.topNum < lastBn:
137137
ctx.blocksUnprocAppend(ctx.subState.topNum + 1, lastBn)
138138

139-
# Buddy might have been cancelled while importing blocks.
139+
# Sync peer might have been cancelled while importing blocks.
140140
if buddy.ctrl.stopped or ctx.poolMode:
141141
break fetchBlocksBody # done, exit this block
142142

@@ -207,7 +207,7 @@ template blocksCollect*(
207207

208208
# --------------
209209

210-
proc blocksUnstageOk*(buddy: BeaconBuddyRef): bool =
210+
proc blocksUnstageOk*(buddy: BeaconPeerRef): bool =
211211
## Check whether import processing is possible
212212
##
213213
let ctx = buddy.ctx
@@ -216,7 +216,7 @@ proc blocksUnstageOk*(buddy: BeaconBuddyRef): bool =
216216

217217

218218
template blocksUnstage*(
219-
buddy: BeaconBuddyRef;
219+
buddy: BeaconPeerRef;
220220
info: static[string];
221221
): bool =
222222
## Async/template

execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func toStr(e: BeaconError): string =
4040
# ------------------------------------------------------------------------------
4141

4242
template blocksFetchCheckImpl(
43-
buddy: BeaconBuddyRef;
43+
buddy: BeaconPeerRef;
4444
iv: BnRange;
4545
info: static[string];
4646
): Opt[seq[EthBlock]] =
@@ -136,7 +136,7 @@ template blocksFetchCheckImpl(
136136
# ------------------------------------------------------------------------------
137137

138138
template blocksFetch*(
139-
buddy: BeaconBuddyRef;
139+
buddy: BeaconPeerRef;
140140
num: uint;
141141
info: static[string];
142142
): Opt[seq[EthBlock]] =
@@ -180,7 +180,7 @@ template blocksFetch*(
180180

181181

182182
template blocksImport*(
183-
buddy: BeaconBuddyRef;
183+
buddy: BeaconPeerRef;
184184
blocks: seq[EthBlock];
185185
peerID: Hash;
186186
info: static[string];

execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import
2323
# -----------------------------------------------------------------------------
2424

2525
proc maybeSlowPeerError(
26-
buddy: BeaconBuddyRef;
26+
buddy: BeaconPeerRef;
2727
elapsed: Duration;
2828
hash: Hash32;
2929
): bool =
@@ -32,7 +32,7 @@ proc maybeSlowPeerError(
3232
buddy.bdyFetchRegisterError(slowPeer=true)
3333

3434
# Do not repeat the same time-consuming failed request
35-
buddy.only.failedReq = BuddyFirstFetchReq(
35+
buddy.only.failedReq = BcPeerFirstFetchReq(
3636
state: SyncState.blocks,
3737
blockHash: hash)
3838

@@ -41,8 +41,8 @@ proc maybeSlowPeerError(
4141
# false
4242

4343

44-
proc getBlockBodies(
45-
buddy: BeaconBuddyRef;
44+
proc getBlockBodies*(
45+
buddy: BeaconPeerRef;
4646
req: BlockBodiesRequest;
4747
): Future[Result[FetchBodiesData,BeaconError]]
4848
{.async: (raises: []).} =
@@ -77,7 +77,7 @@ proc getBlockBodies(
7777
# ------------------------------------------------------------------------------
7878

7979
template fetchBodies*(
80-
buddy: BeaconBuddyRef;
80+
buddy: BeaconPeerRef;
8181
request: BlockBodiesRequest;
8282
info: static[string];
8383
): Opt[seq[BlockBody]] =

execution_chain/sync/beacon/worker/blocks/blocks_helpers.nim

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ import
1919
# Public functions
2020
# ------------------------------------------------------------------------------
2121

22-
func blkErrors*(buddy: BeaconBuddyRef): string =
22+
func blkErrors*(buddy: BeaconPeerRef): string =
2323
$buddy.nErrors.fetch.bdy & "/" & $buddy.nErrors.apply.blk
2424

2525
proc bdyFetchRegisterError*(
26-
buddy: BeaconBuddyRef;
26+
buddy: BeaconPeerRef;
2727
slowPeer = false;
2828
forceZombie = false) =
2929
buddy.nErrors.fetch.bdy.inc
@@ -45,20 +45,20 @@ func blkSessionStopped*(ctx: BeaconCtxRef): bool =
4545
ctx.poolMode or
4646
ctx.pool.syncState != SyncState.blocks
4747

48-
func blkThroughput*(buddy: BeaconBuddyRef): string =
48+
func blkThroughput*(buddy: BeaconPeerRef): string =
4949
## Print throuhput sratistics
5050
buddy.only.thPutStats.blk.toMeanVar.toStr
5151

5252
# -------------
5353

5454
proc blkNoSampleSize*(
55-
buddy: BeaconBuddyRef;
55+
buddy: BeaconPeerRef;
5656
elapsed: chronos.Duration;
5757
) =
5858
discard buddy.only.thPutStats.blk.bpsSample(elapsed, 0)
5959

6060
proc blkSampleSize*(
61-
buddy: BeaconBuddyRef;
61+
buddy: BeaconPeerRef;
6262
elapsed: chronos.Duration;
6363
size: int;
6464
): uint =

execution_chain/sync/beacon/worker/blocks/blocks_import.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import
2121
# ------------------------------------------------------------------------------
2222

2323
proc importBlock*(
24-
buddy: BeaconBuddyRef;
24+
buddy: BeaconPeerRef;
2525
blk: EthBlock;
2626
effPeerID: Hash;
2727
): Future[Result[Duration,BeaconError]]

execution_chain/sync/beacon/worker/classify.nim

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import
1717
# Public functions
1818
# ------------------------------------------------------------------------------
1919

20-
func somethingToCollectOrUnstage*(buddy: BeaconBuddyRef): bool =
20+
func somethingToCollectOrUnstage*(buddy: BeaconPeerRef): bool =
2121
if buddy.ctx.hibernate: # not activated yet?
2222
return false
2323
if buddy.headersCollectOk() or # something on TODO list
@@ -28,7 +28,7 @@ func somethingToCollectOrUnstage*(buddy: BeaconBuddyRef): bool =
2828
false
2929

3030

31-
func classifyForFetching*(buddy: BeaconBuddyRef): PeerRanking =
31+
func classifyForFetching*(buddy: BeaconPeerRef): PeerRanking =
3232
## Rank and classify peers by whether they should be used for fetching
3333
## data.
3434
##
@@ -46,7 +46,7 @@ func classifyForFetching*(buddy: BeaconBuddyRef): PeerRanking =
4646
if buddy.ctx.nSyncPeers() <= headersStagedQueueLengthMax + 1:
4747
return (qSlotsAvail, -1)
4848

49-
template hdr(b: BeaconBuddyRef): StatsCollect =
49+
template hdr(b: BeaconPeerRef): StatsCollect =
5050
b.only.thPutStats.hdr
5151

5252
# Are there throughput data available for this peer (aka buddy), at all?
@@ -76,7 +76,7 @@ func classifyForFetching*(buddy: BeaconBuddyRef): PeerRanking =
7676
if buddy.ctx.nSyncPeers() <= blocksStagedQueueLengthMax + 1:
7777
return (qSlotsAvail, -1)
7878

79-
template blk(b: BeaconBuddyRef): StatsCollect =
79+
template blk(b: BeaconPeerRef): StatsCollect =
8080
b.only.thPutStats.blk
8181

8282
if buddy.blk.samples == 0:

execution_chain/sync/beacon/worker/headers.nim

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ proc nUnprocStr(ctx: BeaconCtxRef): string =
3636
# Public functions
3737
# ------------------------------------------------------------------------------
3838

39-
func headersCollectOk*(buddy: BeaconBuddyRef): bool =
39+
func headersCollectOk*(buddy: BeaconPeerRef): bool =
4040
## Helper for `worker.nim`, etc.
4141
if buddy.ctrl.running:
4242
let ctx = buddy.ctx
@@ -46,7 +46,7 @@ func headersCollectOk*(buddy: BeaconBuddyRef): bool =
4646
false
4747

4848

49-
template headersCollect*(buddy: BeaconBuddyRef; info: static[string]) =
49+
template headersCollect*(buddy: BeaconPeerRef; info: static[string]) =
5050
## Async/template
5151
##
5252
## Collect headers and either stash them on the header chain cache directly,
@@ -213,14 +213,14 @@ template headersCollect*(buddy: BeaconBuddyRef; info: static[string]) =
213213

214214
# --------------
215215

216-
proc headersUnstageOk*(buddy: BeaconBuddyRef): bool =
216+
proc headersUnstageOk*(buddy: BeaconPeerRef): bool =
217217
## Check whether import processing is possible
218218
##
219219
let ctx = buddy.ctx
220220
not ctx.poolMode and
221221
0 < ctx.hdr.staged.len
222222

223-
proc headersUnstage*(buddy: BeaconBuddyRef; info: static[string]): bool =
223+
proc headersUnstage*(buddy: BeaconPeerRef; info: static[string]): bool =
224224
## Store headers from the `staged` queue onto the header chain cache.
225225
##
226226
## The function returns `false` if the caller should make sure to allow

0 commit comments

Comments
 (0)