From 167fef317680abbdf7d008485090e6c43b14ea06 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 14 Nov 2025 21:15:09 -0300 Subject: [PATCH] Consistent logic to select range sync start_slot --- beacon_node/network/src/sync/manager.rs | 50 +++------ .../network/src/sync/peer_sync_info.rs | 100 +++++++++++++++--- .../src/sync/range_sync/chain_collection.rs | 58 +++++----- .../network/src/sync/range_sync/range.rs | 63 +++++------ 4 files changed, 160 insertions(+), 111 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 338f21ce987..d062979a165 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -42,12 +42,12 @@ use super::peer_sync_info::{PeerSyncType, remote_sync_type}; use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::service::NetworkMessage; -use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; +use crate::sync::peer_sync_info::{LocalSyncInfo, PeerSyncTypeAdvanced}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ @@ -384,16 +384,7 @@ impl SyncManager { /// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then it's head is sufficiently close to /// ours that we consider it fully sync'd with respect to our current chain. fn add_peer(&mut self, peer_id: PeerId, remote: SyncInfo) { - // ensure the beacon chain still exists - let status = self.chain.status_message(); - let local = SyncInfo { - head_slot: *status.head_slot(), - head_root: *status.head_root(), - finalized_epoch: *status.finalized_epoch(), - finalized_root: *status.finalized_root(), - earliest_available_slot: status.earliest_available_slot().ok().cloned(), - }; - + let local = LocalSyncInfo::new(&self.chain); let sync_type = remote_sync_type(&local, &remote, &self.chain); // update the state of the peer. @@ -401,9 +392,9 @@ impl SyncManager { if is_still_connected { match sync_type { PeerSyncType::Behind => {} // Do nothing - PeerSyncType::Advanced => { + PeerSyncType::Advanced(advanced_type) => { self.range_sync - .add_peer(&mut self.network, local, peer_id, remote); + .add_peer(&mut self.network, local, peer_id, advanced_type); } PeerSyncType::FullySynced => { // Sync considers this peer close enough to the head to not trigger range sync. @@ -438,15 +429,7 @@ impl SyncManager { head_root: Hash256, head_slot: Option, ) { - let status = self.chain.status_message(); - let local = SyncInfo { - head_slot: *status.head_slot(), - head_root: *status.head_root(), - finalized_epoch: *status.finalized_epoch(), - finalized_root: *status.finalized_root(), - earliest_available_slot: status.earliest_available_slot().ok().cloned(), - }; - + let local = LocalSyncInfo::new(&self.chain); let head_slot = head_slot.unwrap_or_else(|| { debug!( local_head_slot = %local.head_slot, @@ -456,18 +439,17 @@ impl SyncManager { local.head_slot }); - let remote = SyncInfo { - head_slot, - head_root, - // Set finalized to same as local to trigger Head sync - finalized_epoch: local.finalized_epoch, - finalized_root: local.finalized_root, - earliest_available_slot: local.earliest_available_slot, - }; - for peer_id in peers { - self.range_sync - .add_peer(&mut self.network, local.clone(), *peer_id, remote.clone()); + self.range_sync.add_peer( + &mut self.network, + local.clone(), + *peer_id, + PeerSyncTypeAdvanced::Head { + target_root: head_root, + target_slot: head_slot, + start_epoch: local.local_irreversible_epoch, + }, + ); } } @@ -542,7 +524,7 @@ impl SyncManager { fn update_peer_sync_state( &mut self, peer_id: &PeerId, - local_sync_info: &SyncInfo, + local_sync_info: &LocalSyncInfo, remote_sync_info: &SyncInfo, sync_type: &PeerSyncType, ) -> bool { diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs index 5ea1533d350..35566fa6a01 100644 --- a/beacon_node/network/src/sync/peer_sync_info.rs +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -1,14 +1,16 @@ use super::manager::SLOT_IMPORT_TOLERANCE; +use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::{SyncInfo, SyncStatus as PeerSyncStatus}; use std::cmp::Ordering; +use types::{Epoch, EthSpec, Hash256, Slot}; /// The type of peer relative to our current state. pub enum PeerSyncType { /// The peer is on our chain and is fully synced with respect to our chain. FullySynced, /// The peer has a greater knowledge of the chain than us that warrants a full sync. - Advanced, + Advanced(PeerSyncTypeAdvanced), /// A peer is behind in the sync and not useful to us for downloading blocks. Behind, } @@ -18,13 +20,52 @@ impl PeerSyncType { match self { PeerSyncType::FullySynced => PeerSyncStatus::Synced { info: info.clone() }, PeerSyncType::Behind => PeerSyncStatus::Behind { info: info.clone() }, - PeerSyncType::Advanced => PeerSyncStatus::Advanced { info: info.clone() }, + PeerSyncType::Advanced(_) => PeerSyncStatus::Advanced { info: info.clone() }, + } + } +} + +pub enum PeerSyncTypeAdvanced { + Finalized { + target_slot: Slot, + target_root: Hash256, + start_epoch: Epoch, + }, + Head { + target_slot: Slot, + target_root: Hash256, + start_epoch: Epoch, + }, +} + +#[derive(Clone)] +pub(crate) struct LocalSyncInfo { + pub head_slot: Slot, + pub finalized_epoch: Epoch, + pub local_irreversible_epoch: Epoch, +} + +impl LocalSyncInfo { + pub fn new(chain: &BeaconChain) -> Self { + let status = chain.status_message(); + // Max with the store in case the node has triggered manual finalization + let local_irreversible_epoch = std::cmp::max( + chain.head().finalized_checkpoint().epoch, + chain + .store + .get_split_slot() + .epoch(T::EthSpec::slots_per_epoch()), + ); + Self { + head_slot: *status.head_slot(), + finalized_epoch: *status.finalized_epoch(), + local_irreversible_epoch, } } } pub fn remote_sync_type( - local: &SyncInfo, + local: &LocalSyncInfo, remote: &SyncInfo, chain: &BeaconChain, ) -> PeerSyncType { @@ -33,6 +74,10 @@ pub fn remote_sync_type( let near_range_start = local.head_slot.saturating_sub(SLOT_IMPORT_TOLERANCE); let near_range_end = local.head_slot.saturating_add(SLOT_IMPORT_TOLERANCE); + // With the remote peer's status message let's figure out if there are enough blocks to discover + // that we trigger sync from them. We don't want to sync any blocks from epochs prior to the + // local irreversible epoch. Our finalized epoch may be less than the local irreversible epoch. + match remote.finalized_epoch.cmp(&local.finalized_epoch) { Ordering::Less => { // The node has a lower finalized epoch, their chain is not useful to us. There are two @@ -63,7 +108,11 @@ pub fn remote_sync_type( { // This peer has a head ahead enough of ours and we have no knowledge of their best // block. - PeerSyncType::Advanced + PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head { + target_root: remote.head_root, + target_slot: remote.head_slot, + start_epoch: local.local_irreversible_epoch, + }) } else { // This peer is either in the tolerance range, or ahead us with an already rejected // block. @@ -71,16 +120,43 @@ pub fn remote_sync_type( } } Ordering::Greater => { - if (local.finalized_epoch + 1 == remote.finalized_epoch - && near_range_start <= remote.head_slot - && remote.head_slot <= near_range_end) - || chain.block_is_known_to_fork_choice(&remote.head_root) - { - // This peer is near enough to us to be considered synced, or - // we have already synced up to this peer's head + if chain.block_is_known_to_fork_choice(&remote.head_root) { + // We have already synced up to this peer's head PeerSyncType::FullySynced } else { - PeerSyncType::Advanced + let finality_advanced = remote.finalized_epoch > local.finalized_epoch + 1; + let head_advanced = remote.head_slot > near_range_end; + let finality_ahead_local_irreversible = + remote.finalized_epoch > local.local_irreversible_epoch; + + if finality_advanced { + if finality_ahead_local_irreversible { + PeerSyncType::Advanced(PeerSyncTypeAdvanced::Finalized { + target_root: remote.finalized_root, + target_slot: remote + .finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()), + start_epoch: local.local_irreversible_epoch, + }) + } else if head_advanced { + PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head { + target_root: remote.head_root, + target_slot: remote.head_slot, + start_epoch: local.local_irreversible_epoch, + }) + } else { + PeerSyncType::FullySynced + } + } else if head_advanced { + PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head { + target_root: remote.head_root, + target_slot: remote.head_slot, + start_epoch: local.local_irreversible_epoch, + }) + } else { + // This peer is near enough to us to be considered synced + PeerSyncType::FullySynced + } } } } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 1d57ee6c3dc..a234e3bb6b9 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -7,14 +7,14 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::metrics; use crate::sync::network_context::SyncNetworkContext; +use crate::sync::peer_sync_info::LocalSyncInfo; +use crate::sync::range_sync::range::AwaitingHeadPeers; use beacon_chain::{BeaconChain, BeaconChainTypes}; use fnv::FnvHashMap; use lighthouse_network::PeerId; -use lighthouse_network::SyncInfo; use lighthouse_network::service::api_types::Id; use logging::crit; use smallvec::SmallVec; -use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::Arc; use tracing::{debug, error}; @@ -193,24 +193,18 @@ impl ChainCollection { pub fn update( &mut self, network: &mut SyncNetworkContext, - local: &SyncInfo, - awaiting_head_peers: &mut HashMap, + local: &LocalSyncInfo, + awaiting_head_peers: &mut AwaitingHeadPeers, ) { // Remove any outdated finalized/head chains self.purge_outdated_chains(local, awaiting_head_peers); - let local_head_epoch = local.head_slot.epoch(T::EthSpec::slots_per_epoch()); // Choose the best finalized chain if one needs to be selected. - self.update_finalized_chains(network, local.finalized_epoch, local_head_epoch); + self.update_finalized_chains(network, local); if !matches!(self.state, RangeSyncState::Finalized(_)) { // Handle head syncing chains if there are no finalized chains left. - self.update_head_chains( - network, - local.finalized_epoch, - local_head_epoch, - awaiting_head_peers, - ); + self.update_head_chains(network, local, awaiting_head_peers); } } @@ -253,8 +247,7 @@ impl ChainCollection { fn update_finalized_chains( &mut self, network: &mut SyncNetworkContext, - local_epoch: Epoch, - local_head_epoch: Epoch, + local: &LocalSyncInfo, ) { // Find the chain with most peers and check if it is already syncing if let Some((mut new_id, max_peers)) = self @@ -303,8 +296,11 @@ impl ChainCollection { // update the state to a new finalized state self.state = RangeSyncState::Finalized(new_id); - if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch) - { + if let Err(remove_reason) = chain.start_syncing( + network, + local.local_irreversible_epoch, + local.head_slot.epoch(T::EthSpec::slots_per_epoch()), + ) { if remove_reason.is_critical() { crit!(chain = new_id, reason = ?remove_reason, "Chain removed while switching chains"); } else { @@ -321,17 +317,16 @@ impl ChainCollection { fn update_head_chains( &mut self, network: &mut SyncNetworkContext, - local_epoch: Epoch, - local_head_epoch: Epoch, - awaiting_head_peers: &mut HashMap, + local: &LocalSyncInfo, + awaiting_head_peers: &mut AwaitingHeadPeers, ) { // Include the awaiting head peers - for (peer_id, peer_sync_info) in awaiting_head_peers.drain() { + for (peer_id, (target_root, target_slot)) in awaiting_head_peers.drain() { debug!("including head peer"); self.add_peer_or_create_chain( - local_epoch, - peer_sync_info.head_root, - peer_sync_info.head_slot, + local.local_irreversible_epoch, + target_root, + target_slot, peer_id, RangeSyncType::Head, network, @@ -361,9 +356,11 @@ impl ChainCollection { if !chain.is_syncing() { debug!(id = chain.id(), "New head chain started syncing"); } - if let Err(remove_reason) = - chain.start_syncing(network, local_epoch, local_head_epoch) - { + if let Err(remove_reason) = chain.start_syncing( + network, + local.local_irreversible_epoch, + local.head_slot.epoch(T::EthSpec::slots_per_epoch()), + ) { self.head_chains.remove(&id); if remove_reason.is_critical() { crit!(chain = id, reason = ?remove_reason, "Chain removed while switching head chains"); @@ -396,8 +393,8 @@ impl ChainCollection { /// finalized block slot. Peers that would create outdated chains are removed too. pub fn purge_outdated_chains( &mut self, - local_info: &SyncInfo, - awaiting_head_peers: &mut HashMap, + local_info: &LocalSyncInfo, + awaiting_head_peers: &mut AwaitingHeadPeers, ) { let local_finalized_slot = local_info .finalized_epoch @@ -411,9 +408,8 @@ impl ChainCollection { }; // Retain only head peers that remain relevant - awaiting_head_peers.retain(|_peer_id, peer_sync_info| { - !is_outdated(&peer_sync_info.head_slot, &peer_sync_info.head_root) - }); + awaiting_head_peers + .retain(|_peer_id, (target_root, target_slot)| !is_outdated(target_slot, target_root)); // Remove chains that are out-dated let mut removed_chains = Vec::new(); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c9656ad1d0d..734d1107d25 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,25 +43,27 @@ use super::chain::{ChainId, RemoveChain, SyncingChain}; use super::chain_collection::{ChainCollection, SyncChainStatus}; use super::sync_type::RangeSyncType; use crate::metrics; -use crate::status::ToStatusMessage; use crate::sync::BatchProcessResult; use crate::sync::batch::BatchId; use crate::sync::network_context::{RpcResponseError, SyncNetworkContext}; +use crate::sync::peer_sync_info::{LocalSyncInfo, PeerSyncTypeAdvanced}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::PeerId; use lighthouse_network::rpc::GoodbyeReason; use lighthouse_network::service::api_types::Id; -use lighthouse_network::{PeerId, SyncInfo}; use logging::crit; use lru_cache::LRUTimeCache; use std::collections::HashMap; use std::sync::Arc; use tracing::{debug, trace, warn}; -use types::{Epoch, EthSpec, Hash256}; +use types::{Epoch, EthSpec, Hash256, Slot}; /// For how long we store failed finalized chains to prevent retries. const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30; +pub(crate) type AwaitingHeadPeers = HashMap; + /// The primary object dealing with long range/batch syncing. This contains all the active and /// non-active chains that need to be processed before the syncing is considered complete. This /// holds the current state of the long range sync. @@ -70,7 +72,7 @@ pub struct RangeSync { beacon_chain: Arc>, /// Last known sync info of our useful connected peers. We use this information to create Head /// chains after all finalized chains have ended. - awaiting_head_peers: HashMap, + awaiting_head_peers: AwaitingHeadPeers, /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. chains: ChainCollection, @@ -110,29 +112,28 @@ where pub fn add_peer( &mut self, network: &mut SyncNetworkContext, - local_info: SyncInfo, + local_info: LocalSyncInfo, peer_id: PeerId, - remote_info: SyncInfo, + advanced_type: PeerSyncTypeAdvanced, ) { // evaluate which chain to sync from // determine if we need to run a sync to the nearest finalized state or simply sync to // its current head - // convenience variable - let remote_finalized_slot = remote_info - .finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); - // NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. This // is OK since we since only one finalized chain at a time. // determine which kind of sync to perform and set up the chains - match RangeSyncType::new(self.beacon_chain.as_ref(), &local_info, &remote_info) { - RangeSyncType::Finalized => { + match advanced_type { + PeerSyncTypeAdvanced::Finalized { + target_root, + target_slot, + start_epoch, + } => { // Make sure we have not recently tried this chain - if self.failed_chains.contains(&remote_info.finalized_root) { - debug!(failed_root = ?remote_info.finalized_root, %peer_id,"Disconnecting peer that belongs to previously failed chain"); + if self.failed_chains.contains(&target_root) { + debug!(failed_root = ?target_root, %peer_id,"Disconnecting peer that belongs to previously failed chain"); network.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); return; } @@ -145,15 +146,14 @@ where // to using exact epoch boundaries for batches (rather than one slot past the epoch // boundary), we need to sync finalized sync to 2 epochs + 1 slot past our peer's // finalized slot in order to finalize the chain locally. - let target_head_slot = - remote_finalized_slot + (2 * T::EthSpec::slots_per_epoch()) + 1; + let target_head_slot = target_slot + (2 * T::EthSpec::slots_per_epoch()) + 1; // Note: We keep current head chains. These can continue syncing whilst we complete // this new finalized chain. self.chains.add_peer_or_create_chain( - local_info.finalized_epoch, - remote_info.finalized_root, + start_epoch, + target_root, target_head_slot, peer_id, RangeSyncType::Finalized, @@ -163,14 +163,19 @@ where self.chains .update(network, &local_info, &mut self.awaiting_head_peers); } - RangeSyncType::Head => { + PeerSyncTypeAdvanced::Head { + target_root, + target_slot, + start_epoch, + } => { // This peer requires a head chain sync if self.chains.is_finalizing_sync() { // If there are finalized chains to sync, finish these first, before syncing head // chains. trace!(%peer_id, awaiting_head_peers = &self.awaiting_head_peers.len(),"Waiting for finalized sync to complete"); - self.awaiting_head_peers.insert(peer_id, remote_info); + self.awaiting_head_peers + .insert(peer_id, (target_root, target_slot)); return; } @@ -181,12 +186,10 @@ where // The new peer has the same finalized (earlier filters should prevent a peer with an // earlier finalized chain from reaching here). - let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot) - .epoch(T::EthSpec::slots_per_epoch()); self.chains.add_peer_or_create_chain( start_epoch, - remote_info.head_root, - remote_info.head_slot, + target_root, + target_slot, peer_id, RangeSyncType::Head, network, @@ -357,16 +360,8 @@ where network.status_peers(self.beacon_chain.as_ref(), chain.peers()); - let status = self.beacon_chain.status_message(); - let local = SyncInfo { - head_slot: *status.head_slot(), - head_root: *status.head_root(), - finalized_epoch: *status.finalized_epoch(), - finalized_root: *status.finalized_root(), - earliest_available_slot: status.earliest_available_slot().ok().cloned(), - }; - // update the state of the collection + let local = LocalSyncInfo::new(&self.beacon_chain); self.chains .update(network, &local, &mut self.awaiting_head_peers); }