50 changes: 16 additions & 34 deletions beacon_node/network/src/sync/manager.rs
@@ -42,12 +42,12 @@ use super::peer_sync_info::{PeerSyncType, remote_sync_type};
use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
use crate::service::NetworkMessage;
- use crate::status::ToStatusMessage;
use crate::sync::block_lookups::{
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
};
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
+ use crate::sync::peer_sync_info::{LocalSyncInfo, PeerSyncTypeAdvanced};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::{
@@ -384,26 +384,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then its head is sufficiently close to
/// ours that we consider it fully sync'd with respect to our current chain.
fn add_peer(&mut self, peer_id: PeerId, remote: SyncInfo) {
- // ensure the beacon chain still exists
- let status = self.chain.status_message();
- let local = SyncInfo {
- head_slot: *status.head_slot(),
- head_root: *status.head_root(),
- finalized_epoch: *status.finalized_epoch(),
- finalized_root: *status.finalized_root(),
- earliest_available_slot: status.earliest_available_slot().ok().cloned(),
- };
-
+ let local = LocalSyncInfo::new(&self.chain);
let sync_type = remote_sync_type(&local, &remote, &self.chain);

// update the state of the peer.
let is_still_connected = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type);
if is_still_connected {
match sync_type {
PeerSyncType::Behind => {} // Do nothing
- PeerSyncType::Advanced => {
+ PeerSyncType::Advanced(advanced_type) => {
self.range_sync
- .add_peer(&mut self.network, local, peer_id, remote);
+ .add_peer(&mut self.network, local, peer_id, advanced_type);
}
PeerSyncType::FullySynced => {
// Sync considers this peer close enough to the head to not trigger range sync.
Expand Down Expand Up @@ -438,15 +429,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
head_root: Hash256,
head_slot: Option<Slot>,
) {
- let status = self.chain.status_message();
- let local = SyncInfo {
- head_slot: *status.head_slot(),
- head_root: *status.head_root(),
- finalized_epoch: *status.finalized_epoch(),
- finalized_root: *status.finalized_root(),
- earliest_available_slot: status.earliest_available_slot().ok().cloned(),
- };
-
+ let local = LocalSyncInfo::new(&self.chain);
let head_slot = head_slot.unwrap_or_else(|| {
debug!(
local_head_slot = %local.head_slot,
@@ -456,18 +439,17 @@
local.head_slot
});

- let remote = SyncInfo {
- head_slot,
- head_root,
- // Set finalized to same as local to trigger Head sync
- finalized_epoch: local.finalized_epoch,
- finalized_root: local.finalized_root,
- earliest_available_slot: local.earliest_available_slot,
- };
-
for peer_id in peers {
- self.range_sync
- .add_peer(&mut self.network, local.clone(), *peer_id, remote.clone());
+ self.range_sync.add_peer(
+ &mut self.network,
+ local.clone(),
+ *peer_id,
+ PeerSyncTypeAdvanced::Head {
+ target_root: head_root,
+ target_slot: head_slot,
+ start_epoch: local.local_irreversible_epoch,
+ },
+ );
}
}

Expand Down Expand Up @@ -542,7 +524,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn update_peer_sync_state(
&mut self,
peer_id: &PeerId,
- local_sync_info: &SyncInfo,
+ local_sync_info: &LocalSyncInfo,
remote_sync_info: &SyncInfo,
sync_type: &PeerSyncType,
) -> bool {
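The key behavioural change above is that `add_peer` and the forced head-sync path now derive their starting point from `LocalSyncInfo::new`, whose `local_irreversible_epoch` is the max of the head's finalized epoch and the store's split epoch. Below is a minimal stand-alone sketch of that rule, using plain `u64` values in place of Lighthouse's `Slot`/`Epoch` newtypes (an illustrative assumption, not the real types or API):

```rust
// Sketch of the local-irreversible-epoch rule used as `start_epoch` above.
// Plain u64s stand in for Lighthouse's Slot/Epoch newtypes.
const SLOTS_PER_EPOCH: u64 = 32;

/// Sync must never re-request blocks from before this epoch: the max of the
/// head's finalized epoch and the store's split epoch, which can run ahead
/// of on-chain finality when the node has triggered manual finalization.
fn local_irreversible_epoch(head_finalized_epoch: u64, store_split_slot: u64) -> u64 {
    std::cmp::max(head_finalized_epoch, store_split_slot / SLOTS_PER_EPOCH)
}

fn main() {
    // Ordinary case: the split point agrees with finality.
    assert_eq!(local_irreversible_epoch(100, 100 * SLOTS_PER_EPOCH), 100);
    // Manual finalization pushed the split to epoch 120; head syncs forced
    // from this path will start from epoch 120, not 100.
    assert_eq!(local_irreversible_epoch(100, 120 * SLOTS_PER_EPOCH), 120);
}
```

The slot-to-epoch conversion here is plain integer division; the real code goes through `Slot::epoch`, as in the `LocalSyncInfo::new` diff below.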
100 changes: 88 additions & 12 deletions beacon_node/network/src/sync/peer_sync_info.rs
@@ -1,14 +1,16 @@
use super::manager::SLOT_IMPORT_TOLERANCE;
+ use crate::status::ToStatusMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::{SyncInfo, SyncStatus as PeerSyncStatus};
use std::cmp::Ordering;
+ use types::{Epoch, EthSpec, Hash256, Slot};

/// The type of peer relative to our current state.
pub enum PeerSyncType {
/// The peer is on our chain and is fully synced with respect to our chain.
FullySynced,
/// The peer has a greater knowledge of the chain than us that warrants a full sync.
- Advanced,
+ Advanced(PeerSyncTypeAdvanced),
/// A peer is behind in the sync and not useful to us for downloading blocks.
Behind,
}
@@ -18,13 +20,52 @@ impl PeerSyncType {
match self {
PeerSyncType::FullySynced => PeerSyncStatus::Synced { info: info.clone() },
PeerSyncType::Behind => PeerSyncStatus::Behind { info: info.clone() },
- PeerSyncType::Advanced => PeerSyncStatus::Advanced { info: info.clone() },
+ PeerSyncType::Advanced(_) => PeerSyncStatus::Advanced { info: info.clone() },
}
}
}

+ pub enum PeerSyncTypeAdvanced {
+ Finalized {
+ target_slot: Slot,
> Review comment (Member): Needs comment
+ target_root: Hash256,
+ start_epoch: Epoch,
> Review comment (Member): Should this be per-peer?
+ },
+ Head {
+ target_slot: Slot,
+ target_root: Hash256,
+ start_epoch: Epoch,
+ },
+ }
+
+ #[derive(Clone)]
+ pub(crate) struct LocalSyncInfo {
> Review comment (Member): Needs a comment
+ pub head_slot: Slot,
+ pub finalized_epoch: Epoch,
+ pub local_irreversible_epoch: Epoch,
+ }
+
+ impl LocalSyncInfo {
+ pub fn new<T: BeaconChainTypes>(chain: &BeaconChain<T>) -> Self {
+ let status = chain.status_message();
+ // Max with the store in case the node has triggered manual finalization
+ let local_irreversible_epoch = std::cmp::max(
+ chain.head().finalized_checkpoint().epoch,
> Review comment (Member): May as well use status.finalized_epoch() here to avoid potential inconsistencies. It's also likely to be marginally quicker
+ chain
+ .store
+ .get_split_slot()
+ .epoch(T::EthSpec::slots_per_epoch()),
+ );
+ Self {
+ head_slot: *status.head_slot(),
+ finalized_epoch: *status.finalized_epoch(),
+ local_irreversible_epoch,
+ }
+ }
+ }
+
pub fn remote_sync_type<T: BeaconChainTypes>(
- local: &SyncInfo,
+ local: &LocalSyncInfo,
remote: &SyncInfo,
chain: &BeaconChain<T>,
) -> PeerSyncType {
@@ -33,6 +74,10 @@ pub fn remote_sync_type<T: BeaconChainTypes>(
let near_range_start = local.head_slot.saturating_sub(SLOT_IMPORT_TOLERANCE);
let near_range_end = local.head_slot.saturating_add(SLOT_IMPORT_TOLERANCE);

+ // From the remote peer's status message, figure out whether it knows enough new blocks for
+ // us to trigger sync from it. We never want to sync blocks from epochs prior to the local
+ // irreversible epoch; note our finalized epoch may be less than the local irreversible epoch.
+
match remote.finalized_epoch.cmp(&local.finalized_epoch) {
Ordering::Less => {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
Expand Down Expand Up @@ -63,24 +108,55 @@ pub fn remote_sync_type<T: BeaconChainTypes>(
{
// This peer has a head ahead enough of ours and we have no knowledge of their best
// block.
- PeerSyncType::Advanced
+ PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head {
+ target_root: remote.head_root,
+ target_slot: remote.head_slot,
+ start_epoch: local.local_irreversible_epoch,
+ })
} else {
// This peer is either in the tolerance range, or ahead of us with an already rejected
// block.
PeerSyncType::FullySynced
}
}
Ordering::Greater => {
- if (local.finalized_epoch + 1 == remote.finalized_epoch
- && near_range_start <= remote.head_slot
- && remote.head_slot <= near_range_end)
- || chain.block_is_known_to_fork_choice(&remote.head_root)
- {
- // This peer is near enough to us to be considered synced, or
- // we have already synced up to this peer's head
+ if chain.block_is_known_to_fork_choice(&remote.head_root) {
+ // We have already synced up to this peer's head
PeerSyncType::FullySynced
} else {
- PeerSyncType::Advanced
+ let finality_advanced = remote.finalized_epoch > local.finalized_epoch + 1;
> Review comment (Member): A comment or a const for this + 1 would be good I think
+ let head_advanced = remote.head_slot > near_range_end;
+ let finality_ahead_local_irreversible =
+ remote.finalized_epoch > local.local_irreversible_epoch;
+
+ if finality_advanced {
+ if finality_ahead_local_irreversible {
+ PeerSyncType::Advanced(PeerSyncTypeAdvanced::Finalized {
+ target_root: remote.finalized_root,
+ target_slot: remote
+ .finalized_epoch
+ .start_slot(T::EthSpec::slots_per_epoch()),
+ start_epoch: local.local_irreversible_epoch,
+ })
+ } else if head_advanced {
+ PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head {
+ target_root: remote.head_root,
+ target_slot: remote.head_slot,
+ start_epoch: local.local_irreversible_epoch,
+ })
+ } else {
+ PeerSyncType::FullySynced
+ }
+ } else if head_advanced {
+ PeerSyncType::Advanced(PeerSyncTypeAdvanced::Head {
+ target_root: remote.head_root,
+ target_slot: remote.head_slot,
+ start_epoch: local.local_irreversible_epoch,
+ })
+ } else {
+ // This peer is near enough to us to be considered synced
+ PeerSyncType::FullySynced
+ }
+ }
}
}
}
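To make the new `Ordering::Greater` branch easier to review, here is a self-contained restatement of its decision tree over plain integers. This is an illustration only: `SLOT_IMPORT_TOLERANCE` is given an assumed value, the fork-choice lookup is reduced to a boolean parameter, and `Classification` is a simplified stand-in for `PeerSyncType`.

```rust
const SLOT_IMPORT_TOLERANCE: u64 = 8; // assumed value, for illustration only

#[derive(Debug, PartialEq)]
enum Classification {
    FullySynced,
    AdvancedFinalized { start_epoch: u64 },
    AdvancedHead { start_epoch: u64 },
}

/// Mirrors the diff's Ordering::Greater arm: the remote's finality must be
/// more than one epoch ahead (finality_advanced) AND ahead of our local
/// irreversible epoch to justify a Finalized sync; otherwise a Head sync is
/// used if the remote head is beyond the tolerance window.
fn classify_remote_with_higher_finality(
    local_head_slot: u64,
    local_finalized_epoch: u64,
    local_irreversible_epoch: u64,
    remote_head_slot: u64,
    remote_finalized_epoch: u64,
    remote_head_known_to_fork_choice: bool,
) -> Classification {
    if remote_head_known_to_fork_choice {
        // We have already synced up to this peer's head.
        return Classification::FullySynced;
    }
    let near_range_end = local_head_slot + SLOT_IMPORT_TOLERANCE;
    let finality_advanced = remote_finalized_epoch > local_finalized_epoch + 1;
    let head_advanced = remote_head_slot > near_range_end;
    let finality_ahead_local_irreversible = remote_finalized_epoch > local_irreversible_epoch;

    if finality_advanced && finality_ahead_local_irreversible {
        Classification::AdvancedFinalized { start_epoch: local_irreversible_epoch }
    } else if head_advanced {
        Classification::AdvancedHead { start_epoch: local_irreversible_epoch }
    } else {
        Classification::FullySynced
    }
}

fn main() {
    // Remote finality two epochs ahead of ours and ahead of our irreversible
    // epoch: finalized sync starting at the irreversible epoch.
    assert_eq!(
        classify_remote_with_higher_finality(3_200, 99, 99, 3_400, 101, false),
        Classification::AdvancedFinalized { start_epoch: 99 }
    );
    // Same remote, but we manually finalized to epoch 105: the remote's
    // finality is no longer useful, so only a head sync is triggered.
    assert_eq!(
        classify_remote_with_higher_finality(3_200, 99, 105, 3_400, 101, false),
        Classification::AdvancedHead { start_epoch: 105 }
    );
}
```

The flattened conditions are logically equivalent to the diff's nested `if finality_advanced { ... }` form, since both of its inner fallbacks reduce to the same `head_advanced` / `FullySynced` tail.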
58 changes: 27 additions & 31 deletions beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -7,14 +7,14 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
use super::sync_type::RangeSyncType;
use crate::metrics;
use crate::sync::network_context::SyncNetworkContext;
+ use crate::sync::peer_sync_info::LocalSyncInfo;
+ use crate::sync::range_sync::range::AwaitingHeadPeers;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
- use lighthouse_network::SyncInfo;
use lighthouse_network::service::api_types::Id;
use logging::crit;
use smallvec::SmallVec;
- use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use tracing::{debug, error};
@@ -193,24 +193,18 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
pub fn update(
&mut self,
network: &mut SyncNetworkContext<T>,
- local: &SyncInfo,
- awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
+ local: &LocalSyncInfo,
+ awaiting_head_peers: &mut AwaitingHeadPeers,
) {
// Remove any outdated finalized/head chains
self.purge_outdated_chains(local, awaiting_head_peers);

- let local_head_epoch = local.head_slot.epoch(T::EthSpec::slots_per_epoch());
// Choose the best finalized chain if one needs to be selected.
- self.update_finalized_chains(network, local.finalized_epoch, local_head_epoch);
+ self.update_finalized_chains(network, local);

if !matches!(self.state, RangeSyncState::Finalized(_)) {
// Handle head syncing chains if there are no finalized chains left.
- self.update_head_chains(
- network,
- local.finalized_epoch,
- local_head_epoch,
- awaiting_head_peers,
- );
+ self.update_head_chains(network, local, awaiting_head_peers);
}
}

@@ -253,8 +247,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
fn update_finalized_chains(
&mut self,
network: &mut SyncNetworkContext<T>,
- local_epoch: Epoch,
- local_head_epoch: Epoch,
+ local: &LocalSyncInfo,
) {
// Find the chain with most peers and check if it is already syncing
if let Some((mut new_id, max_peers)) = self
@@ -303,8 +296,11 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
// update the state to a new finalized state
self.state = RangeSyncState::Finalized(new_id);

- if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch)
- {
+ if let Err(remove_reason) = chain.start_syncing(
+ network,
+ local.local_irreversible_epoch,
+ local.head_slot.epoch(T::EthSpec::slots_per_epoch()),
+ ) {
if remove_reason.is_critical() {
crit!(chain = new_id, reason = ?remove_reason, "Chain removed while switching chains");
} else {
@@ -321,17 +317,16 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
fn update_head_chains(
&mut self,
network: &mut SyncNetworkContext<T>,
- local_epoch: Epoch,
- local_head_epoch: Epoch,
- awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
+ local: &LocalSyncInfo,
+ awaiting_head_peers: &mut AwaitingHeadPeers,
) {
// Include the awaiting head peers
- for (peer_id, peer_sync_info) in awaiting_head_peers.drain() {
+ for (peer_id, (target_root, target_slot)) in awaiting_head_peers.drain() {
debug!("including head peer");
self.add_peer_or_create_chain(
- local_epoch,
- peer_sync_info.head_root,
- peer_sync_info.head_slot,
+ local.local_irreversible_epoch,
+ target_root,
+ target_slot,
peer_id,
RangeSyncType::Head,
network,
@@ -361,9 +356,11 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
if !chain.is_syncing() {
debug!(id = chain.id(), "New head chain started syncing");
}
- if let Err(remove_reason) =
- chain.start_syncing(network, local_epoch, local_head_epoch)
- {
+ if let Err(remove_reason) = chain.start_syncing(
+ network,
+ local.local_irreversible_epoch,
+ local.head_slot.epoch(T::EthSpec::slots_per_epoch()),
+ ) {
self.head_chains.remove(&id);
if remove_reason.is_critical() {
crit!(chain = id, reason = ?remove_reason, "Chain removed while switching head chains");
@@ -396,8 +393,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
/// finalized block slot. Peers that would create outdated chains are removed too.
pub fn purge_outdated_chains(
&mut self,
- local_info: &SyncInfo,
- awaiting_head_peers: &mut HashMap<PeerId, SyncInfo>,
+ local_info: &LocalSyncInfo,
+ awaiting_head_peers: &mut AwaitingHeadPeers,
) {
let local_finalized_slot = local_info
.finalized_epoch
@@ -411,9 +408,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
};

// Retain only head peers that remain relevant
- awaiting_head_peers.retain(|_peer_id, peer_sync_info| {
- !is_outdated(&peer_sync_info.head_slot, &peer_sync_info.head_root)
- });
+ awaiting_head_peers
+ .retain(|_peer_id, (target_root, target_slot)| !is_outdated(target_slot, target_root));

// Remove chains that are out-dated
let mut removed_chains = Vec::new();
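The `AwaitingHeadPeers` alias referenced above is defined in `range_sync::range`, which is not part of this diff. From the `drain` and `retain` call sites, it now maps each waiting peer to just a `(target_root, target_slot)` pair instead of a full `SyncInfo`. A plausible sketch of that shape, inferred from the call sites with stand-in key/value types (the actual definition may differ):

```rust
use std::collections::HashMap;

// Stand-ins for the real types; the actual alias lives in
// beacon_node/network/src/sync/range_sync/range.rs and may differ.
type PeerId = u64; // stand-in for lighthouse_network::PeerId
type Hash256 = [u8; 32]; // stand-in for types::Hash256
type Slot = u64; // stand-in for types::Slot

/// Peers queued for a future head chain, keyed by peer id, keeping only the
/// head target needed to start a head sync later.
type AwaitingHeadPeers = HashMap<PeerId, (Hash256, Slot)>;

/// Mirrors the new retain in purge_outdated_chains: drop queued peers whose
/// target is outdated (here simplified to "at or below our finalized slot";
/// the real is_outdated also consults fork choice for the target root).
fn purge(awaiting: &mut AwaitingHeadPeers, local_finalized_slot: Slot) {
    awaiting.retain(|_peer_id, (_target_root, target_slot)| *target_slot > local_finalized_slot);
}

fn main() {
    let mut awaiting: AwaitingHeadPeers = HashMap::new();
    awaiting.insert(1, ([0xaa; 32], 3_300));
    awaiting.insert(2, ([0xbb; 32], 3_100));
    purge(&mut awaiting, 3_200);
    // Only the peer whose target slot (3_300) is past our finalized slot survives.
    assert_eq!(awaiting.len(), 1);
}
```

Storing only the target pair keeps the queue free of stale finality fields and matches what `add_peer_or_create_chain` actually consumes.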