Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 151 additions & 29 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ use crate::beacon_proposer_cache::{
};
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
AvailableExecutionPendingBlock, POS_PANDA_BANNER, SignatureVerifiedBlock,
};
use crate::block_verification::{
BlockError, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock,
check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy,
signature_verify_chain_segment, verify_header_signature,
};
use crate::block_verification_types::{
AsBlock, AvailableExecutedBlock, BlockImportData, ExecutedBlock, RpcBlock,
AsBlock, AvailableExecutedBlock, BlockImportData, ExecutedBlock, MaybeAvailableBlock, RpcBlock,
};
pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
Expand Down Expand Up @@ -2910,35 +2912,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
for signature_verified_block in signature_verified_blocks {
let block_slot = signature_verified_block.slot();
match self
.process_block(
.process_sync_block(
signature_verified_block.block_root(),
signature_verified_block,
notify_execution_layer,
BlockImportSource::RangeSync,
|| Ok(()),
)
.await
{
Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(block_root) => {
// The block was imported successfully.
imported_blocks.push((block_root, block_slot));
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
warn!(
?block_root,
%slot,
"Blobs missing in response to range request"
);
return ChainSegmentResult::Failed {
imported_blocks,
error: BlockError::AvailabilityCheck(
AvailabilityCheckError::MissingBlobs,
),
};
}
}
Ok(block_root) => {
imported_blocks.push((block_root, block_slot));
}
Err(BlockError::DuplicateFullyImported(block_root)) => {
debug!(
Expand Down Expand Up @@ -3322,6 +3305,119 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

pub async fn process_sync_block(
self: &Arc<Self>,
block_root: Hash256,
unverified_block: SignatureVerifiedBlock<T, AvailableBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<Hash256, BlockError> {
let block_slot = unverified_block.slot();

// Set observed time if not already set. Usually this should be set by gossip or RPC,
// but just in case we set it again here (useful for tests).
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_time_observed(
block_root,
block_slot,
seen_timestamp,
None,
None,
);
}

let block = unverified_block.block_cloned();
self.data_availability_checker.put_pre_execution_block(
block_root,
block,
BlockImportSource::RangeSync,
)?;

// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);

// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);

// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
let available_execution_pending: AvailableExecutionPendingBlock<T> = unverified_block
.into_execution_pending_block(
block_root,
&chain,
notify_execution_layer,
)?;
publish_fn()?;

// Record the time it took to complete consensus verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
self.block_times_cache
.write()
.set_time_consensus_verified(block_root, block_slot, timestamp)
}

let executed_block = chain
.into_available_executed_block(available_execution_pending)
.await
.inspect_err(|_| {
// If the block fails execution for whatever reason (e.g. engine offline),
// and we keep it in the cache, then the node will NOT perform lookup and
// reprocess this block until the block is evicted from DA checker, causing the
// chain to get stuck temporarily if the block is canonical. Therefore we remove
// it from the cache if execution fails.
self.data_availability_checker
.remove_block_on_execution_error(&block_root);
})?;

// Record the *additional* time it took to wait for execution layer verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
self.block_times_cache
.write()
.set_time_executed(block_root, block_slot, timestamp)
}

self.import_available_block(Box::new(executed_block)).await
};

// Verify and import the block.
match import_block.await {
Ok(block_root) => {
debug!(
?block_root,
%block_slot,
source = %BlockImportSource::RangeSync,
"Beacon block imported"
);
Ok(block_root)
}
Err(BlockError::BeaconChainError(e)) => {
match e.as_ref() {
BeaconChainError::TokioJoin(e) => {
debug!(
error = ?e,
"Beacon block processing cancelled"
);
}
_ => {
// There was an error whilst attempting to verify and import the block. The block might
// be partially verified or partially imported.
crit!(
error = ?e,
"Beacon block processing error"
);
}
};
Err(BlockError::BeaconChainError(e))
}
// The block failed verification.
Err(other) => {
debug!(reason = other.to_string(), "Beacon block rejected");
Err(other)
}
}
}

/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
/// imported into the chain.
///
Expand All @@ -3336,7 +3432,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns an `Err` if the given block was invalid, or an error was encountered during
/// verification.
#[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
pub async fn process_block<B: IntoExecutionPendingBlock<T>>(
pub async fn process_block<
B: IntoExecutionPendingBlock<T, MaybeAvailableBlock<T::EthSpec>, ExecutionPendingBlock<T>>,
>(
self: &Arc<Self>,
block_root: Hash256,
unverified_block: B,
Expand Down Expand Up @@ -3409,7 +3507,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block)).await
let block_root = self.import_available_block(Box::new(block)).await?;
Ok(AvailabilityProcessingStatus::Imported(block_root))
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
Expand Down Expand Up @@ -3465,6 +3564,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

pub async fn into_available_executed_block(
self: Arc<Self>,
available_execution_pending_block: AvailableExecutionPendingBlock<T>,
) -> Result<AvailableExecutedBlock<T::EthSpec>, BlockError> {
let AvailableExecutionPendingBlock {
block,
import_data,
payload_verification_handle,
} = available_execution_pending_block;

let payload_verification_outcome = payload_verification_handle
.await
.map_err(BeaconChainError::TokioJoin)?
.ok_or(BeaconChainError::RuntimeShutdown)??;

Ok(AvailableExecutedBlock::new(
block,
import_data,
payload_verification_outcome,
))
}

/// Accepts a fully-verified block and awaits on its payload verification handle to
/// get a fully `ExecutedBlock`.
///
Expand Down Expand Up @@ -3721,7 +3842,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block).await
let block_root = self.import_available_block(block).await?;
Ok(AvailabilityProcessingStatus::Imported(block_root))
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
Expand All @@ -3733,7 +3855,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
) -> Result<Hash256, BlockError> {
let AvailableExecutedBlock {
block,
import_data,
Expand Down Expand Up @@ -3780,7 +3902,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await??
};

Ok(AvailabilityProcessingStatus::Imported(block_root))
Ok(block_root)
}

/// Accepts a fully-verified and available block and imports it into the chain without performing any
Expand Down
Loading
Loading