Skip to content

Commit 7497790

Browse files
agnxshtersec
andauthored
add getBlobsV2 and payloadV5 (#7123)
* getBlobsV2 and payloadV5 * fix copyright years * update copyright years * tiny bug fix * fix * refactored reaching ELManager * removed data column quarantine * reflect new quarantine changes * add bpo numbers * rm oncolumn callback * fix column quarantine test * add: bpo parsing (#7195) * bpo parsing * fix some formatting stuff * strip spaces due to indentation * handle one more case * stop rejecting blocks with blobs more than electra * comment out a check * delay reconstruction from critical path * check for non zero columns * make another fix * ensure BPOs are in correctly sorted order * bump down parallel requests for data column receiving * drop put calls to quarantine as we get everything from EL, whenever we get * remove reconstruction --------- Co-authored-by: tersec <tersec@users.noreply.github.com> * use newSeqOfCap * adding a further comment --------- Co-authored-by: tersec <tersec@users.noreply.github.com>
1 parent b020321 commit 7497790

18 files changed

+443
-119
lines changed

beacon_chain/consensus_object_pools/blob_quarantine.nim

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type
4848

4949
BlobQuarantine* =
5050
SidecarQuarantine[BlobSidecar, OnBlobSidecarCallback]
51+
# TODO this OnDataColumnSidecarCallback tpe is notional/unused
5152
ColumnQuarantine* =
5253
SidecarQuarantine[DataColumnSidecar, OnDataColumnSidecarCallback]
5354

@@ -632,11 +633,6 @@ template onBlobSidecarCallback*(
632633
): OnBlobSidecarCallback =
633634
quarantine.onSidecarCallback
634635

635-
template onDataColumnSidecarCallback*(
636-
quarantine: ColumnQuarantine
637-
): OnDataColumnSidecarCallback =
638-
quarantine.onSidecarCallback
639-
640636
func init*(
641637
T: typedesc[BlobQuarantine],
642638
cfg: RuntimeConfig,
@@ -660,7 +656,6 @@ func init*(
660656
T: typedesc[ColumnQuarantine],
661657
cfg: RuntimeConfig,
662658
custodyColumns: openArray[ColumnIndex],
663-
onDataColumnSidecarCallback: OnDataColumnSidecarCallback
664659
): ColumnQuarantine =
665660
doAssert(len(custodyColumns) <= NUMBER_OF_COLUMNS)
666661
let size = maxSidecars(NUMBER_OF_COLUMNS)
@@ -679,5 +674,4 @@ func init*(
679674
indexMap: indexMap,
680675
custodyColumns: @custodyColumns,
681676
custodyMap: ColumnMap.init(custodyColumns),
682-
onSidecarCallback: onDataColumnSidecarCallback
683677
)

beacon_chain/el/el_manager.nim

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ type
5151
BellatrixExecutionPayloadWithValue |
5252
GetPayloadV2Response |
5353
GetPayloadV3Response |
54-
GetPayloadV4Response
54+
GetPayloadV4Response |
55+
GetPayloadV5Response
56+
5557

5658
const
5759
noTimeout = WithoutTimeout(0)
@@ -64,6 +66,8 @@ const
6466
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.4/src/engine/shanghai.md#request-2
6567
GETPAYLOAD_TIMEOUT = 1.seconds
6668

69+
GETBLOBS_TIMEOUT = 200.milliseconds
70+
6771
connectionStateChangeHysteresisThreshold = 15
6872
## How many unsuccesful/successful requests we must see
6973
## before declaring the connection as degraded/restored
@@ -406,9 +410,11 @@ proc getPayloadFromSingleEL(
406410
suggestedFeeRecipient: suggestedFeeRecipient,
407411
withdrawals: withdrawals))
408412
elif GetPayloadResponseType is engine_api.GetPayloadV3Response or
409-
GetPayloadResponseType is engine_api.GetPayloadV4Response:
413+
GetPayloadResponseType is engine_api.GetPayloadV4Response or
414+
GetPayloadResponseType is engine_api.GetPayloadV5Response:
410415
# https://github.com/ethereum/execution-apis/blob/90a46e9137c89d58e818e62fa33a0347bba50085/src/engine/prague.md
411416
# does not define any new forkchoiceUpdated, so reuse V3 from Dencun
417+
# https://github.com/ethereum/execution-apis/blob/5d634063ccfd897a6974ea589c00e2c1d889abc9/src/engine/osaka.md
412418
let response = await rpcClient.forkchoiceUpdated(
413419
ForkchoiceStateV1(
414420
headBlockHash: headBlock.asBlockHash,
@@ -459,7 +465,7 @@ template EngineApiResponseType*(T: type electra.ExecutionPayloadForSigning): typ
459465
engine_api.GetPayloadV4Response
460466

461467
template EngineApiResponseType*(T: type fulu.ExecutionPayloadForSigning): type =
462-
engine_api.GetPayloadV4Response
468+
engine_api.GetPayloadV5Response
463469

464470
template toEngineWithdrawals*(withdrawals: seq[capella.Withdrawal]): seq[WithdrawalV1] =
465471
mapIt(withdrawals, toEngineWithdrawal(it))
@@ -631,6 +637,13 @@ proc sendNewPayloadToSingleEL(
631637
payload, versioned_hashes, Hash32 parent_beacon_block_root,
632638
executionRequests)
633639

640+
proc sendGetBlobsV2toSingleEl(
641+
connection: ELConnection,
642+
versioned_hashes: seq[engine_api.VersionedHash]
643+
): Future[GetBlobsV2Response] {.async: (raises: [CatchableError]).} =
644+
let rpcClient = await connection.connectedRpcClient()
645+
await rpcClient.engine_getBlobsV2(versioned_hashes)
646+
634647
type
635648
StatusRelation = enum
636649
newStatusIsPreferable
@@ -756,6 +769,69 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
756769
if len(pending) > 0:
757770
await noCancel allFutures(pending)
758771

772+
proc sendGetBlobsV2*(
773+
m: ELManager,
774+
blck: fulu.SignedBeaconBlock,
775+
): Future[Opt[seq[BlobAndProofV2]]] {.async: (raises: [CancelledError]).} =
776+
777+
if m.elConnections.len == 0:
778+
return err()
779+
780+
let deadline = sleepAsync(GETBLOBS_TIMEOUT)
781+
782+
var bestIdx: Opt[int] = Opt.none(int)
783+
784+
while true:
785+
let requests = m.elConnections.mapIt(
786+
sendGetBlobsV2toSingleEl(it,
787+
mapIt(blck.message.body.blob_kzg_commitments,
788+
engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it)))
789+
)
790+
)
791+
792+
let timeoutExceeded =
793+
try:
794+
await allFutures(requests).wait(deadline)
795+
false
796+
except AsyncTimeoutError:
797+
true
798+
except CancelledError as exc:
799+
# cancel anything still running, then re-raise
800+
await noCancel allFutures(
801+
requests.filterIt(not it.finished()).mapIt(it.cancelAndWait())
802+
)
803+
raise exc
804+
805+
for idx, req in requests:
806+
if req.finished():
807+
# choose the first successful (not failed) response
808+
if req.error.isNil and bestIdx.isNone:
809+
bestIdx = Opt.some(idx)
810+
else:
811+
# finished == false
812+
let errmsg =
813+
if req.error.isNil: "request still pending"
814+
else: req.error.msg
815+
warn "Timeout while getting blobs & proofs",
816+
url = m.elConnections[idx].engineUrl.url,
817+
reason = errmsg
818+
819+
await noCancel allFutures(
820+
requests.filterIt(not it.finished()).mapIt(it.cancelAndWait())
821+
)
822+
823+
if bestIdx.isSome():
824+
let chosen = requests[bestIdx.get()]
825+
# chosen is finished; but could still be an error, so guard again
826+
if chosen.error.isNil:
827+
return ok(chosen.value())
828+
else:
829+
warn "Chosen EL failed unexpectedly", reason = chosen.error.msg
830+
if timeoutExceeded:
831+
break
832+
833+
err()
834+
759835
proc sendNewPayload*(
760836
m: ELManager,
761837
blck: SomeForkyBeaconBlock,

beacon_chain/el/engine_api_conversions.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# beacon_chain
2-
# Copyright (c) 2024 Status Research & Development GmbH
2+
# Copyright (c) 2024-2025 Status Research & Development GmbH
33
# Licensed and distributed under either of
44
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
55
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -224,7 +224,7 @@ func asConsensusType*(
224224
executionRequests: payload.executionRequests)
225225

226226
func asConsensusTypeFulu*(
227-
payload: GetPayloadV4Response):
227+
payload: GetPayloadV5Response):
228228
fulu.ExecutionPayloadForSigning =
229229
fulu.ExecutionPayloadForSigning(
230230
executionPayload: payload.executionPayload.asFuluConsensusPayload,

beacon_chain/gossip_processing/block_processor.nim

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,8 @@ proc storeBackfillBlock(
283283
self.consensusManager.dag.db.putBlobSidecar(b[])
284284

285285
# Only store data columns after successfully establishing block validity
286-
let columns = dataColumnsOpt.valueOr: DataColumnSidecars @[]
286+
let
287+
columns = dataColumnsOpt.valueOr: DataColumnSidecars @[]
287288
for c in columns:
288289
self.consensusManager.dag.db.putDataColumnSidecar(c[])
289290

@@ -728,16 +729,17 @@ proc storeBlock(
728729

729730
when typeof(signedBlock).kind >= ConsensusFork.Fulu:
730731
if dataColumnsOpt.isSome:
731-
let columns = dataColumnsOpt.get()
732-
let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
733-
if columns.len > 0 and kzgCommits.len > 0:
734-
for i in 0..<columns.len:
732+
let
733+
columns0 = dataColumnsOpt.get()
734+
kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
735+
if columns0.len > 0 and kzgCommits.len > 0:
736+
for i in 0..<columns0.len:
735737
let r =
736-
verify_data_column_sidecar_kzg_proofs(columns[i][])
738+
verify_data_column_sidecar_kzg_proofs(columns0[i][])
737739
if r.isErr:
738740
debug "data column validation failed",
739741
blockRoot = shortLog(signedBlock.root),
740-
column_sidecar = shortLog(columns[i][]),
742+
column_sidecar = shortLog(columns0[i][]),
741743
blck = shortLog(signedBlock.message),
742744
signature = shortLog(signedBlock.signature),
743745
msg = r.error()

beacon_chain/gossip_processing/eth2_processor.nim

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88
{.push raises: [].}
99

1010
import
11-
std/[tables],
11+
std/[tables, sequtils],
1212
chronicles, chronos, metrics,
1313
taskpools,
14+
kzg4844/kzg,
15+
ssz_serialization/types,
16+
../el/el_manager,
1417
../spec/[helpers, forks, peerdas_helpers],
1518
../consensus_object_pools/[
1619
blob_quarantine, block_clearance, block_quarantine, blockchain_dag,
@@ -349,15 +352,80 @@ proc processBlobSidecar*(
349352

350353
v
351354

355+
proc validateDataColumnSidecarFromEL*(
356+
self: ref Eth2Processor,
357+
block_root: Eth2Digest):
358+
Future[ValidationRes]
359+
{.async: (raises: [CancelledError]).} =
360+
let elManager = self.blockProcessor[].consensusManager.elManager
361+
if (let o = self.quarantine[].popColumnless(block_root); o.isSome):
362+
let columnless = o.unsafeGet()
363+
withBlck(columnless):
364+
when consensusFork >= ConsensusFork.Fulu:
365+
let
366+
start_time = Moment.now()
367+
let blobsFromElOpt =
368+
await elManager.sendGetBlobsV2(forkyBlck)
369+
if blobsFromElOpt.isSome():
370+
let blobsEl = blobsFromElOpt.get()
371+
372+
# check lengths of array[BlobAndProofV2 with blobs
373+
# kzg commitments of the signed block
374+
if blobsEl.len == forkyBlck.message.body.blob_kzg_commitments.len:
375+
376+
# we have received all columns from the EL
377+
# hence we can safely remove the columnless block from quarantine
378+
var flat_proof: seq[kzg.KzgProof] = @[]
379+
for item in blobsEl:
380+
for proof in item.proofs:
381+
flat_proof.add(kzg.KzgProof(bytes: proof.data))
382+
383+
let
384+
recovered_columns =
385+
assemble_data_column_sidecars(
386+
forkyBlck,
387+
blobsEl.mapIt(kzg.KzgBlob(bytes: it.blob.data)),
388+
flat_proof)
389+
390+
# Pop out the column sidecars as we have all columns from the EL
391+
discard self.dataColumnQuarantine[].popSidecars(block_root,
392+
forkyBlck)
393+
394+
let end_time = Moment.now()
395+
debug "Time taken to get 100% response from EL and bypass blob gossip validation",
396+
time_taken = end_time - start_time
397+
debug "Pulled blobs from EL, bypassing blob gossip validation",
398+
blobs_from_el = blobsEl.len
399+
self.blockProcessor[].enqueueBlock(
400+
MsgSource.gossip, columnless,
401+
Opt.none(BlobSidecars),
402+
Opt.some(recovered_columns.mapIt(newClone it)))
403+
return ok()
404+
405+
else:
406+
discard self.quarantine[].addColumnless(
407+
self.dag.finalizedHead.slot, forkyBlck)
408+
else:
409+
raiseAssert "Could not have been added as columnless"
410+
else:
411+
return errIgnore ("Could not pull blobs and proofs from EL")
412+
352413
proc processDataColumnSidecar*(
353-
self: var Eth2Processor, src: MsgSource,
354-
dataColumnSidecar: DataColumnSidecar, subnet_id: uint64): ValidationRes =
414+
self: ref Eth2Processor, src: MsgSource,
415+
dataColumnSidecar: DataColumnSidecar, subnet_id: uint64):
416+
Future[ValidationRes] {.async: (raises: [CancelledError]).} =
355417
template block_header: untyped = dataColumnSidecar.signed_block_header.message
418+
let block_root = hash_tree_root(block_header)
419+
420+
let vEL =
421+
await self.validateDataColumnSidecarFromEL(block_root)
422+
423+
if vEL.isOk():
424+
return vEL
356425

357426
let
358427
wallTime = self.getCurrentBeaconTime()
359428
(_, wallSlot) = wallTime.toSlot()
360-
block_root = hash_tree_root(block_header)
361429

362430
logScope:
363431
dcs = shortLog(dataColumnSidecar)
@@ -387,17 +455,10 @@ proc processDataColumnSidecar*(
387455
let cres =
388456
self.dataColumnQuarantine[].popSidecars(block_root, forkyBlck)
389457
if cres.isSome():
390-
if cres.get().lenu64 > (self.dag.cfg.NUMBER_OF_COLUMNS div 2):
391-
# We have enough data columns to reconstruct the rest
392-
let
393-
recovered_cps = recover_cells_and_proofs(cres.get())
394-
reconstructed_columns =
395-
reconstruct_data_column_sidecars(forkyBlck, recovered_cps.get)
396-
397-
self.blockProcessor[].enqueueBlock(
398-
MsgSource.gossip, columnless,
399-
Opt.none(BlobSidecars),
400-
Opt.some(reconstructed_columns))
458+
self.blockProcessor[].enqueueBlock(
459+
MsgSource.gossip, columnless,
460+
Opt.none(BlobSidecars),
461+
cres)
401462
else:
402463
discard self.quarantine[].addColumnless(
403464
self.dag.finalizedHead.slot, forkyBlck)

beacon_chain/gossip_processing/gossip_validation.nim

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,16 @@ template checkedReject(
294294
pool.dag.checkedReject(error)
295295

296296
func getMaxBlobsPerBlock(cfg: RuntimeConfig, slot: Slot): uint64 =
297-
if slot >= cfg.ELECTRA_FORK_EPOCH.start_slot:
297+
if slot >= cfg.FULU_FORK_EPOCH.start_slot:
298+
let
299+
epoch: Epoch = Epoch(uint64(slot) div SLOTS_PER_EPOCH)
300+
maxBlobs = get_max_blobs_per_block_bpo(cfg, epoch)
301+
if maxBlobs.isSome():
302+
maxBlobs.get()
303+
else:
304+
# If the max blobs per block is not set, use the default value
305+
cfg.MAX_BLOBS_PER_BLOCK_ELECTRA
306+
elif slot >= cfg.ELECTRA_FORK_EPOCH.start_slot:
298307
cfg.MAX_BLOBS_PER_BLOCK_ELECTRA
299308
else:
300309
cfg.MAX_BLOBS_PER_BLOCK
@@ -696,12 +705,6 @@ proc validateDataColumnSidecar*(
696705
if r.isErr:
697706
return dag.checkedReject(r.error)
698707

699-
# Send notification about new data column sidecar via callback
700-
let onDataColumnSidecarCallback =
701-
dataColumnQuarantine[].onDataColumnSidecarCallback()
702-
if not(isNil(onDataColumnSidecarCallback)):
703-
onDataColumnSidecarCallback data_column_sidecar
704-
705708
ok()
706709

707710
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block
@@ -843,7 +846,7 @@ proc validateBeaconBlock*(
843846
# validation.
844847
validateBeaconBlockBellatrix(signed_beacon_block, parent)
845848

846-
dag.validateBeaconBlockDeneb(signed_beacon_block, wallTime)
849+
# dag.validateBeaconBlockDeneb(signed_beacon_block, wallTime)
847850

848851
# [REJECT] The block is from a higher slot than its parent.
849852
if not (signed_beacon_block.message.slot > parent.bid.slot):

0 commit comments

Comments
 (0)