diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index adc400b1c17..8a1fa3d1606 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3976,22 +3976,24 @@ impl BeaconChain { // See https://github.com/sigp/lighthouse/issues/2028 let (_, signed_block, block_data) = signed_block.deconstruct(); - match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) { - Ok(Some(blobs_or_columns_store_op)) => { - ops.push(blobs_or_columns_store_op); - } - Ok(None) => {} - Err(e) => { - error!( - msg = "Restoring fork choice from disk", - error = &e, - ?block_root, - "Failed to store data columns into the database" - ); - return Err(self - .handle_import_block_db_write_error(fork_choice) - .err() - .unwrap_or(BlockError::InternalError(e))); + if let Some(block_data) = block_data { + match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) { + Ok(Some(blobs_or_columns_store_op)) => { + ops.push(blobs_or_columns_store_op); + } + Ok(None) => {} + Err(e) => { + error!( + msg = "Restoring fork choice from disk", + error = &e, + ?block_root, + "Failed to store data columns into the database" + ); + return Err(self + .handle_import_block_db_write_error(fork_choice) + .err() + .unwrap_or(BlockError::InternalError(e))); + } } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 374f1e2b360..816ab862cbe 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -646,19 +646,26 @@ pub fn signature_verify_chain_segment( )?; // unzip chain segment and verify kzg in bulk - let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); - let maybe_available_blocks = chain + let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment + .into_iter() + .filter_map(|(block_root, block)| match 
block { + RpcBlock::FullyAvailable(available_block) => Some((block_root, available_block)), + RpcBlock::BlockOnly { .. } => None, + }) + .unzip(); + + let available_blocks = chain .data_availability_checker .verify_kzg_for_rpc_blocks(blocks)?; // zip it back up let mut signature_verified_blocks = roots .into_iter() - .zip(maybe_available_blocks) - .map(|(block_root, maybe_available_block)| { - let consensus_context = ConsensusContext::new(maybe_available_block.slot()) - .set_current_block_root(block_root); + .zip(available_blocks) + .map(|(block_root, available_block)| { + let consensus_context = + ConsensusContext::new(available_block.slot()).set_current_block_root(block_root); SignatureVerifiedBlock { - block: maybe_available_block, + block: MaybeAvailableBlock::Available(available_block), block_root, parent: None, consensus_context, diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 5978e97c4d9..6108d43f1f7 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -1,15 +1,15 @@ +use crate::PayloadVerificationOutcome; use crate::data_availability_checker::AvailabilityCheckError; -pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock}; -use crate::data_column_verification::{CustodyDataColumn, CustodyDataColumnList}; -use crate::{PayloadVerificationOutcome, get_block_root}; +pub use crate::data_availability_checker::{ + AvailableBlock, AvailableBlockData, MaybeAvailableBlock, +}; use educe::Educe; -use ssz_types::VariableList; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256, + BeaconBlockRef, BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, 
SignedBeaconBlockHeader, Slot, }; @@ -28,147 +28,65 @@ use types::{ /// over rpc do not contain the proposer signature for dos resistance. #[derive(Clone, Educe)] #[educe(Hash(bound(E: EthSpec)))] -pub struct RpcBlock { - block_root: Hash256, - block: RpcBlockInner, +pub enum RpcBlock { + FullyAvailable(AvailableBlock), + BlockOnly { + block: Arc>, + block_root: Hash256, + }, } impl Debug for RpcBlock { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RpcBlock({:?})", self.block_root) + write!(f, "RpcBlock({:?})", self.block_root()) } } impl RpcBlock { pub fn block_root(&self) -> Hash256 { - self.block_root + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_root(), + RpcBlock::BlockOnly { block_root, .. } => *block_root, + } } pub fn as_block(&self) -> &SignedBeaconBlock { - match &self.block { - RpcBlockInner::Block(block) => block, - RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block(), + RpcBlock::BlockOnly { block, .. } => block, } } pub fn block_cloned(&self) -> Arc> { - match &self.block { - RpcBlockInner::Block(block) => block.clone(), - RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), - } - } - - pub fn blobs(&self) -> Option<&BlobSidecarList> { - match &self.block { - RpcBlockInner::Block(_) => None, - RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs), - RpcBlockInner::BlockAndCustodyColumns(_, _) => None, + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_cloned(), + RpcBlock::BlockOnly { block, .. 
} => block.clone(), } } - pub fn custody_columns(&self) -> Option<&CustodyDataColumnList> { - match &self.block { - RpcBlockInner::Block(_) => None, - RpcBlockInner::BlockAndBlobs(_, _) => None, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => Some(data_columns), + pub fn block_data(&self) -> Option<&AvailableBlockData> { + match self { + RpcBlock::FullyAvailable(available_block) => Some(available_block.data()), + RpcBlock::BlockOnly { .. } => None, } } } -/// Note: This variant is intentionally private because we want to safely construct the -/// internal variants after applying consistency checks to ensure that the block and blobs -/// are consistent with respect to each other. -#[derive(Debug, Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -enum RpcBlockInner { - /// Single block lookup response. This should potentially hit the data availability cache. - Block(Arc>), - /// This variant is used with parent lookups and by-range responses. It should have all blobs - /// ordered, all block roots matching, and the correct number of blobs for this block. - BlockAndBlobs(Arc>, BlobSidecarList), - /// This variant is used with parent lookups and by-range responses. It should have all - /// requested data columns, all block roots matching for this block. - BlockAndCustodyColumns(Arc>, CustodyDataColumnList), -} - impl RpcBlock { - /// Constructs a `Block` variant. - pub fn new_without_blobs( - block_root: Option, - block: Arc>, - ) -> Self { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - - Self { - block_root, - block: RpcBlockInner::Block(block), - } - } - - /// Constructs a new `BlockAndBlobs` variant after making consistency - /// checks between the provided blocks and blobs. This struct makes no - /// guarantees about whether blobs should be present, only that they are - /// consistent with the block. An empty list passed in for `blobs` is - /// viewed the same as `None` passed in. 
pub fn new( - block_root: Option, block: Arc>, - blobs: Option>, + block_data: Option>, + spec: Arc, ) -> Result { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - // Treat empty blob lists as if they are missing. - let blobs = blobs.filter(|b| !b.is_empty()); - - if let (Some(blobs), Ok(block_commitments)) = ( - blobs.as_ref(), - block.message().body().blob_kzg_commitments(), - ) { - if blobs.len() != block_commitments.len() { - return Err(AvailabilityCheckError::MissingBlobs); - } - for (blob, &block_commitment) in blobs.iter().zip(block_commitments.iter()) { - let blob_commitment = blob.kzg_commitment; - if blob_commitment != block_commitment { - return Err(AvailabilityCheckError::KzgCommitmentMismatch { - block_commitment, - blob_commitment, - }); - } - } + match block_data { + Some(block_data) => Ok(RpcBlock::FullyAvailable(AvailableBlock::new( + block, block_data, spec, + )?)), + None => Ok(RpcBlock::BlockOnly { + block_root: block.canonical_root(), + block, + }), } - let inner = match blobs { - Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs), - None => RpcBlockInner::Block(block), - }; - Ok(Self { - block_root, - block: inner, - }) - } - - pub fn new_with_custody_columns( - block_root: Option, - block: Arc>, - custody_columns: Vec>, - ) -> Result { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - - if block.num_expected_blobs() > 0 && custody_columns.is_empty() { - // The number of required custody columns is out of scope here. - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - // Treat empty data column lists as if they are missing. - let inner = if !custody_columns.is_empty() { - RpcBlockInner::BlockAndCustodyColumns(block, VariableList::new(custody_columns)?) 
- } else { - RpcBlockInner::Block(block) - }; - Ok(Self { - block_root, - block: inner, - }) } #[allow(clippy::type_complexity)] @@ -177,28 +95,33 @@ impl RpcBlock { ) -> ( Hash256, Arc>, - Option>, - Option>, + Option>, ) { - let block_root = self.block_root(); - match self.block { - RpcBlockInner::Block(block) => (block_root, block, None, None), - RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None), - RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => { - (block_root, block, None, Some(data_columns)) - } + match self { + RpcBlock::FullyAvailable(available_block) => available_block.deconstruct(), + RpcBlock::BlockOnly { block, block_root } => (block_root, block, None), } } + pub fn n_blobs(&self) -> usize { - match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns(_, _) => 0, - RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(), + if let Some(block_data) = self.block_data() { + match block_data { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + } + } else { + 0 } } + pub fn n_data_columns(&self) -> usize { - match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(), + if let Some(block_data) = self.block_data() { + match block_data { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + } + } else { + 0 } } } @@ -500,17 +423,21 @@ impl AsBlock for RpcBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - match &self.block { - RpcBlockInner::Block(block) => block, - RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + match self { + Self::BlockOnly { + block, + block_root: _, + } => block, + Self::FullyAvailable(available_block) => 
available_block.block(), } } fn block_cloned(&self) -> Arc> { - match &self.block { - RpcBlockInner::Block(block) => block.clone(), - RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_cloned(), + RpcBlock::BlockOnly { + block, + block_root: _, + } => block.clone(), } } fn canonical_root(&self) -> Hash256 { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 3e859456b18..7c6d7ecdba9 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -10,6 +10,7 @@ use crate::data_availability_checker::overflow_lru_cache::{ use crate::{ BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics, }; +use educe::Educe; use kzg::Kzg; use slot_clock::SlotClock; use std::fmt; @@ -31,8 +32,8 @@ mod state_lru_cache; use crate::data_availability_checker::error::Error; use crate::data_column_verification::{ - CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, - KzgVerifiedDataColumn, verify_kzg_for_data_column_list, + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, + verify_kzg_for_data_column_list, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -366,6 +367,43 @@ impl DataAvailabilityChecker { .remove_pre_execution_block(block_root); } + pub fn verify_kzg_for_fully_available_rpc_block( + &self, + available_block: AvailableBlock, + ) -> Result, AvailabilityCheckError> { + match &available_block.blob_data { + AvailableBlockData::NoData => { + if self.blobs_required_for_block(&available_block.block) + || self.data_columns_required_for_block(&available_block.block) + { + if 
available_block.block.fork_name_unchecked().fulu_enabled() { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } else { + return Err(AvailabilityCheckError::MissingBlobs); + } + } + } + AvailableBlockData::Blobs(blobs) => { + // TODO(rpc-block) should we return an error if blobs not required? + // the blobs variant should only exist for a block that requires blobs + if self.blobs_required_for_block(&available_block.block) { + verify_kzg_for_blob_list(blobs.iter(), &self.kzg) + .map_err(AvailabilityCheckError::InvalidBlobs)?; + } + } + AvailableBlockData::DataColumns(data_columns) => { + // TODO(rpc-block) should we return an error if columns not required? + // the columns variant should only exist for a block that requires columns + if self.data_columns_required_for_block(&available_block.block) { + verify_kzg_for_data_column_list(data_columns.iter(), &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn)?; + } + } + } + + Ok(MaybeAvailableBlock::Available(available_block)) + } + /// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may /// include the fully available block. 
/// @@ -375,55 +413,14 @@ impl DataAvailabilityChecker { &self, block: RpcBlock, ) -> Result, AvailabilityCheckError> { - let (block_root, block, blobs, data_columns) = block.deconstruct(); - if self.blobs_required_for_block(&block) { - return if let Some(blob_list) = blobs { - verify_kzg_for_blob_list(blob_list.iter(), &self.kzg) - .map_err(AvailabilityCheckError::InvalidBlobs)?; - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::Blobs(blob_list), - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) - } else { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - }; - } - if self.data_columns_required_for_block(&block) { - return if let Some(data_column_list) = data_columns.as_ref() { - verify_kzg_for_data_column_list( - data_column_list - .iter() - .map(|custody_column| custody_column.as_data_column()), - &self.kzg, - ) - .map_err(AvailabilityCheckError::InvalidColumn)?; - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::DataColumns( - data_column_list - .into_iter() - .map(|d| d.clone_arc()) - .collect(), - ), - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) - } else { + match block { + RpcBlock::FullyAvailable(available_rpc_block) => { + self.verify_kzg_for_fully_available_rpc_block(available_rpc_block) + } + RpcBlock::BlockOnly { block_root, block } => { Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - }; + } } - - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::NoData, - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) } /// Checks if a vector of blocks are available. 
Returns a vector of `MaybeAvailableBlock` @@ -435,14 +432,14 @@ impl DataAvailabilityChecker { #[instrument(skip_all)] pub fn verify_kzg_for_rpc_blocks( &self, - blocks: Vec>, - ) -> Result>, AvailabilityCheckError> { - let mut results = Vec::with_capacity(blocks.len()); - let all_blobs = blocks + available_blocks: Vec>, + ) -> Result>, AvailabilityCheckError> { + let mut results = Vec::with_capacity(available_blocks.len()); + let all_blobs = available_blocks .iter() - .filter(|block| self.blobs_required_for_block(block.as_block())) + .filter(|available_block| self.blobs_required_for_block(&available_block.block)) // this clone is cheap as it's cloning an Arc - .filter_map(|block| block.blobs().cloned()) + .filter_map(|available_block| available_block.blob_data.blobs()) .flatten() .collect::>(); @@ -452,13 +449,12 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidBlobs)?; } - let all_data_columns = blocks + let all_data_columns = available_blocks .iter() - .filter(|block| self.data_columns_required_for_block(block.as_block())) + .filter(|available_block| self.data_columns_required_for_block(&available_block.block)) // this clone is cheap as it's cloning an Arc - .filter_map(|block| block.custody_columns().cloned()) + .filter_map(|available_block| available_block.blob_data.data_columns()) .flatten() - .map(CustodyDataColumn::into_inner) .collect::>(); // verify kzg for all data columns at once @@ -468,46 +464,14 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidColumn)?; } - for block in blocks { - let (block_root, block, blobs, data_columns) = block.deconstruct(); - - let maybe_available_block = if self.blobs_required_for_block(&block) { - if let Some(blobs) = blobs { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::Blobs(blobs), - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - } else { - MaybeAvailableBlock::AvailabilityPending { block_root, 
block } - } - } else if self.data_columns_required_for_block(&block) { - if let Some(data_columns) = data_columns { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::DataColumns( - data_columns.into_iter().map(|d| d.into_inner()).collect(), - ), - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - } else { - MaybeAvailableBlock::AvailabilityPending { block_root, block } - } - } else { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::NoData, - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - }; + for available_block in available_blocks { + if self.blobs_required_for_block(&available_block.block) && !matches!(available_block.blob_data, AvailableBlockData::Blobs(_)) { + return Err(AvailabilityCheckError::MissingBlobs); + } else if self.data_columns_required_for_block(&available_block.block) && !matches!(available_block.blob_data, AvailableBlockData::DataColumns(_)) { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } - results.push(maybe_available_block); + results.push(available_block); } Ok(results) @@ -749,7 +713,7 @@ } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum AvailableBlockData { /// Block is pre-Deneb or has zero blobs NoData, @@ -759,14 +723,49 @@ DataColumns(DataColumnSidecarList), } +impl AvailableBlockData { + pub fn new( + blobs: Option>, + columns: Option>, + ) -> Self { + if let Some(blobs) = blobs { + Self::Blobs(blobs) + } else if let Some(columns) = columns { + Self::DataColumns(columns) + } else { + Self::NoData + } + } + + pub fn blobs(&self) -> Option> { + match self { + AvailableBlockData::NoData => None, + AvailableBlockData::Blobs(blobs) => Some(blobs.clone()), + AvailableBlockData::DataColumns(_) => None, + } + } + + pub fn data_columns(&self) -> Option> { + match self { + AvailableBlockData::NoData => None, + AvailableBlockData::Blobs(_) => None, + AvailableBlockData::DataColumns(data_columns) => Some(data_columns.clone()), + } + } +} + /// 
A fully available block that is ready to be imported into fork choice. -#[derive(Debug)] +#[derive(Debug, Clone, Educe)] +#[educe(Hash(bound(E: EthSpec)))] pub struct AvailableBlock { block_root: Hash256, block: Arc>, + #[educe(Hash(ignore))] blob_data: AvailableBlockData, + #[educe(Hash(ignore))] /// Timestamp at which this block first became available (UNIX timestamp, time since 1970). blobs_available_timestamp: Option, + #[educe(Hash(ignore))] pub spec: Arc, } @@ -786,6 +785,23 @@ impl AvailableBlock { } } + pub fn new( + block: Arc>, + block_data: AvailableBlockData, + spec: Arc, + ) -> Result { + // check blob lengths match + // POSSIBLY - verify kzg - but probably not + // but this variant *should* ensure the data IS available + Ok(Self { + block_root: block.canonical_root(), + block, + blob_data: block_data, + blobs_available_timestamp: None, + spec, + }) + } + pub fn block(&self) -> &SignedBeaconBlock { &self.block } @@ -801,6 +817,10 @@ impl AvailableBlock { &self.blob_data } + pub fn block_root(&self) -> Hash256 { + self.block_root + } + pub fn has_blobs(&self) -> bool { match self.blob_data { AvailableBlockData::NoData => false, @@ -810,14 +830,20 @@ impl AvailableBlock { } #[allow(clippy::type_complexity)] - pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { + pub fn deconstruct( + self, + ) -> ( + Hash256, + Arc>, + Option>, + ) { let AvailableBlock { block_root, block, blob_data, .. 
} = self; - (block_root, block, blob_data) + (block_root, block, Some(blob_data)) } /// Only used for testing @@ -865,6 +891,7 @@ mod test { use super::*; use crate::CustodyContext; use crate::custody_context::NodeCustodyType; + use crate::data_column_verification::CustodyDataColumn; use crate::test_utils::{ EphemeralHarnessType, NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns, get_kzg, @@ -1057,9 +1084,6 @@ mod test { let custody_columns = if index == 0 { // 128 valid data columns in the first block data_columns - .into_iter() - .map(CustodyDataColumn::from_asserted_custody) - .collect::>() } else { // invalid data columns in the second block data_columns @@ -1070,17 +1094,29 @@ mod test { ..d.as_ref().clone() }; CustodyDataColumn::from_asserted_custody(Arc::new(invalid_sidecar)) + .as_data_column() + .clone() }) .collect::>() }; - RpcBlock::new_with_custody_columns(None, Arc::new(block), custody_columns) + let block_data = AvailableBlockData::new(None, Some(custody_columns)); + + RpcBlock::new(Arc::new(block), Some(block_data), spec.clone()) .expect("should create RPC block with custody columns") }) .collect::>(); + let available_blocks = blocks_with_columns + .iter() + .filter_map(|block| match block { + RpcBlock::FullyAvailable(available_block) => Some(available_block.clone()), + RpcBlock::BlockOnly { .. 
} => None, + }) + .collect::>(); + // WHEN verifying all blocks together (totalling 256 data columns) - let verification_result = da_checker.verify_kzg_for_rpc_blocks(blocks_with_columns); + let verification_result = da_checker.verify_kzg_for_rpc_blocks(available_blocks); // THEN batch block verification should fail due to 128 invalid columns in the second block verification_result.expect_err("should have failed to verify blocks"); diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index c9efb7a4149..1d9b8820d36 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -22,6 +22,7 @@ pub enum Error { BlockReplayError(state_processing::BlockReplayError), RebuildingStateCaches(BeaconStateError), SlotClockError, + InvalidFork, } #[derive(PartialEq, Eq)] @@ -44,7 +45,8 @@ impl Error { | Error::ParentStateMissing(_) | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) - | Error::SlotClockError => ErrorCategory::Internal, + | Error::SlotClockError + | Error::InvalidFork => ErrorCategory::Internal, Error::InvalidBlobs { .. } | Error::InvalidColumn { .. } | Error::ReconstructColumnsError { .. } diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index e4040eea6b0..d183b924a58 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -155,26 +155,26 @@ impl BeaconChain { ); } - match &block_data { - AvailableBlockData::NoData => {} - AvailableBlockData::Blobs(..) 
=> { - new_oldest_blob_slot = Some(block.slot()); + if let Some(block_data) = block_data { + match &block_data { + AvailableBlockData::NoData => {} + AvailableBlockData::Blobs(_) => new_oldest_blob_slot = Some(block.slot()), + AvailableBlockData::DataColumns(_) => { + new_oldest_data_column_slot = Some(block.slot()) + } } - AvailableBlockData::DataColumns(_) => { - new_oldest_data_column_slot = Some(block.slot()); - } - } - // Store the blobs or data columns too - if let Some(op) = self - .get_blobs_or_columns_store_op(block_root, block.slot(), block_data) - .map_err(|e| { - HistoricalBlockError::StoreError(StoreError::DBError { - message: format!("get_blobs_or_columns_store_op error {e:?}"), - }) - })? - { - blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?); + // Store the blobs or data columns too + if let Some(op) = self + .get_blobs_or_columns_store_op(block_root, block.slot(), block_data) + .map_err(|e| { + HistoricalBlockError::StoreError(StoreError::DBError { + message: format!("get_blobs_or_columns_store_op error {e:?}"), + }) + })? + { + blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?); + } } // Store block roots, including at all skip slots in the freezer DB. 
diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 759b7e9bd77..bd891d7fb79 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,7 +1,6 @@ use crate::blob_verification::GossipVerifiedBlob; -use crate::block_verification_types::{AsBlock, RpcBlock}; +use crate::block_verification_types::{AsBlock, AvailableBlockData, RpcBlock}; use crate::custody_context::NodeCustodyType; -use crate::data_column_verification::CustodyDataColumn; use crate::kzg_utils::build_data_column_sidecars; use crate::observed_operations::ObservationOutcome; pub use crate::persisted_beacon_chain::PersistedBeaconChain; @@ -2369,7 +2368,7 @@ where self.set_current_slot(slot); let (block, blob_items) = block_contents; - let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; + let rpc_block = self.build_rpc_block_from_blobs(block, blob_items, true)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( @@ -2393,7 +2392,7 @@ where let (block, blob_items) = block_contents; let block_root = block.canonical_root(); - let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; + let rpc_block = self.build_rpc_block_from_blobs(block, blob_items, true)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( @@ -2424,7 +2423,8 @@ where .blob_kzg_commitments() .is_ok_and(|c| !c.is_empty()); if !has_blobs { - return RpcBlock::new_without_blobs(Some(block_root), block); + return RpcBlock::new(block, Some(AvailableBlockData::NoData), self.spec.clone()) + .unwrap(); } // Blobs are stored as data columns from Fulu (PeerDAS) @@ -2432,21 +2432,24 @@ where let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap(); let custody_columns = columns .into_iter() - .map(CustodyDataColumn::from_asserted_custody) + // TODO(investigate the custody data column conversion) + // .map(CustodyDataColumn::from_asserted_custody) 
.collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns).unwrap() + let block_data = AvailableBlockData::new(None, Some(custody_columns)); + RpcBlock::new(block, Some(block_data), self.spec.clone()).unwrap() } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); - RpcBlock::new(Some(block_root), block, blobs).unwrap() + let block_data = AvailableBlockData::new(blobs, None); + RpcBlock::new(block, Some(block_data), self.spec.clone()).unwrap() } } /// Builds an `RpcBlock` from a `SignedBeaconBlock` and `BlobsList`. pub fn build_rpc_block_from_blobs( &self, - block_root: Hash256, block: Arc>>, blob_items: Option<(KzgProofs, BlobsList)>, + is_available: bool, ) -> Result, BlockError> { Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let epoch = block.slot().epoch(E::slots_per_epoch()); @@ -2459,11 +2462,20 @@ where let columns = generate_data_column_sidecars_from_block(&block, &self.spec) .into_iter() .filter(|d| sampling_columns.contains(&d.index)) - .map(CustodyDataColumn::from_asserted_custody) + // TODO(investigate the custody data column conversion) + // .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, columns)? + if is_available { + let block_data: AvailableBlockData = + AvailableBlockData::new(None, Some(columns)); + RpcBlock::new(block, Some(block_data), self.spec.clone())? + } else { + RpcBlock::new(block, None, self.spec.clone())? + } + } else if is_available { + RpcBlock::new(block, Some(AvailableBlockData::NoData), self.spec.clone())? } else { - RpcBlock::new_without_blobs(Some(block_root), block) + RpcBlock::new(block, None, self.spec.clone())? } } else { let blobs = blob_items @@ -2472,7 +2484,12 @@ where }) .transpose() .unwrap(); - RpcBlock::new(Some(block_root), block, blobs)? 
+ if is_available { + let block_data: AvailableBlockData = AvailableBlockData::new(blobs, None); + RpcBlock::new(block, Some(block_data), self.spec.clone())? + } else { + RpcBlock::new(block, None, self.spec.clone())? + } }) } diff --git a/beacon_node/beacon_chain/tests/blob_verification.rs b/beacon_node/beacon_chain/tests/blob_verification.rs index c42a2828c01..7ad60572c60 100644 --- a/beacon_node/beacon_chain/tests/blob_verification.rs +++ b/beacon_node/beacon_chain/tests/blob_verification.rs @@ -76,7 +76,7 @@ async fn rpc_blobs_with_invalid_header_signature() { // Process the block without blobs so that it doesn't become available. harness.advance_slot(); let rpc_block = harness - .build_rpc_block_from_blobs(block_root, signed_block.clone(), None) + .build_rpc_block_from_blobs(signed_block.clone(), None, false) .unwrap(); let availability = harness .chain @@ -84,11 +84,12 @@ async fn rpc_blobs_with_invalid_header_signature() { block_root, rpc_block, NotifyExecutionLayer::Yes, - BlockImportSource::RangeSync, + BlockImportSource::Lookup, || Ok(()), ) .await .unwrap(); + assert_eq!( availability, AvailabilityProcessingStatus::MissingComponents(slot, block_root) @@ -113,6 +114,8 @@ async fn rpc_blobs_with_invalid_header_signature() { .process_rpc_blobs(slot, block_root, blob_sidecars) .await .unwrap_err(); + + println!("{:?}", err); assert!(matches!( err, BlockError::InvalidSignature(InvalidSignature::ProposerSignature) diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 881885cef23..231a154cb09 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,6 +1,7 @@ #![cfg(not(debug_assertions))] use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; +use beacon_chain::data_availability_checker::AvailableBlockData; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::{ 
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, ExecutionPendingBlock, @@ -129,13 +130,14 @@ fn get_harness( fn chain_segment_blocks( chain_segment: &[BeaconSnapshot], chain_segment_sidecars: &[Option>], + spec: Arc, ) -> Vec> { chain_segment .iter() .zip(chain_segment_sidecars.iter()) .map(|(snapshot, data_sidecars)| { let block = snapshot.beacon_block.clone(); - build_rpc_block(block, data_sidecars) + build_rpc_block(block, data_sidecars, spec.clone()) }) .collect() } @@ -143,15 +145,26 @@ fn chain_segment_blocks( fn build_rpc_block( block: Arc>, data_sidecars: &Option>, + spec: Arc, ) -> RpcBlock { match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { - RpcBlock::new(None, block, Some(blobs.clone())).unwrap() + let block_data = AvailableBlockData::new(Some(blobs.clone()), None); + RpcBlock::new(block, Some(block_data), spec).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone()).unwrap() + let block_data = AvailableBlockData::new( + None, + Some( + columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ), + ); + RpcBlock::new(block, Some(block_data), spec).unwrap() } - None => RpcBlock::new_without_blobs(None, block), + None => RpcBlock::new(block, Some(AvailableBlockData::NoData), spec).unwrap(), } } @@ -262,9 +275,10 @@ fn update_data_column_signed_header( async fn chain_segment_full_segment() { let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.spec.clone()) + .into_iter() + .collect(); harness .chain @@ -298,9 +312,11 @@ async fn chain_segment_full_segment() { #[tokio::test] async fn chain_segment_varying_chunk_size() { let (chain_segment, chain_segment_blobs) = 
get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.spec.clone()) + .into_iter() + .collect(); for chunk_size in &[1, 2, 31, 32, 33] { let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); @@ -342,9 +358,10 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a block removed. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.spec.clone()) + .into_iter() + .collect(); blocks.remove(2); assert!( @@ -362,16 +379,19 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a modified parent root. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.spec.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.parent_root_mut() = Hash256::zero(); - blocks[3] = RpcBlock::new_without_blobs( - None, + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + Some(AvailableBlockData::NoData), + harness.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -399,15 +419,18 @@ async fn chain_segment_non_linear_slots() { * Test where a child is lower than the parent. 
*/ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.spec.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = Slot::new(0); - blocks[3] = RpcBlock::new_without_blobs( - None, + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + Some(AvailableBlockData::NoData), + harness.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -425,15 +448,18 @@ async fn chain_segment_non_linear_slots() { * Test where a child is equal to the parent. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.spec.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = blocks[2].slot(); - blocks[3] = RpcBlock::new_without_blobs( - None, + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + Some(AvailableBlockData::NoData), + harness.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -459,7 +485,9 @@ async fn assert_invalid_signature( let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.spec.clone()) + }) .collect(); // Ensure the block will be rejected if imported in a chain segment. 
@@ -484,7 +512,9 @@ async fn assert_invalid_signature( .iter() .take(block_index) .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.spec.clone()) + }) .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been // imported prior to this test. @@ -501,6 +531,7 @@ async fn assert_invalid_signature( build_rpc_block( snapshots[block_index].beacon_block.clone(), &chain_segment_blobs[block_index], + harness.spec.clone(), ), NotifyExecutionLayer::Yes, BlockImportSource::Lookup, @@ -558,7 +589,9 @@ async fn invalid_signature_gossip_block() { .iter() .take(block_index) .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.spec.clone()) + }) .collect(); harness .chain @@ -567,7 +600,7 @@ async fn invalid_signature_gossip_block() { .into_block_error() .expect("should import all blocks prior to the one being tested"); let signed_block = SignedBeaconBlock::from_block(block, junk_signature()); - let rpc_block = RpcBlock::new_without_blobs(None, Arc::new(signed_block)); + let rpc_block = RpcBlock::new(Arc::new(signed_block), None, harness.spec.clone()).unwrap(); let process_res = harness .chain .process_block( @@ -609,7 +642,9 @@ async fn invalid_signature_block_proposal() { let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.spec.clone()) + }) .collect::>(); // Ensure the block will be rejected if imported in a chain segment. 
let process_res = harness @@ -926,7 +961,9 @@ async fn invalid_signature_deposit() { let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.spec.clone()) + }) .collect(); assert!( !matches!( @@ -1568,7 +1605,8 @@ async fn add_base_block_to_altair_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone())); + let base_rpc_block = + RpcBlock::new(Arc::new(base_block.clone()), None, harness.spec.clone()).unwrap(); assert!(matches!( harness .chain @@ -1592,7 +1630,7 @@ async fn add_base_block_to_altair_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(base_block))], + vec![RpcBlock::new(Arc::new(base_block), None, harness.spec.clone()).unwrap()], NotifyExecutionLayer::Yes, ) .await, @@ -1705,7 +1743,8 @@ async fn add_altair_block_to_base_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone())); + let altair_rpc_block = + RpcBlock::new(Arc::new(altair_block.clone()), None, harness.spec.clone()).unwrap(); assert!(matches!( harness .chain @@ -1729,7 +1768,7 @@ async fn add_altair_block_to_base_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block))], + vec![RpcBlock::new(Arc::new(altair_block), None, harness.spec.clone()).unwrap()], NotifyExecutionLayer::Yes ) .await, @@ -1792,7 +1831,7 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. 
let notify_execution_layer = NotifyExecutionLayer::Yes; - let rpc_block = RpcBlock::new_without_blobs(Some(block_root), block.clone()); + let rpc_block = RpcBlock::new(block.clone(), None, harness.spec.clone()).unwrap(); let verified_block1 = rpc_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) diff --git a/beacon_node/beacon_chain/tests/column_verification.rs b/beacon_node/beacon_chain/tests/column_verification.rs index 229ae1e1998..4f3e928170d 100644 --- a/beacon_node/beacon_chain/tests/column_verification.rs +++ b/beacon_node/beacon_chain/tests/column_verification.rs @@ -80,7 +80,7 @@ async fn rpc_columns_with_invalid_header_signature() { // Process the block without blobs so that it doesn't become available. harness.advance_slot(); let rpc_block = harness - .build_rpc_block_from_blobs(block_root, signed_block.clone(), None) + .build_rpc_block_from_blobs(signed_block.clone(), None, false) .unwrap(); let availability = harness .chain diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 5bd43835e33..9d2b152a9d8 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -685,7 +685,8 @@ async fn invalidates_all_descendants() { assert_eq!(fork_parent_state.slot(), fork_parent_slot); let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = + RpcBlock::new(fork_block.clone(), None, rig.harness.chain.spec.clone()).unwrap(); let fork_block_root = rig .harness .chain @@ -787,7 +788,8 @@ async fn switches_heads() { let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; let fork_parent_root = fork_block.parent_root(); - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + 
let fork_rpc_block = + RpcBlock::new(fork_block.clone(), None, rig.harness.chain.spec.clone()).unwrap(); let fork_block_root = rig .harness .chain @@ -1059,7 +1061,7 @@ async fn invalid_parent() { )); // Ensure the block built atop an invalid payload is invalid for import. - let rpc_block = RpcBlock::new_without_blobs(None, block.clone()); + let rpc_block = RpcBlock::new(block.clone(), None, rig.harness.chain.spec.clone()).unwrap(); assert!(matches!( rig.harness.chain.process_block(rpc_block.block_root(), rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -1384,7 +1386,8 @@ async fn recover_from_invalid_head_by_importing_blocks() { } = InvalidHeadSetup::new().await; // Import the fork block, it should become the head. - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = + RpcBlock::new(fork_block.clone(), None, rig.harness.chain.spec.clone()).unwrap(); rig.harness .chain .process_block( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 0733d901fc3..79de89abb6e 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3040,7 +3040,7 @@ async fn weak_subjectivity_sync_test( AvailableBlock::__new_for_testing( block_root, Arc::new(corrupt_block), - data, + data.expect("Expect block data"), Arc::new(spec), ) }; @@ -3543,7 +3543,8 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert_eq!(split.block_root, valid_fork_block.parent_root()); assert_ne!(split.state_root, unadvanced_split_state_root); - let invalid_fork_rpc_block = RpcBlock::new_without_blobs(None, invalid_fork_block.clone()); + let invalid_fork_rpc_block = + RpcBlock::new(invalid_fork_block.clone(), None, harness.spec.clone()).unwrap(); // Applying the invalid block should fail. 
let err = harness .chain @@ -3559,7 +3560,8 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert!(matches!(err, BlockError::WouldRevertFinalizedSlot { .. })); // Applying the valid block should succeed, but it should not become head. - let valid_fork_rpc_block = RpcBlock::new_without_blobs(None, valid_fork_block.clone()); + let valid_fork_rpc_block = + RpcBlock::new(valid_fork_block.clone(), None, harness.spec.clone()).unwrap(); harness .chain .process_block( diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 9671a72da26..7e90d73ccf0 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -314,9 +314,14 @@ pub async fn publish_block>( slot = %block.slot(), "Block previously seen" ); + let Ok(rpc_block) = RpcBlock::new(block.clone(), None, chain.spec.clone()) else { + return Err(warp_utils::reject::custom_bad_request( + "Unable to construct rpc block".to_string(), + )); + }; let import_result = Box::pin(chain.process_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone()), + rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::HttpApi, publish_fn, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index e49ae134fe4..6cb0acf35a4 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -8,7 +8,6 @@ use crate::sync::{ }; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; -use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, @@ -722,18 +721,21 @@ impl 
NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); + let available_blocks = downloaded_blocks + .iter() + .filter_map(|rpc_block| match rpc_block { + RpcBlock::FullyAvailable(available_block) => Some(available_block.clone()), + // TODO filter and return error if this variant is found + RpcBlock::BlockOnly { .. } => None, + }) + .collect::>(); + let available_blocks = match self .chain .data_availability_checker - .verify_kzg_for_rpc_blocks(downloaded_blocks) + .verify_kzg_for_rpc_blocks(available_blocks) { - Ok(blocks) => blocks - .into_iter() - .filter_map(|maybe_available| match maybe_available { - MaybeAvailableBlock::Available(block) => Some(block), - MaybeAvailableBlock::AvailabilityPending { .. } => None, - }) - .collect::>(), + Ok(blocks) => blocks, Err(e) => match e { AvailabilityCheckError::StoreError(_) => { return ( diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 841a8679cfd..a0201eb4a7d 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -399,7 +399,7 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), + RpcBlock::new(self.next_block.clone(), None, self._harness.spec.clone()).unwrap(), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 0 }, ) @@ -411,7 +411,7 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), + RpcBlock::new(self.next_block.clone(), None, self._harness.spec.clone()).unwrap(), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 1 }, ) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs 
b/beacon_node/network/src/sync/block_sidecar_coupling.rs index ed9a11a03de..7cbe2e97465 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,5 +1,7 @@ use beacon_chain::{ - block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, + block_verification_types::{AvailableBlockData, RpcBlock}, + data_column_verification::CustodyDataColumn, + get_block_root, }; use lighthouse_network::{ PeerId, @@ -194,7 +196,7 @@ impl RangeBlockComponentsRequest { /// Returns `Some(Err(_))` if there are issues coupling blocks with their data. pub fn responses( &mut self, - spec: &ChainSpec, + spec: Arc, ) -> Option>, CouplingError>> { let Some(blocks) = self.blocks_request.to_finished() else { return None; @@ -202,9 +204,11 @@ impl RangeBlockComponentsRequest { // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { - RangeBlockDataRequest::NoData => { - Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec)) - } + RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( + blocks.to_vec(), + vec![], + spec.clone(), + )), RangeBlockDataRequest::Blobs(request) => { let Some(blobs) = request.to_finished() else { return None; @@ -248,6 +252,7 @@ impl RangeBlockComponentsRequest { column_to_peer_id, expected_custody_columns, *attempt, + spec, ); if let Err(CouplingError::DataColumnPeerFailure { @@ -272,7 +277,7 @@ impl RangeBlockComponentsRequest { fn responses_with_blobs( blocks: Vec>>, blobs: Vec>>, - spec: &ChainSpec, + spec: Arc, ) -> Result>, CouplingError> { // There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. 
@@ -315,8 +320,9 @@ impl RangeBlockComponentsRequest { .map_err(|_| { CouplingError::BlobPeerFailure("Blobs returned exceeds max length".to_string()) })?; + let block_data = AvailableBlockData::new(Some(blobs), None); responses.push( - RpcBlock::new(None, block, Some(blobs)) + RpcBlock::new(block, Some(block_data), spec.clone()) .map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?, ) } @@ -339,6 +345,7 @@ impl RangeBlockComponentsRequest { column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], attempt: usize, + spec: Arc, ) -> Result>, CouplingError> { // Group data columns by block_root and index let mut data_columns_by_block = @@ -415,11 +422,14 @@ impl RangeBlockComponentsRequest { ); } - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns) + let block_data = AvailableBlockData::new(None, Some(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>())); + + RpcBlock::new(block, Some(block_data), spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { // Block has no data, expects zero columns - RpcBlock::new_without_blobs(Some(block_root), block) + RpcBlock::new(block, Some(AvailableBlockData::NoData), spec.clone()) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? 
}); } @@ -512,8 +522,8 @@ mod tests { } fn is_finished(info: &mut RangeBlockComponentsRequest) -> bool { - let spec = test_spec::(); - info.responses(&spec).is_some() + let spec = Arc::new(test_spec::()); + info.responses(spec).is_some() } #[test] @@ -534,8 +544,10 @@ mod tests { // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); + let spec = Arc::new(test_spec::()); + // Assert response is finished and RpcBlocks can be constructed - info.responses(&test_spec::()).unwrap().unwrap(); + info.responses(spec).unwrap().unwrap(); } #[test] @@ -565,15 +577,17 @@ mod tests { // Expect no blobs returned info.add_blobs(blobs_req_id, vec![]).unwrap(); + let spec = Arc::new(test_spec::()); + // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. // This makes sure we don't expect blobs here when they have expired. Checking this logic should // be hendled elsewhere. - info.responses(&test_spec::()).unwrap().unwrap(); + info.responses(spec).unwrap().unwrap(); } #[test] fn rpc_block_with_custody_columns() { - let spec = test_spec::(); + let spec = Arc::new(test_spec::()); let expects_custody_columns = vec![1, 2, 3, 4]; let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) @@ -638,12 +652,12 @@ mod tests { } // All completed construct response - info.responses(&spec).unwrap().unwrap(); + info.responses(spec).unwrap().unwrap(); } #[test] fn rpc_block_with_custody_columns_batched() { - let spec = test_spec::(); + let spec = Arc::new(test_spec::()); let batched_column_requests = [vec![1_u64, 2], vec![3, 4]]; let expects_custody_columns = batched_column_requests .iter() @@ -723,13 +737,13 @@ mod tests { } // All completed construct response - info.responses(&spec).unwrap().unwrap(); + info.responses(spec).unwrap().unwrap(); } #[test] fn missing_custody_columns_from_faulty_peers() { // GIVEN: A request expecting custody columns from multiple peers - let spec = test_spec::(); + let spec 
= Arc::new(test_spec::()); let expected_custody_columns = vec![1, 2, 3, 4]; let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..2) @@ -792,7 +806,7 @@ mod tests { } // WHEN: Attempting to construct RPC blocks - let result = info.responses(&spec).unwrap(); + let result = info.responses(spec).unwrap(); // THEN: Should fail with PeerFailure identifying the faulty peers assert!(result.is_err()); @@ -815,7 +829,7 @@ mod tests { #[test] fn retry_logic_after_peer_failures() { // GIVEN: A request expecting custody columns where some peers initially fail - let spec = test_spec::(); + let spec = Arc::new(test_spec::()); let expected_custody_columns = vec![1, 2]; let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..2) @@ -874,7 +888,7 @@ mod tests { info.add_custody_columns(*req2, vec![]).unwrap(); // WHEN: First attempt to get responses fails - let result = info.responses(&spec).unwrap(); + let result = info.responses(spec.clone()).unwrap(); assert!(result.is_err()); // AND: We retry with a new peer for the failed column @@ -897,7 +911,7 @@ mod tests { .unwrap(); // WHEN: Attempting to get responses again - let result = info.responses(&spec).unwrap(); + let result = info.responses(spec).unwrap(); // THEN: Should succeed with complete RPC blocks assert!(result.is_ok()); @@ -908,7 +922,7 @@ mod tests { #[test] fn max_retries_exceeded_behavior() { // GIVEN: A request where peers consistently fail to provide required columns - let spec = test_spec::(); + let spec = Arc::new(test_spec::()); let expected_custody_columns = vec![1, 2]; let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..1) @@ -968,7 +982,7 @@ mod tests { // WHEN: Multiple retry attempts are made (up to max retries) for _ in 0..MAX_COLUMN_RETRIES { - let result = info.responses(&spec).unwrap(); + let result = info.responses(spec.clone()).unwrap(); assert!(result.is_err()); if let Err(super::CouplingError::DataColumnPeerFailure { @@ -981,7 +995,7 @@ mod tests { } // AND: One final 
attempt after exceeding max retries - let result = info.responses(&spec).unwrap(); + let result = info.responses(spec).unwrap(); // THEN: Should fail with exceeded_retries = true assert!(result.is_err()); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 2e0c56db23f..ff7b476a96f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -777,7 +777,7 @@ impl SyncNetworkContext { } let range_req = entry.get_mut(); - if let Some(blocks_result) = range_req.responses(&self.chain.spec) { + if let Some(blocks_result) = range_req.responses(self.chain.spec.clone()) { if let Err(CouplingError::DataColumnPeerFailure { error, faulty_peers: _, @@ -1606,7 +1606,8 @@ impl SyncNetworkContext { .beacon_processor_if_enabled() .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; - let block = RpcBlock::new_without_blobs(Some(block_root), block); + let block = RpcBlock::new(block, None, self.chain.spec.clone()) + .map_err(|_| SendErrorProcessor::SendError)?; debug!(block = ?block_root, id, "Sending block for processing"); // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index ef52f896785..34a55f46f4f 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1929,7 +1929,6 @@ mod deneb_only { block_verification_types::{AsBlock, RpcBlock}, data_availability_checker::AvailabilityCheckError, }; - use ssz_types::RuntimeVariableList; use std::collections::VecDeque; struct DenebTester { @@ -2283,17 +2282,9 @@ mod deneb_only { fn parent_block_unknown_parent(mut self) -> Self { self.rig.log("parent_block_unknown_parent"); let block = self.unknown_parent_block.take().unwrap(); - let max_len = self.rig.spec.max_blobs_per_block(block.epoch()) as usize; // Now this block 
is the one we expect requests from self.block = block.clone(); - let block = RpcBlock::new( - Some(block.canonical_root()), - block, - self.unknown_parent_blobs - .take() - .map(|vec| RuntimeVariableList::new(vec, max_len).unwrap()), - ) - .unwrap(); + let block = RpcBlock::new(block, None, self.rig.spec.clone()).unwrap(); self.rig.parent_block_processed( self.block_root, BlockProcessingResult::Err(BlockError::ParentUnknown { diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index cb728a90c1b..7098374a028 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -5,6 +5,7 @@ use crate::sync::SyncMessage; use crate::sync::manager::SLOT_IMPORT_TOLERANCE; use crate::sync::network_context::RangeRequestId; use crate::sync::range_sync::RangeSyncType; +use beacon_chain::block_verification_types::AvailableBlockData; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types::RpcBlock}; @@ -427,7 +428,7 @@ impl TestRig { .chain .process_block( block_root, - build_rpc_block(block.into(), &data_sidecars), + build_rpc_block(block.into(), &data_sidecars, self.spec.clone()), NotifyExecutionLayer::Yes, BlockImportSource::RangeSync, || Ok(()), @@ -443,16 +444,27 @@ impl TestRig { fn build_rpc_block( block: Arc>, data_sidecars: &Option>, + spec: Arc, ) -> RpcBlock { match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { - RpcBlock::new(None, block, Some(blobs.clone())).unwrap() + let block_data = AvailableBlockData::new(Some(blobs.clone()), None); + RpcBlock::new(block, Some(block_data), spec).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone()).unwrap() + let block_data = AvailableBlockData::new( + None, + Some( + columns + .iter() + .map(|c| 
c.as_data_column().clone()) + .collect::>(), + ), + ); + RpcBlock::new(block, Some(block_data), spec).unwrap() } // Block has no data, expects zero columns - None => RpcBlock::new_without_blobs(None, block), + None => RpcBlock::new(block, Some(AvailableBlockData::NoData), spec).unwrap(), } } diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 8e9d438a243..49c0b17ddaf 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -542,13 +542,16 @@ impl Tester { let block = Arc::new(block); let result: Result, _> = self - .block_on_dangerous(self.harness.chain.process_block( - block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone()), - NotifyExecutionLayer::Yes, - BlockImportSource::Lookup, - || Ok(()), - ))? + .block_on_dangerous( + self.harness.chain.process_block( + block_root, + RpcBlock::new(block.clone(), None, self.harness.chain.spec.clone()) + .map_err(|e| Error::InternalError(format!("{:?}", e)))?, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + || Ok(()), + ), + )? .map(|avail: AvailabilityProcessingStatus| avail.try_into()); let success = data_column_success && result.as_ref().is_ok_and(|inner| inner.is_ok()); if success != valid { @@ -632,13 +635,16 @@ impl Tester { let block = Arc::new(block); let result: Result, _> = self - .block_on_dangerous(self.harness.chain.process_block( - block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone()), - NotifyExecutionLayer::Yes, - BlockImportSource::Lookup, - || Ok(()), - ))? + .block_on_dangerous( + self.harness.chain.process_block( + block_root, + RpcBlock::new(block.clone(), None, self.harness.chain.spec.clone()) + .map_err(|e| Error::InternalError(format!("{:?}", e)))?, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + || Ok(()), + ), + )? 
.map(|avail: AvailabilityProcessingStatus| avail.try_into()); let success = blob_success && result.as_ref().is_ok_and(|inner| inner.is_ok()); if success != valid {