From 232ec5b7cc59bc816c88b2267369b6fea42df4c5 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Thu, 13 Nov 2025 15:57:51 -0500 Subject: [PATCH] Rpc block is always available --- beacon_node/beacon_chain/src/beacon_chain.rs | 180 +- .../beacon_chain/src/block_verification.rs | 1639 +++++++++-------- .../src/data_availability_checker.rs | 36 +- 3 files changed, 1046 insertions(+), 809 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 494346e7ff2..f6f240b0852 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -10,14 +10,16 @@ use crate::beacon_proposer_cache::{ }; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_times_cache::BlockTimesCache; -use crate::block_verification::POS_PANDA_BANNER; +use crate::block_verification::{ + AvailableExecutionPendingBlock, POS_PANDA_BANNER, SignatureVerifiedBlock, +}; use crate::block_verification::{ BlockError, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, signature_verify_chain_segment, verify_header_signature, }; use crate::block_verification_types::{ - AsBlock, AvailableExecutedBlock, BlockImportData, ExecutedBlock, RpcBlock, + AsBlock, AvailableExecutedBlock, BlockImportData, ExecutedBlock, MaybeAvailableBlock, RpcBlock, }; pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; @@ -2910,35 +2912,16 @@ impl BeaconChain { for signature_verified_block in signature_verified_blocks { let block_slot = signature_verified_block.slot(); match self - .process_block( + .process_sync_block( signature_verified_block.block_root(), signature_verified_block, notify_execution_layer, - BlockImportSource::RangeSync, || Ok(()), ) .await { - Ok(status) => { - match status { - AvailabilityProcessingStatus::Imported(block_root) => { - // The block was imported successfully. - imported_blocks.push((block_root, block_slot)); - } - AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { - warn!( - ?block_root, - %slot, - "Blobs missing in response to range request" - ); - return ChainSegmentResult::Failed { - imported_blocks, - error: BlockError::AvailabilityCheck( - AvailabilityCheckError::MissingBlobs, - ), - }; - } - } + Ok(block_root) => { + imported_blocks.push((block_root, block_slot)); } Err(BlockError::DuplicateFullyImported(block_root)) => { debug!( @@ -3322,6 +3305,119 @@ impl BeaconChain { } } + pub async fn process_sync_block( + self: &Arc, + block_root: Hash256, + unverified_block: SignatureVerifiedBlock>, + notify_execution_layer: NotifyExecutionLayer, + publish_fn: impl FnOnce() -> Result<(), BlockError>, + ) -> Result { + let block_slot = unverified_block.slot(); + + // Set observed time if not already set. Usually this should be set by gossip or RPC, + // but just in case we set it again here (useful for tests). + if let Some(seen_timestamp) = self.slot_clock.now_duration() { + self.block_times_cache.write().set_time_observed( + block_root, + block_slot, + seen_timestamp, + None, + None, + ); + } + + let block = unverified_block.block_cloned(); + self.data_availability_checker.put_pre_execution_block( + block_root, + block, + BlockImportSource::RangeSync, + )?; + + // Start the Prometheus timer. + let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); + + // Increment the Prometheus counter for block processing requests. 
+ metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); + + // A small closure to group the verification and import errors. + let chain = self.clone(); + let import_block = async move { + let available_execution_pending: AvailableExecutionPendingBlock = unverified_block + .into_execution_pending_block( + block_root, + &chain, + notify_execution_layer, + )?; + publish_fn()?; + + // Record the time it took to complete consensus verification. + if let Some(timestamp) = self.slot_clock.now_duration() { + self.block_times_cache + .write() + .set_time_consensus_verified(block_root, block_slot, timestamp) + } + + let executed_block = chain + .into_available_executed_block(available_execution_pending) + .await + .inspect_err(|_| { + // If the block fails execution for whatever reason (e.g. engine offline), + // and we keep it in the cache, then the node will NOT perform lookup and + // reprocess this block until the block is evicted from DA checker, causing the + // chain to get stuck temporarily if the block is canonical. Therefore we remove + // it from the cache if execution fails. + self.data_availability_checker + .remove_block_on_execution_error(&block_root); + })?; + + // Record the *additional* time it took to wait for execution layer verification. + if let Some(timestamp) = self.slot_clock.now_duration() { + self.block_times_cache + .write() + .set_time_executed(block_root, block_slot, timestamp) + } + + self.import_available_block(Box::new(executed_block)).await + }; + + // Verify and import the block. + match import_block.await { + Ok(block_root) => { + debug!( + ?block_root, + %block_slot, + source = %BlockImportSource::RangeSync, + "Beacon block imported" + ); + Ok(block_root) + } + Err(BlockError::BeaconChainError(e)) => { + match e.as_ref() { + BeaconChainError::TokioJoin(e) => { + debug!( + error = ?e, + "Beacon block processing cancelled" + ); + } + _ => { + // There was an error whilst attempting to verify and import the block. The block might + // be partially verified or partially imported. + crit!( + error = ?e, + "Beacon block processing error" + ); + } + }; + Err(BlockError::BeaconChainError(e)) + } + // The block failed verification. + Err(other) => { + debug!(reason = other.to_string(), "Beacon block rejected"); + Err(other) + } + } + } + /// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and /// imported into the chain. /// @@ -3336,7 +3432,9 @@ impl BeaconChain { /// Returns an `Err` if the given block was invalid, or an error was encountered during /// verification. 
#[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] - pub async fn process_block>( + pub async fn process_block< + B: IntoExecutionPendingBlock, ExecutionPendingBlock>, + >( self: &Arc, block_root: Hash256, unverified_block: B, @@ -3409,7 +3507,8 @@ impl BeaconChain { match executed_block { ExecutedBlock::Available(block) => { - self.import_available_block(Box::new(block)).await + let block_root = self.import_available_block(Box::new(block)).await?; + Ok(AvailabilityProcessingStatus::Imported(block_root)) } ExecutedBlock::AvailabilityPending(block) => { self.check_block_availability_and_import(block).await @@ -3465,6 +3564,28 @@ impl BeaconChain { } } + pub async fn into_available_executed_block( + self: Arc, + available_execution_pending_block: AvailableExecutionPendingBlock, + ) -> Result, BlockError> { + let AvailableExecutionPendingBlock { + block, + import_data, + payload_verification_handle, + } = available_execution_pending_block; + + let payload_verification_outcome = payload_verification_handle + .await + .map_err(BeaconChainError::TokioJoin)? + .ok_or(BeaconChainError::RuntimeShutdown)??; + + Ok(AvailableExecutedBlock::new( + block, + import_data, + payload_verification_outcome, + )) + } + /// Accepts a fully-verified block and awaits on its payload verification handle to /// get a fully `ExecutedBlock`. /// @@ -3721,7 +3842,8 @@ impl BeaconChain { Availability::Available(block) => { publish_fn()?; // Block is fully available, import into fork choice - self.import_available_block(block).await + let block_root = self.import_available_block(block).await?; + Ok(AvailabilityProcessingStatus::Imported(block_root)) } Availability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), @@ -3733,7 +3855,7 @@ impl BeaconChain { pub async fn import_available_block( self: &Arc, block: Box>, - ) -> Result { + ) -> Result { let AvailableExecutedBlock { block, import_data, @@ -3780,7 +3902,7 @@ impl BeaconChain { .await?? 
}; - Ok(AvailabilityProcessingStatus::Imported(block_root)) + Ok(block_root) } /// Accepts a fully-verified and available block and imports it into the chain without performing any diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1ddc51cc351..8d757a2a190 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -50,7 +50,7 @@ use crate::beacon_snapshot::PreProcessingSnapshot; use crate::blob_verification::GossipBlobError; -use crate::block_verification_types::{AsBlock, BlockImportData, RpcBlock}; +use crate::block_verification_types::{AsBlock, AvailableBlock, BlockImportData, RpcBlock}; use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}; use crate::data_column_verification::GossipDataColumnError; use crate::execution_payload::{ @@ -94,6 +94,7 @@ use store::{Error as DBError, KeyValueStore}; use strum::AsRefStr; use task_executor::JoinHandle; use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument}; +use types::BeaconBlock; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, KzgProofs, @@ -623,7 +624,7 @@ pub(crate) fn process_block_slash_info( mut chain_segment: Vec<(Hash256, RpcBlock)>, chain: &BeaconChain, -) -> Result>, BlockError> { +) -> Result>>, BlockError> { if chain_segment.is_empty() { return Ok(vec![]); } @@ -647,18 +648,18 @@ pub fn signature_verify_chain_segment( // unzip chain segment and verify kzg in bulk let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); - let maybe_available_blocks = chain + let available_blocks = chain .data_availability_checker .verify_kzg_for_rpc_blocks(blocks)?; // zip it back up let mut signature_verified_blocks = roots .into_iter() - .zip(maybe_available_blocks) - .map(|(block_root, maybe_available_block)| { - let consensus_context = ConsensusContext::new(maybe_available_block.slot()) - .set_current_block_root(block_root); + .zip(available_blocks) + .map(|(block_root, available_block)| { + let consensus_context = + ConsensusContext::new(available_block.slot()).set_current_block_root(block_root); SignatureVerifiedBlock { - block: maybe_available_block, + block: available_block, block_root, parent: None, consensus_context, @@ -700,8 +701,8 @@ pub struct GossipVerifiedBlock { /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit /// signatures) have been verified. 
-pub struct SignatureVerifiedBlock { - block: MaybeAvailableBlock, +pub struct SignatureVerifiedBlock> { + block: G, block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, @@ -727,219 +728,681 @@ pub struct ExecutionPendingBlock { pub payload_verification_handle: PayloadVerificationHandle, } -pub trait IntoGossipVerifiedBlock: Sized { - fn into_gossip_verified_block( - self, - chain: &BeaconChain, - ) -> Result, BlockError>; - fn inner_block(&self) -> Arc>; +pub struct AvailableExecutionPendingBlock { + pub block: AvailableBlock, + pub import_data: BlockImportData, + pub payload_verification_handle: PayloadVerificationHandle, } -impl IntoGossipVerifiedBlock for GossipVerifiedBlock { - fn into_gossip_verified_block( - self, - _chain: &BeaconChain, - ) -> Result, BlockError> { - Ok(self) - } - fn inner_block(&self) -> Arc> { - self.block_cloned() - } +pub trait BlockWithAvailabilityStatus { + fn block_cloned(&self) -> Arc>; + + fn new(block_root: Hash256, block: Arc>) -> Self; } -impl IntoGossipVerifiedBlock for Arc> { - fn into_gossip_verified_block( - self, - chain: &BeaconChain, - ) -> Result, BlockError> { - GossipVerifiedBlock::new(self, chain) +impl BlockWithAvailabilityStatus for AvailableBlock { + fn block_cloned(&self) -> Arc> { + self.block_cloned() } - fn inner_block(&self) -> Arc> { - self.clone() + fn new(block_root: Hash256, block: Arc>) -> Self { + todo!() } } -pub fn build_blob_data_column_sidecars( - chain: &BeaconChain, - block: &SignedBeaconBlock>, - blobs: BlobsList, - kzg_cell_proofs: KzgProofs, -) -> Result, DataColumnSidecarError> { - // Only attempt to build data columns if blobs is non empty to avoid skewing the metrics. - if blobs.is_empty() { - return Ok(vec![]); +impl BlockWithAvailabilityStatus for MaybeAvailableBlock { + fn block_cloned(&self) -> Arc> { + self.block_cloned() } - let mut timer = metrics::start_timer_vec( - &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, - &[&blobs.len().to_string()], - ); - let blob_refs = blobs.iter().collect::>(); - let sidecars = blobs_to_data_column_sidecars( - &blob_refs, - kzg_cell_proofs.to_vec(), - block, - &chain.kzg, - &chain.spec, - ) - .discard_timer_on_break(&mut timer)?; - drop(timer); - Ok(sidecars) + fn new(block_root: Hash256, block: Arc>) -> Self { + MaybeAvailableBlock::AvailabilityPending { + block_root: block_root, + block, + } + } } -/// Implemented on types that can be converted into a `ExecutionPendingBlock`. -/// -/// Used to allow functions to accept blocks at various stages of verification. -pub trait IntoExecutionPendingBlock: Sized { - #[instrument(skip_all, level = "debug")] - fn into_execution_pending_block( - self, - block_root: Hash256, - chain: &Arc>, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockError> { - self.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) - .inspect(|execution_pending| { - // Supply valid block to slasher. - if let Some(slasher) = chain.slasher.as_ref() { - slasher.accept_block_header(execution_pending.block.signed_block_header()); - } - }) - .map_err(|slash_info| process_block_slash_info::<_, BlockError>(chain, slash_info)) - } +pub trait GenericExecutionPendingBlock>: + Sized +{ + fn block(&self) -> Arc>; - /// Convert the block to fully-verified form while producing data to aid checking slashability. - fn into_execution_pending_block_slashable( - self, + /// Instantiates `Self`, a wrapper that indicates that the given `block` is fully valid. See + /// the struct-level documentation for more information. 
+ /// + /// Note: this function does not verify block signatures, it assumes they are valid. Signature + /// verification must be done upstream (e.g., via a `SignatureVerifiedBlock` + /// + /// Returns an error if the block is invalid, or if the block was unable to be verified. + fn from_signature_verified_components( + block: G, block_root: Hash256, + parent: PreProcessingSnapshot, + mut consensus_context: ConsensusContext, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockSlashInfo>; - - fn block(&self) -> &SignedBeaconBlock; - fn block_cloned(&self) -> Arc>; -} - -impl GossipVerifiedBlock { - /// Instantiates `Self`, a wrapper that indicates the given `block` is safe to be re-gossiped - /// on the p2p network. - /// - /// Returns an error if the block is invalid, or if the block was unable to be verified. - #[instrument(name = "verify_gossip_block", skip_all, fields(block_root = tracing::field::Empty))] - pub fn new( - block: Arc>, - chain: &BeaconChain, ) -> Result { - // If the block is valid for gossip we don't supply it to the slasher here because - // we assume it will be transformed into a fully verified block. We *do* need to supply - // it to the slasher if an error occurs, because that's the end of this block's journey, - // and it could be a repeat proposal (a likely cause for slashing!). - let header = block.signed_block_header(); - // The `SignedBeaconBlock` and `SignedBeaconBlockHeader` have the same canonical root, - // but it's way quicker to calculate root of the header since the hash of the tree rooted - // at `BeaconBlockBody` is already computed in the header. - Self::new_without_slasher_checks(block, &header, chain) - .map_err(|e| { - process_block_slash_info::<_, BlockError>( - chain, - BlockSlashInfo::from_early_error_block(header, e), - ) - }) - .inspect(|block| { - let current_span = Span::current(); - current_span.record("block_root", block.block_root.to_string()); - }) - } + let signed_beacon_block = block.block_cloned(); - /// As for new, but doesn't pass the block to the slasher. - fn new_without_slasher_checks( - block: Arc>, - block_header: &SignedBeaconBlockHeader, - chain: &BeaconChain, - ) -> Result { - // Ensure the block is the correct structure for the fork at `block.slot()`. - block - .fork_name(&chain.spec) - .map_err(BlockError::InconsistentFork)?; + chain + .observed_slashable + .write() + .observe_slashable( + signed_beacon_block.slot(), + signed_beacon_block.message().proposer_index(), + block_root, + ) + .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; - // Do not gossip or process blocks from future slots. - let present_slot_with_tolerance = chain - .slot_clock - .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) - .ok_or(BeaconChainError::UnableToReadSlot)?; - if block.slot() > present_slot_with_tolerance { - return Err(BlockError::FutureSlot { - present_slot: present_slot_with_tolerance, - block_slot: block.slot(), - }); - } + chain + .observed_block_producers + .write() + .observe_proposal(block_root, signed_beacon_block.message()) + .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; - // Do not gossip blocks that claim to contain more blobs than the max allowed - // at the given block epoch. 
- if let Ok(commitments) = block.message().body().blob_kzg_commitments() { - let max_blobs_at_epoch = chain - .spec - .max_blobs_per_block(block.slot().epoch(T::EthSpec::slots_per_epoch())) - as usize; - if commitments.len() > max_blobs_at_epoch { - return Err(BlockError::InvalidBlobCount { - max_blobs_at_epoch, - block: commitments.len(), + if let Some(parent) = chain + .canonical_head + .fork_choice_read_lock() + .get_block(&signed_beacon_block.parent_root()) + { + // Reject any block where the parent has an invalid payload. It's impossible for a valid + // block to descend from an invalid parent. + if parent.execution_status.is_invalid() { + return Err(BlockError::ParentExecutionPayloadInvalid { + parent_root: signed_beacon_block.parent_root(), }); } + } else { + // Reject any block if its parent is not known to fork choice. + // + // A block that is not in fork choice is either: + // + // - Not yet imported: we should reject this block because we should only import a child + // after its parent has been fully imported. + // - Pre-finalized: if the parent block is _prior_ to finalization, we should ignore it + // because it will revert finalization. Note that the finalized block is stored in fork + // choice, so we will not reject any child of the finalized block (this is relevant during + // genesis). + return Err(BlockError::ParentUnknown { + parent_root: signed_beacon_block.parent_root(), + }); } - let block_root = get_block_header_root(block_header); - - // Do not gossip a block from a finalized slot. - check_block_against_finalized_slot(block.message(), block_root, chain)?; - - // Check if the block is already known. We know it is post-finalization, so it is - // sufficient to check the fork choice. - // - // In normal operation this isn't necessary, however it is useful immediately after a - // reboot if the `observed_block_producers` cache is empty. In that case, without this - // check, we will load the parent and state from disk only to find out later that we - // already know this block. - let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock(); - if fork_choice_read_lock.contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(block_root)); - } - - // Do not process a block that is known to be invalid. - chain.check_invalid_block_roots(block_root)?; + /* + * Perform cursory checks to see if the block is even worth processing. + */ + check_block_relevancy(&signed_beacon_block, block_root, chain)?; - // Do not process a block that doesn't descend from the finalized root. + // Define a future that will verify the execution payload with an execution engine. // - // We check this *before* we load the parent so that we can return a more detailed error. - let block = check_block_is_finalized_checkpoint_or_descendant( - chain, - &fork_choice_read_lock, - block, + // We do this as early as possible so that later parts of this function can run in parallel + // with the payload verification. 
+ let payload_notifier = PayloadNotifier::new( + chain.clone(), + signed_beacon_block.clone(), + &parent.pre_state, + notify_execution_layer, )?; + let is_valid_merge_transition_block = + is_merge_transition_block(&parent.pre_state, signed_beacon_block.message().body()); - let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - let (parent_block, block) = - verify_parent_block_is_known::(&fork_choice_read_lock, block)?; - drop(fork_choice_read_lock); + let payload_verification_future = async move { + let chain = payload_notifier.chain.clone(); + let block = payload_notifier.block.clone(); - // Track the number of skip slots between the block and its parent. - metrics::set_gauge( - &metrics::GOSSIP_BEACON_BLOCK_SKIPPED_SLOTS, - block - .slot() - .as_u64() - .saturating_sub(1) - .saturating_sub(parent_block.slot.into()) as i64, - ); + // If this block triggers the merge, check to ensure that it references valid execution + // blocks. + // + // The specification defines this check inside `on_block` in the fork-choice specification, + // however we perform the check here for two reasons: + // + // - There's no point in importing a block that will fail fork choice, so it's best to fail + // early. + // - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no + // calls to remote servers. + if is_valid_merge_transition_block { + validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?; + }; - // Paranoid check to prevent propagation of blocks that don't form a legitimate chain. - // - // This is not in the spec, but @protolambda tells me that the majority of other clients are - // already doing it. For reference: - // - // https://github.com/ethereum/eth2.0-specs/pull/2196 + // The specification declares that this should be run *inside* `per_block_processing`, + // however we run it here to keep `per_block_processing` pure (i.e., no calls to external + // servers). + if let Some(started_execution) = chain.slot_clock.now_duration() { + chain.block_times_cache.write().set_time_started_execution( + block_root, + block.slot(), + started_execution, + ); + } + let payload_verification_status = payload_notifier.notify_new_payload().await?; + + Ok(PayloadVerificationOutcome { + payload_verification_status, + is_valid_merge_transition_block, + }) + }; + // Spawn the payload verification future as a new task, but don't wait for it to complete. + // The `payload_verification_future` will be awaited later to ensure verification completed + // successfully. + let current_span = Span::current(); + let payload_verification_handle = chain + .task_executor + .spawn_handle( + payload_verification_future.instrument(current_span), + "execution_payload_verification", + ) + .ok_or(BeaconChainError::RuntimeShutdown)?; + + /* + * Advance the given `parent.beacon_state` to the slot of the given `block`. + */ + + let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE); + + let mut state = parent.pre_state; + + // The block must have a higher slot than its parent. + if signed_beacon_block.slot() <= parent.beacon_block.slot() { + return Err(BlockError::BlockIsNotLaterThanParent { + block_slot: signed_beacon_block.slot(), + parent_slot: parent.beacon_block.slot(), + }); + } + + // Perform a sanity check on the pre-state. 
+ let parent_slot = parent.beacon_block.slot(); + if state.slot() < parent_slot || state.slot() > signed_beacon_block.slot() { + return Err(BeaconChainError::BadPreState { + parent_root: parent.beacon_block_root, + parent_slot, + block_root, + block_slot: signed_beacon_block.slot(), + state_slot: state.slot(), + } + .into()); + } + + // Transition the parent state to the block slot. + // + // It is important to note that we're using a "pre-state" here, one that has potentially + // been advanced one slot forward from `parent.beacon_block.slot`. + let mut summaries = vec![]; + + let distance = signed_beacon_block + .slot() + .as_u64() + .saturating_sub(state.slot().as_u64()); + for _ in 0..distance { + let state_root = if parent.beacon_block.slot() == state.slot() { + // If it happens that `pre_state` has *not* already been advanced forward a single + // slot, then there is no need to compute the state root for this + // `per_slot_processing` call since that state root is already stored in the parent + // block. + parent.beacon_block.state_root() + } else { + // This is a new state we've reached, so stage it for storage in the DB. + // Computing the state root here is time-equivalent to computing it during slot + // processing, but we get early access to it. + let state_root = state.update_tree_hash_cache()?; + + // Store the state immediately. States are ONLY deleted on finalization pruning, so + // we won't have race conditions where we should have written a state and didn't. + let state_already_exists = + chain.store.load_hot_state_summary(&state_root)?.is_some(); + + if state_already_exists { + // If the state exists, we do not need to re-write it. + } else { + // Recycle store codepath to create a state summary and store the state / diff + let mut ops = vec![]; + chain.store.store_hot_state(&state_root, &state, &mut ops)?; + chain.store.hot_db.do_atomically(ops)?; + }; + + state_root + }; + + if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? { + // Expose Prometheus metrics. + if let Err(e) = summary.observe_metrics() { + error!( + src = "block_verification", + error = ?e, + "Failed to observe epoch summary metrics" + ); + } + summaries.push(summary); + } + } + metrics::stop_timer(catchup_timer); + + let block_slot = signed_beacon_block.slot(); + let state_current_epoch = state.current_epoch(); + + // If the block is sufficiently recent, notify the validator monitor. + if let Some(slot) = chain.slot_clock.now() { + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + if block_slot.epoch(T::EthSpec::slots_per_epoch()) + + VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 + >= epoch + { + let validator_monitor = chain.validator_monitor.read(); + // Update the summaries in a separate loop to `per_slot_processing`. This protects + // the `validator_monitor` lock from being bounced or held for a long time whilst + // performing `per_slot_processing`. + for (i, summary) in summaries.iter().enumerate() { + let epoch = state_current_epoch - Epoch::from(summaries.len() - i); + if let Err(e) = + validator_monitor.process_validator_statuses(epoch, summary, &chain.spec) + { + error!( + error = ?e, + "Failed to process validator statuses" + ); + } + } + } + } + + /* + * Build the committee caches on the state. 
+ */ + + let committee_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_COMMITTEE); + + state.build_all_committee_caches(&chain.spec)?; + + metrics::stop_timer(committee_timer); + + /* + * If we have block reward listeners, compute the block reward and push it to the + * event handler. + */ + if let Some(ref event_handler) = chain.event_handler + && event_handler.has_block_reward_subscribers() + { + let mut reward_cache = Default::default(); + let block_reward = chain.compute_block_reward( + signed_beacon_block.message(), + block_root, + &state, + &mut reward_cache, + true, + )?; + event_handler.register(EventKind::BlockReward(block_reward)); + } + + /* + * Perform `per_block_processing` on the block and state, returning early if the block is + * invalid. + */ + + write_state(&format!("state_pre_block_{}", block_root), &state); + write_block(signed_beacon_block.as_block(), block_root); + + let core_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CORE); + + if let Err(err) = per_block_processing( + &mut state, + signed_beacon_block.as_block(), + // Signatures were verified earlier in this function. + BlockSignatureStrategy::NoVerification, + VerifyBlockRoot::True, + &mut consensus_context, + &chain.spec, + ) { + match err { + // Capture `BeaconStateError` so that we can easily distinguish between a block + // that's invalid and one that caused an internal error. + BlockProcessingError::BeaconStateError(e) => return Err(e.into()), + other => return Err(BlockError::PerBlockProcessingError(other)), + } + }; + + metrics::stop_timer(core_timer); + + /* + * Calculate the state root of the newly modified state + */ + + let state_root_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_STATE_ROOT); + + let state_root = state.update_tree_hash_cache()?; + + metrics::stop_timer(state_root_timer); + + write_state(&format!("state_post_block_{}", block_root), &state); + + /* + * Check to ensure the state root on the block matches the one we have calculated. + */ + + if signed_beacon_block.state_root() != state_root { + return Err(BlockError::StateRootMismatch { + block: signed_beacon_block.state_root(), + local: state_root, + }); + } + + /* + * Apply the block's attestations to fork choice. + * + * We're running in parallel with the payload verification at this point, so this is + * free real estate. + */ + let current_slot = chain.slot()?; + let mut fork_choice = chain.canonical_head.fork_choice_write_lock(); + + // Register each attester slashing in the block with fork choice. + for attester_slashing in signed_beacon_block.message().body().attester_slashings() { + fork_choice.on_attester_slashing(attester_slashing); + } + + // Register each attestation in the block with fork choice. + for (i, attestation) in signed_beacon_block + .message() + .body() + .attestations() + .enumerate() + { + let indexed_attestation = consensus_context + .get_indexed_attestation(&state, attestation) + .map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?; + + match fork_choice.on_attestation( + current_slot, + indexed_attestation, + AttestationFromBlock::True, + ) { + Ok(()) => Ok(()), + // Ignore invalid attestations whilst importing attestations from a block. The + // block might be very old and therefore the attestations useless to fork choice. 
+ Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()), + Err(e) => Err(BlockError::BeaconChainError(Box::new(e.into()))), + }?; + } + drop(fork_choice); + + let block_import_data = BlockImportData { + block_root, + state, + parent_block: parent.beacon_block, + consensus_context, + }; + + Ok(Self::new( + block, + block_import_data, + payload_verification_handle, + )) + } + + fn new( + block: G, + import_data: BlockImportData, + payload_verification_handle: PayloadVerificationHandle, + ) -> Self; +} + +impl GenericExecutionPendingBlock> + for ExecutionPendingBlock +{ + fn new( + block: MaybeAvailableBlock, + import_data: BlockImportData<::EthSpec>, + payload_verification_handle: PayloadVerificationHandle, + ) -> Self { + Self { + block, + import_data, + payload_verification_handle, + } + } + + fn block(&self) -> Arc::EthSpec>> { + todo!() + } +} + +impl GenericExecutionPendingBlock> + for AvailableExecutionPendingBlock +{ + fn new( + block: AvailableBlock, + import_data: BlockImportData<::EthSpec>, + payload_verification_handle: PayloadVerificationHandle, + ) -> Self { + Self { + block, + import_data, + payload_verification_handle, + } + } + + fn block(&self) -> Arc::EthSpec>> { + todo!() + } +} + +pub trait IntoGossipVerifiedBlock: Sized { + fn into_gossip_verified_block( + self, + chain: &BeaconChain, + ) -> Result, BlockError>; + fn inner_block(&self) -> Arc>; +} + +impl IntoGossipVerifiedBlock for GossipVerifiedBlock { + fn into_gossip_verified_block( + self, + _chain: &BeaconChain, + ) -> Result, BlockError> { + Ok(self) + } + fn inner_block(&self) -> Arc> { + self.block.block_cloned() + } +} + +impl IntoGossipVerifiedBlock for Arc> { + fn into_gossip_verified_block( + self, + chain: &BeaconChain, + ) -> Result, BlockError> { + GossipVerifiedBlock::new(self, chain) + } + + fn inner_block(&self) -> Arc> { + self.clone() + } +} + +pub fn build_blob_data_column_sidecars( + chain: &BeaconChain, + block: &SignedBeaconBlock>, + blobs: BlobsList, + kzg_cell_proofs: KzgProofs, +) -> Result, DataColumnSidecarError> { + // Only attempt to build data columns if blobs is non empty to avoid skewing the metrics. + if blobs.is_empty() { + return Ok(vec![]); + } + + let mut timer = metrics::start_timer_vec( + &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, + &[&blobs.len().to_string()], + ); + let blob_refs = blobs.iter().collect::>(); + let sidecars = blobs_to_data_column_sidecars( + &blob_refs, + kzg_cell_proofs.to_vec(), + block, + &chain.kzg, + &chain.spec, + ) + .discard_timer_on_break(&mut timer)?; + drop(timer); + Ok(sidecars) +} + +/// Implemented on types that can be converted into a `ExecutionPendingBlock`. +/// +/// Used to allow functions to accept blocks at various stages of verification. +pub trait IntoExecutionPendingBlock< + T: BeaconChainTypes, + G: BlockWithAvailabilityStatus, + B: GenericExecutionPendingBlock, +>: Sized +{ + #[instrument(skip_all, level = "debug")] + fn into_execution_pending_block( + self, + block_root: Hash256, + chain: &Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result { + self.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) + .inspect(|execution_pending| { + // Supply valid block to slasher. + if let Some(slasher) = chain.slasher.as_ref() { + slasher.accept_block_header(execution_pending.block().signed_block_header()); + } + }) + .map_err(|slash_info| process_block_slash_info::<_, BlockError>(chain, slash_info)) + } + + /// Convert the block to fully-verified form while producing data to aid checking slashability. 
+ fn into_execution_pending_block_slashable( + self, + block_root: Hash256, + chain: &Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result>; + + fn block(&self) -> &SignedBeaconBlock; + fn block_cloned(&self) -> Arc>; +} + +impl GossipVerifiedBlock { + /// Instantiates `Self`, a wrapper that indicates the given `block` is safe to be re-gossiped + /// on the p2p network. + /// + /// Returns an error if the block is invalid, or if the block was unable to be verified. + #[instrument(name = "verify_gossip_block", skip_all, fields(block_root = tracing::field::Empty))] + pub fn new( + block: Arc>, + chain: &BeaconChain, + ) -> Result { + // If the block is valid for gossip we don't supply it to the slasher here because + // we assume it will be transformed into a fully verified block. We *do* need to supply + // it to the slasher if an error occurs, because that's the end of this block's journey, + // and it could be a repeat proposal (a likely cause for slashing!). + let header = block.signed_block_header(); + // The `SignedBeaconBlock` and `SignedBeaconBlockHeader` have the same canonical root, + // but it's way quicker to calculate root of the header since the hash of the tree rooted + // at `BeaconBlockBody` is already computed in the header. + Self::new_without_slasher_checks(block, &header, chain) + .map_err(|e| { + process_block_slash_info::<_, BlockError>( + chain, + BlockSlashInfo::from_early_error_block(header, e), + ) + }) + .inspect(|block| { + let current_span = Span::current(); + current_span.record("block_root", block.block_root.to_string()); + }) + } + + /// As for new, but doesn't pass the block to the slasher. + fn new_without_slasher_checks( + block: Arc>, + block_header: &SignedBeaconBlockHeader, + chain: &BeaconChain, + ) -> Result { + // Ensure the block is the correct structure for the fork at `block.slot()`. + block + .fork_name(&chain.spec) + .map_err(BlockError::InconsistentFork)?; + + // Do not gossip or process blocks from future slots. + let present_slot_with_tolerance = chain + .slot_clock + .now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity()) + .ok_or(BeaconChainError::UnableToReadSlot)?; + if block.slot() > present_slot_with_tolerance { + return Err(BlockError::FutureSlot { + present_slot: present_slot_with_tolerance, + block_slot: block.slot(), + }); + } + + // Do not gossip blocks that claim to contain more blobs than the max allowed + // at the given block epoch. + if let Ok(commitments) = block.message().body().blob_kzg_commitments() { + let max_blobs_at_epoch = chain + .spec + .max_blobs_per_block(block.slot().epoch(T::EthSpec::slots_per_epoch())) + as usize; + if commitments.len() > max_blobs_at_epoch { + return Err(BlockError::InvalidBlobCount { + max_blobs_at_epoch, + block: commitments.len(), + }); + } + } + + let block_root = get_block_header_root(block_header); + + // Do not gossip a block from a finalized slot. + check_block_against_finalized_slot(block.message(), block_root, chain)?; + + // Check if the block is already known. We know it is post-finalization, so it is + // sufficient to check the fork choice. + // + // In normal operation this isn't necessary, however it is useful immediately after a + // reboot if the `observed_block_producers` cache is empty. In that case, without this + // check, we will load the parent and state from disk only to find out later that we + // already know this block. 
+ let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock(); + if fork_choice_read_lock.contains_block(&block_root) { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + + // Do not process a block that is known to be invalid. + chain.check_invalid_block_roots(block_root)?; + + // Do not process a block that doesn't descend from the finalized root. + // + // We check this *before* we load the parent so that we can return a more detailed error. + let block = check_block_is_finalized_checkpoint_or_descendant( + chain, + &fork_choice_read_lock, + block, + )?; + + let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let (parent_block, block) = + verify_parent_block_is_known::(&fork_choice_read_lock, block)?; + drop(fork_choice_read_lock); + + // Track the number of skip slots between the block and its parent. + metrics::set_gauge( + &metrics::GOSSIP_BEACON_BLOCK_SKIPPED_SLOTS, + block + .slot() + .as_u64() + .saturating_sub(1) + .saturating_sub(parent_block.slot.into()) as i64, + ); + + // Paranoid check to prevent propagation of blocks that don't form a legitimate chain. + // + // This is not in the spec, but @protolambda tells me that the majority of other clients are + // already doing it. For reference: + // + // https://github.com/ethereum/eth2.0-specs/pull/2196 if parent_block.slot >= block.slot() { return Err(BlockError::BlockIsNotLaterThanParent { block_slot: block.slot(), @@ -1050,237 +1513,21 @@ impl GossipVerifiedBlock { block, block_root, parent: opt_parent, - consensus_context, - }) - } - - pub fn block_root(&self) -> Hash256 { - self.block_root - } -} - -impl IntoExecutionPendingBlock for GossipVerifiedBlock { - /// Completes verification of the wrapped `block`. - #[instrument( - name = "gossip_block_into_execution_pending_block_slashable", - level = "debug" - skip_all, - )] - fn into_execution_pending_block_slashable( - self, - block_root: Hash256, - chain: &Arc>, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockSlashInfo> { - let execution_pending = - SignatureVerifiedBlock::from_gossip_verified_block_check_slashable(self, chain)?; - execution_pending.into_execution_pending_block_slashable( - block_root, - chain, - notify_execution_layer, - ) - } - - fn block(&self) -> &SignedBeaconBlock { - self.block.as_block() - } - - fn block_cloned(&self) -> Arc> { - self.block.clone() - } -} - -impl SignatureVerifiedBlock { - /// Instantiates `Self`, a wrapper that indicates that all signatures (except the deposit - /// signatures) are valid (i.e., signed by the correct public keys). - /// - /// Returns an error if the block is invalid, or if the block was unable to be verified. - pub fn new( - block: MaybeAvailableBlock, - block_root: Hash256, - chain: &BeaconChain, - ) -> Result { - // Ensure the block is the correct structure for the fork at `block.slot()`. - block - .as_block() - .fork_name(&chain.spec) - .map_err(BlockError::InconsistentFork)?; - - // Check whether the block is a banned block prior to loading the parent. 
- chain.check_invalid_block_roots(block_root)?; - - let (mut parent, block) = load_parent(block, chain)?; - - let state = cheap_state_advance_to_obtain_committees::<_, BlockError>( - &mut parent.pre_state, - parent.beacon_state_root, - block.slot(), - &chain.spec, - )?; - - let pubkey_cache = get_validator_pubkey_cache(chain)?; - - let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - - let mut consensus_context = - ConsensusContext::new(block.slot()).set_current_block_root(block_root); - - signature_verifier.include_all_signatures(block.as_block(), &mut consensus_context)?; - - if signature_verifier.verify().is_ok() { - Ok(Self { - consensus_context, - block, - block_root, - parent: Some(parent), - }) - } else { - // Re-verify the proposer signature in isolation to attribute fault - let pubkey = pubkey_cache - .get(block.message().proposer_index() as usize) - .ok_or_else(|| BlockError::UnknownValidator(block.message().proposer_index()))?; - if block.as_block().verify_signature( - Some(block_root), - pubkey, - &state.fork(), - chain.genesis_validators_root, - &chain.spec, - ) { - // Proposer signature is valid, the invalid signature must be in the body - Err(BlockError::InvalidSignature( - InvalidSignature::BlockBodySignatures, - )) - } else { - Err(BlockError::InvalidSignature( - InvalidSignature::ProposerSignature, - )) - } - } - } - - /// As for `new` above but producing `BlockSlashInfo`. - pub fn check_slashable( - block: MaybeAvailableBlock, - block_root: Hash256, - chain: &BeaconChain, - ) -> Result> { - let header = block.signed_block_header(); - Self::new(block, block_root, chain) - .map_err(|e| BlockSlashInfo::from_early_error_block(header, e)) - } - - /// Finishes signature verification on the provided `GossipVerifedBlock`. Does not re-verify - /// the proposer signature. - #[instrument(skip_all, level = "debug")] - pub fn from_gossip_verified_block( - from: GossipVerifiedBlock, - chain: &BeaconChain, - ) -> Result { - let (mut parent, block) = if let Some(parent) = from.parent { - (parent, from.block) - } else { - load_parent(from.block, chain)? - }; - - let state = cheap_state_advance_to_obtain_committees::<_, BlockError>( - &mut parent.pre_state, - parent.beacon_state_root, - block.slot(), - &chain.spec, - )?; - - let pubkey_cache = get_validator_pubkey_cache(chain)?; - - let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - - // Gossip verification has already checked the proposer index. Use it to check the RANDAO - // signature. - let mut consensus_context = from.consensus_context; - signature_verifier - .include_all_signatures_except_proposal(block.as_ref(), &mut consensus_context)?; - - let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify()); - match result { - Ok(_) => Ok(Self { - block: MaybeAvailableBlock::AvailabilityPending { - block_root: from.block_root, - block, - }, - block_root: from.block_root, - parent: Some(parent), - consensus_context, - }), - Err(_) => Err(BlockError::InvalidSignature( - InvalidSignature::BlockBodySignatures, - )), - } - } - - /// Same as `from_gossip_verified_block` but producing slashing-relevant data as well. 
- pub fn from_gossip_verified_block_check_slashable( - from: GossipVerifiedBlock, - chain: &BeaconChain, - ) -> Result> { - let header = from.block.signed_block_header(); - Self::from_gossip_verified_block(from, chain) - .map_err(|e| BlockSlashInfo::from_early_error_block(header, e)) - } - - pub fn block_root(&self) -> Hash256 { - self.block_root - } - - pub fn slot(&self) -> Slot { - self.block.slot() - } -} - -impl IntoExecutionPendingBlock for SignatureVerifiedBlock { - /// Completes verification of the wrapped `block`. - #[instrument( - name = "sig_verified_block_into_execution_pending_block_slashable", - level = "debug" - skip_all, - )] - fn into_execution_pending_block_slashable( - self, - block_root: Hash256, - chain: &Arc>, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockSlashInfo> { - let header = self.block.signed_block_header(); - let (parent, block) = if let Some(parent) = self.parent { - (parent, self.block) - } else { - load_parent(self.block, chain) - .map_err(|e| BlockSlashInfo::SignatureValid(header.clone(), e))? - }; - - ExecutionPendingBlock::from_signature_verified_components( - block, - block_root, - parent, - self.consensus_context, - chain, - notify_execution_layer, - ) - .map_err(|e| BlockSlashInfo::SignatureValid(header, e)) - } - - fn block(&self) -> &SignedBeaconBlock { - self.block.as_block() + consensus_context, + }) } - fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + pub fn block_root(&self) -> Hash256 { + self.block_root } } -impl IntoExecutionPendingBlock for RpcBlock { - /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` - /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification. +impl, B: GenericExecutionPendingBlock> + IntoExecutionPendingBlock for GossipVerifiedBlock +{ + /// Completes verification of the wrapped `block`. #[instrument( - name = "rpc_block_into_execution_pending_block_slashable", + name = "gossip_block_into_execution_pending_block_slashable", level = "debug" skip_all, )] @@ -1289,386 +1536,254 @@ impl IntoExecutionPendingBlock for RpcBlock block_root: Hash256, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockSlashInfo> { - // Perform an early check to prevent wasting time on irrelevant blocks. - let block_root = check_block_relevancy(self.as_block(), block_root, chain) - .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; - let maybe_available = chain - .data_availability_checker - .verify_kzg_for_rpc_block(self.clone()) - .map_err(|e| { - BlockSlashInfo::SignatureNotChecked( - self.signed_block_header(), - BlockError::AvailabilityCheck(e), - ) - })?; - SignatureVerifiedBlock::check_slashable(maybe_available, block_root, chain)? - .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) + ) -> Result> { + let execution_pending = + SignatureVerifiedBlock::from_gossip_verified_block_check_slashable(self, chain)?; + execution_pending.into_execution_pending_block_slashable( + block_root, + chain, + notify_execution_layer, + ) } fn block(&self) -> &SignedBeaconBlock { - self.as_block() + self.block.as_block() } fn block_cloned(&self) -> Arc> { - self.block_cloned() + self.block.clone() } } -impl ExecutionPendingBlock { - /// Instantiates `Self`, a wrapper that indicates that the given `block` is fully valid. See - /// the struct-level documentation for more information. 
- /// - /// Note: this function does not verify block signatures, it assumes they are valid. Signature - /// verification must be done upstream (e.g., via a `SignatureVerifiedBlock` +impl> SignatureVerifiedBlock { + /// Instantiates `Self`, a wrapper that indicates that all signatures (except the deposit + /// signatures) are valid (i.e., signed by the correct public keys). /// /// Returns an error if the block is invalid, or if the block was unable to be verified. - #[instrument(skip_all, level = "debug")] - pub fn from_signature_verified_components( - block: MaybeAvailableBlock, - block_root: Hash256, - parent: PreProcessingSnapshot, - mut consensus_context: ConsensusContext, - chain: &Arc>, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result { - chain - .observed_slashable - .write() - .observe_slashable(block.slot(), block.message().proposer_index(), block_root) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; - - chain - .observed_block_producers - .write() - .observe_proposal(block_root, block.message()) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; - - if let Some(parent) = chain - .canonical_head - .fork_choice_read_lock() - .get_block(&block.parent_root()) - { - // Reject any block where the parent has an invalid payload. It's impossible for a valid - // block to descend from an invalid parent. - if parent.execution_status.is_invalid() { - return Err(BlockError::ParentExecutionPayloadInvalid { - parent_root: block.parent_root(), - }); - } - } else { - // Reject any block if its parent is not known to fork choice. - // - // A block that is not in fork choice is either: - // - // - Not yet imported: we should reject this block because we should only import a child - // after its parent has been fully imported. - // - Pre-finalized: if the parent block is _prior_ to finalization, we should ignore it - // because it will revert finalization. Note that the finalized block is stored in fork - // choice, so we will not reject any child of the finalized block (this is relevant during - // genesis). - return Err(BlockError::ParentUnknown { - parent_root: block.parent_root(), - }); - } - - /* - * Perform cursory checks to see if the block is even worth processing. - */ - check_block_relevancy(block.as_block(), block_root, chain)?; - - // Define a future that will verify the execution payload with an execution engine. - // - // We do this as early as possible so that later parts of this function can run in parallel - // with the payload verification. - let payload_notifier = PayloadNotifier::new( - chain.clone(), - block.block_cloned(), - &parent.pre_state, - notify_execution_layer, - )?; - let is_valid_merge_transition_block = - is_merge_transition_block(&parent.pre_state, block.message().body()); - - let payload_verification_future = async move { - let chain = payload_notifier.chain.clone(); - let block = payload_notifier.block.clone(); - - // If this block triggers the merge, check to ensure that it references valid execution - // blocks. - // - // The specification defines this check inside `on_block` in the fork-choice specification, - // however we perform the check here for two reasons: - // - // - There's no point in importing a block that will fail fork choice, so it's best to fail - // early. - // - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no - // calls to remote servers. 
- if is_valid_merge_transition_block { - validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?; - }; - - // The specification declares that this should be run *inside* `per_block_processing`, - // however we run it here to keep `per_block_processing` pure (i.e., no calls to external - // servers). - if let Some(started_execution) = chain.slot_clock.now_duration() { - chain.block_times_cache.write().set_time_started_execution( - block_root, - block.slot(), - started_execution, - ); - } - let payload_verification_status = payload_notifier.notify_new_payload().await?; - - Ok(PayloadVerificationOutcome { - payload_verification_status, - is_valid_merge_transition_block, - }) - }; - // Spawn the payload verification future as a new task, but don't wait for it to complete. - // The `payload_verification_future` will be awaited later to ensure verification completed - // successfully. - let current_span = Span::current(); - let payload_verification_handle = chain - .task_executor - .spawn_handle( - payload_verification_future.instrument(current_span), - "execution_payload_verification", - ) - .ok_or(BeaconChainError::RuntimeShutdown)?; - - /* - * Advance the given `parent.beacon_state` to the slot of the given `block`. - */ - - let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE); - - let mut state = parent.pre_state; - - // The block must have a higher slot than its parent. - if block.slot() <= parent.beacon_block.slot() { - return Err(BlockError::BlockIsNotLaterThanParent { - block_slot: block.slot(), - parent_slot: parent.beacon_block.slot(), - }); - } - - // Perform a sanity check on the pre-state. - let parent_slot = parent.beacon_block.slot(); - if state.slot() < parent_slot || state.slot() > block.slot() { - return Err(BeaconChainError::BadPreState { - parent_root: parent.beacon_block_root, - parent_slot, - block_root, - block_slot: block.slot(), - state_slot: state.slot(), - } - .into()); - } - - // Transition the parent state to the block slot. - // - // It is important to note that we're using a "pre-state" here, one that has potentially - // been advanced one slot forward from `parent.beacon_block.slot`. - let mut summaries = vec![]; - - let distance = block.slot().as_u64().saturating_sub(state.slot().as_u64()); - for _ in 0..distance { - let state_root = if parent.beacon_block.slot() == state.slot() { - // If it happens that `pre_state` has *not* already been advanced forward a single - // slot, then there is no need to compute the state root for this - // `per_slot_processing` call since that state root is already stored in the parent - // block. - parent.beacon_block.state_root() - } else { - // This is a new state we've reached, so stage it for storage in the DB. - // Computing the state root here is time-equivalent to computing it during slot - // processing, but we get early access to it. - let state_root = state.update_tree_hash_cache()?; - - // Store the state immediately. States are ONLY deleted on finalization pruning, so - // we won't have race conditions where we should have written a state and didn't. - let state_already_exists = - chain.store.load_hot_state_summary(&state_root)?.is_some(); - - if state_already_exists { - // If the state exists, we do not need to re-write it. 
- } else { - // Recycle store codepath to create a state summary and store the state / diff - let mut ops = vec![]; - chain.store.store_hot_state(&state_root, &state, &mut ops)?; - chain.store.hot_db.do_atomically(ops)?; - }; - - state_root - }; - - if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? { - // Expose Prometheus metrics. - if let Err(e) = summary.observe_metrics() { - error!( - src = "block_verification", - error = ?e, - "Failed to observe epoch summary metrics" - ); - } - summaries.push(summary); - } - } - metrics::stop_timer(catchup_timer); - - let block_slot = block.slot(); - let state_current_epoch = state.current_epoch(); - - // If the block is sufficiently recent, notify the validator monitor. - if let Some(slot) = chain.slot_clock.now() { - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - if block_slot.epoch(T::EthSpec::slots_per_epoch()) - + VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 - >= epoch - { - let validator_monitor = chain.validator_monitor.read(); - // Update the summaries in a separate loop to `per_slot_processing`. This protects - // the `validator_monitor` lock from being bounced or held for a long time whilst - // performing `per_slot_processing`. - for (i, summary) in summaries.iter().enumerate() { - let epoch = state_current_epoch - Epoch::from(summaries.len() - i); - if let Err(e) = - validator_monitor.process_validator_statuses(epoch, summary, &chain.spec) - { - error!( - error = ?e, - "Failed to process validator statuses" - ); - } - } - } - } + pub fn new(block: G, block_root: Hash256, chain: &BeaconChain) -> Result { + // Ensure the block is the correct structure for the fork at `block.slot()`. - /* - * Build the committee caches on the state. - */ + let signed_beacon_block = block.block_cloned(); - let committee_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_COMMITTEE); + signed_beacon_block + .fork_name(&chain.spec) + .map_err(BlockError::InconsistentFork)?; - state.build_all_committee_caches(&chain.spec)?; + // Check whether the block is a banned block prior to loading the parent. + chain.check_invalid_block_roots(block_root)?; - metrics::stop_timer(committee_timer); + let (mut parent, signed_beacon_block) = load_parent(signed_beacon_block, chain)?; - /* - * If we have block reward listeners, compute the block reward and push it to the - * event handler. - */ - if let Some(ref event_handler) = chain.event_handler - && event_handler.has_block_reward_subscribers() - { - let mut reward_cache = Default::default(); - let block_reward = chain.compute_block_reward( - block.message(), - block_root, - &state, - &mut reward_cache, - true, - )?; - event_handler.register(EventKind::BlockReward(block_reward)); - } + let state = cheap_state_advance_to_obtain_committees::<_, BlockError>( + &mut parent.pre_state, + parent.beacon_state_root, + signed_beacon_block.slot(), + &chain.spec, + )?; - /* - * Perform `per_block_processing` on the block and state, returning early if the block is - * invalid. 
-         * invalid.
-         */
+        let pubkey_cache = get_validator_pubkey_cache(chain)?;
-        write_state(&format!("state_pre_block_{}", block_root), &state);
-        write_block(block.as_block(), block_root);
+        let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
-        let core_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CORE);
+        let mut consensus_context =
+            ConsensusContext::new(signed_beacon_block.slot()).set_current_block_root(block_root);
-        if let Err(err) = per_block_processing(
-            &mut state,
-            block.as_block(),
-            // Signatures were verified earlier in this function.
-            BlockSignatureStrategy::NoVerification,
-            VerifyBlockRoot::True,
-            &mut consensus_context,
-            &chain.spec,
-        ) {
-            match err {
-                // Capture `BeaconStateError` so that we can easily distinguish between a block
-                // that's invalid and one that caused an internal error.
-                BlockProcessingError::BeaconStateError(e) => return Err(e.into()),
-                other => return Err(BlockError::PerBlockProcessingError(other)),
+        signature_verifier.include_all_signatures(&signed_beacon_block, &mut consensus_context)?;
+
+        if signature_verifier.verify().is_ok() {
+            Ok(Self {
+                consensus_context,
+                block,
+                block_root,
+                parent: Some(parent),
+            })
+        } else {
+            // Re-verify the proposer signature in isolation to attribute fault
+            let pubkey = pubkey_cache
+                .get(signed_beacon_block.message().proposer_index() as usize)
+                .ok_or_else(|| {
+                    BlockError::UnknownValidator(signed_beacon_block.message().proposer_index())
+                })?;
+            if signed_beacon_block.verify_signature(
+                Some(block_root),
+                pubkey,
+                &state.fork(),
+                chain.genesis_validators_root,
+                &chain.spec,
+            ) {
+                // Proposer signature is valid, the invalid signature must be in the body
+                Err(BlockError::InvalidSignature(
+                    InvalidSignature::BlockBodySignatures,
+                ))
+            } else {
+                Err(BlockError::InvalidSignature(
+                    InvalidSignature::ProposerSignature,
+                ))
             }
+        }
+    }
+
+    /// As for `new` above but producing `BlockSlashInfo`.
+    pub fn check_slashable(
+        block: G,
+        block_root: Hash256,
+        chain: &BeaconChain,
+    ) -> Result> {
+        let signed_beacon_block = block.block_cloned();
+        let header = signed_beacon_block.signed_block_header();
+        Self::new(block, block_root, chain)
+            .map_err(|e| BlockSlashInfo::from_early_error_block(header, e))
+    }
+
+    /// Finishes signature verification on the provided `GossipVerifiedBlock`. Does not re-verify
+    /// the proposer signature.
+    #[instrument(skip_all, level = "debug")]
+    pub fn from_gossip_verified_block(
+        from: GossipVerifiedBlock,
+        chain: &BeaconChain,
+    ) -> Result {
+        let (mut parent, block) = if let Some(parent) = from.parent {
+            (parent, from.block)
+        } else {
+            load_parent(from.block, chain)?
         };
-        metrics::stop_timer(core_timer);
+        let state = cheap_state_advance_to_obtain_committees::<_, BlockError>(
+            &mut parent.pre_state,
+            parent.beacon_state_root,
+            block.slot(),
+            &chain.spec,
+        )?;
-        /*
-         * Calculate the state root of the newly modified state
-         */
+        let pubkey_cache = get_validator_pubkey_cache(chain)?;
-        let state_root_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_STATE_ROOT);
+        let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);
-        let state_root = state.update_tree_hash_cache()?;
+        // Gossip verification has already checked the proposer index. Use it to check the RANDAO
+        // signature.
+        let mut consensus_context = from.consensus_context;
+        signature_verifier
+            .include_all_signatures_except_proposal(block.as_ref(), &mut consensus_context)?;
-        metrics::stop_timer(state_root_timer);
+        let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify());
+        match result {
+            Ok(_) => Ok(Self {
+                block: G::new(from.block_root, block),
+                block_root: from.block_root,
+                parent: Some(parent),
+                consensus_context,
+            }),
+            Err(_) => Err(BlockError::InvalidSignature(
+                InvalidSignature::BlockBodySignatures,
+            )),
+        }
+    }
-        write_state(&format!("state_post_block_{}", block_root), &state);
+    /// Same as `from_gossip_verified_block` but producing slashing-relevant data as well.
+    pub fn from_gossip_verified_block_check_slashable(
+        from: GossipVerifiedBlock,
+        chain: &BeaconChain,
+    ) -> Result> {
+        let header = from.block.signed_block_header();
+        Self::from_gossip_verified_block(from, chain)
+            .map_err(|e| BlockSlashInfo::from_early_error_block(header, e))
+    }
-        /*
-         * Check to ensure the state root on the block matches the one we have calculated.
-         */
+    pub fn block_root(&self) -> Hash256 {
+        self.block_root
+    }
-        if block.state_root() != state_root {
-            return Err(BlockError::StateRootMismatch {
-                block: block.state_root(),
-                local: state_root,
-            });
-        }
+    pub fn slot(&self) -> Slot {
+        self.block_cloned().slot()
+    }
-        /*
-         * Apply the block's attestations to fork choice.
-         *
-         * We're running in parallel with the payload verification at this point, so this is
-         * free real estate.
-         */
-        let current_slot = chain.slot()?;
-        let mut fork_choice = chain.canonical_head.fork_choice_write_lock();
+    pub fn block_cloned(&self) -> Arc> {
+        self.block.block_cloned()
+    }
+}
-        // Register each attester slashing in the block with fork choice.
-        for attester_slashing in block.message().body().attester_slashings() {
-            fork_choice.on_attester_slashing(attester_slashing);
-        }
+impl, B: GenericExecutionPendingBlock>
+    IntoExecutionPendingBlock for SignatureVerifiedBlock
+{
+    /// Completes verification of the wrapped `block`.
+    #[instrument(
+        name = "sig_verified_block_into_execution_pending_block_slashable",
+        level = "debug",
+        skip_all,
+    )]
+    fn into_execution_pending_block_slashable(
+        self,
+        block_root: Hash256,
+        chain: &Arc>,
+        notify_execution_layer: NotifyExecutionLayer,
+    ) -> Result> {
+        let signed_beacon_block = self.block.block_cloned();
+        let header = signed_beacon_block.signed_block_header();
+        let (parent, _) = if let Some(parent) = self.parent {
+            (parent, signed_beacon_block)
+        } else {
+            load_parent(signed_beacon_block, chain)
+                .map_err(|e| BlockSlashInfo::SignatureValid(header.clone(), e))?
+        };
-        // Register each attestation in the block with fork choice.
-        for (i, attestation) in block.message().body().attestations().enumerate() {
-            let indexed_attestation = consensus_context
-                .get_indexed_attestation(&state, attestation)
-                .map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?;
+        B::from_signature_verified_components(
+            self.block,
+            block_root,
+            parent,
+            self.consensus_context,
+            chain,
+            notify_execution_layer,
+        )
+        .map_err(|e| BlockSlashInfo::SignatureValid(header, e))
+    }
-            match fork_choice.on_attestation(
-                current_slot,
-                indexed_attestation,
-                AttestationFromBlock::True,
-            ) {
-                Ok(()) => Ok(()),
-                // Ignore invalid attestations whilst importing attestations from a block. The
-                // block might be very old and therefore the attestations useless to fork choice.
-                Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
-                Err(e) => Err(BlockError::BeaconChainError(Box::new(e.into()))),
-            }?;
-        }
-        drop(fork_choice);
+    fn block(&self) -> &SignedBeaconBlock {
+        todo!()
+    }
-        Ok(Self {
-            block,
-            import_data: BlockImportData {
-                block_root,
-                state,
-                parent_block: parent.beacon_block,
-                consensus_context,
-            },
-            payload_verification_handle,
-        })
+    fn block_cloned(&self) -> Arc> {
+        self.block.block_cloned()
+    }
+}
+
+impl>>
+    IntoExecutionPendingBlock, B> for RpcBlock
+{
+    /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock`
+    /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification.
+    #[instrument(
+        name = "rpc_block_into_execution_pending_block_slashable",
+        level = "debug",
+        skip_all,
+    )]
+    fn into_execution_pending_block_slashable(
+        self,
+        block_root: Hash256,
+        chain: &Arc>,
+        notify_execution_layer: NotifyExecutionLayer,
+    ) -> Result> {
+        // Perform an early check to prevent wasting time on irrelevant blocks.
+        let block_root = check_block_relevancy(self.as_block(), block_root, chain)
+            .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?;
+        let maybe_available = chain
+            .data_availability_checker
+            .verify_kzg_for_rpc_block(self.clone())
+            .map_err(|e| {
+                BlockSlashInfo::SignatureNotChecked(
+                    self.signed_block_header(),
+                    BlockError::AvailabilityCheck(e),
+                )
+            })?;
+        SignatureVerifiedBlock::check_slashable(maybe_available, block_root, chain)?
+            .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer)
+    }
+
+    fn block(&self) -> &SignedBeaconBlock {
+        self.as_block()
+    }
+
+    fn block_cloned(&self) -> Arc> {
+        self.block_cloned()
     }
 }
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 644c4716985..9b9ef260683 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -374,21 +374,21 @@ impl DataAvailabilityChecker {
     pub fn verify_kzg_for_rpc_block(
         &self,
         block: RpcBlock,
-    ) -> Result, AvailabilityCheckError> {
+    ) -> Result, AvailabilityCheckError> {
         let (block_root, block, blobs, data_columns) = block.deconstruct();
         if self.blobs_required_for_block(&block) {
             return if let Some(blob_list) = blobs {
                 verify_kzg_for_blob_list(blob_list.iter(), &self.kzg)
                     .map_err(AvailabilityCheckError::InvalidBlobs)?;
-                Ok(MaybeAvailableBlock::Available(AvailableBlock {
+                Ok(AvailableBlock {
                     block_root,
                     block,
                     blob_data: AvailableBlockData::Blobs(blob_list),
                     blobs_available_timestamp: None,
                     spec: self.spec.clone(),
-                }))
+                })
             } else {
-                Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block })
+                todo!()
             };
         }
         if self.data_columns_required_for_block(&block) {
@@ -400,7 +400,7 @@ impl DataAvailabilityChecker {
                     &self.kzg,
                 )
                 .map_err(AvailabilityCheckError::InvalidColumn)?;
-                Ok(MaybeAvailableBlock::Available(AvailableBlock {
+                Ok(AvailableBlock {
                     block_root,
                     block,
                     blob_data: AvailableBlockData::DataColumns(
@@ -411,19 +411,19 @@ impl DataAvailabilityChecker {
                     ),
                     blobs_available_timestamp: None,
                     spec: self.spec.clone(),
-                }))
+                })
             } else {
-                Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block })
+                todo!()
             };
         }
-        Ok(MaybeAvailableBlock::Available(AvailableBlock {
+        Ok(AvailableBlock {
             block_root,
             block,
             blob_data: AvailableBlockData::NoData,
             blobs_available_timestamp: None,
             spec: self.spec.clone(),
-        }))
+        })
     }

     /// Checks if a vector of blocks are available. Returns a vector of `MaybeAvailableBlock`
@@ -436,7 +436,7 @@ impl DataAvailabilityChecker {
     pub fn verify_kzg_for_rpc_blocks(
         &self,
         blocks: Vec>,
-    ) -> Result>, AvailabilityCheckError> {
+    ) -> Result>, AvailabilityCheckError> {
         let mut results = Vec::with_capacity(blocks.len());
         let all_blobs = blocks
             .iter()
@@ -473,19 +473,19 @@ impl DataAvailabilityChecker {
             let maybe_available_block = if self.blobs_required_for_block(&block) {
                 if let Some(blobs) = blobs {
-                    MaybeAvailableBlock::Available(AvailableBlock {
+                    AvailableBlock {
                         block_root,
                         block,
                         blob_data: AvailableBlockData::Blobs(blobs),
                         blobs_available_timestamp: None,
                         spec: self.spec.clone(),
-                    })
+                    }
                 } else {
-                    MaybeAvailableBlock::AvailabilityPending { block_root, block }
+                    todo!()
                 }
             } else if self.data_columns_required_for_block(&block) {
                 if let Some(data_columns) = data_columns {
-                    MaybeAvailableBlock::Available(AvailableBlock {
+                    AvailableBlock {
                         block_root,
                         block,
                         blob_data: AvailableBlockData::DataColumns(
                             data_columns.into_iter().map(|d| d.into_inner()).collect(),
                         ),
                         blobs_available_timestamp: None,
                         spec: self.spec.clone(),
-                    })
+                    }
                 } else {
-                    MaybeAvailableBlock::AvailabilityPending { block_root, block }
+                    todo!()
                 }
             } else {
-                MaybeAvailableBlock::Available(AvailableBlock {
+                AvailableBlock {
                     block_root,
                     block,
                     blob_data: AvailableBlockData::NoData,
                     blobs_available_timestamp: None,
                     spec: self.spec.clone(),
-                })
+                }
             };
             results.push(maybe_available_block);