Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ pub struct Config {

/// Flag for advertising a fake CGC to peers for testing ONLY.
pub advertise_false_custody_group_count: Option<u64>,

/// Whether prefix-based attestation subnet peer discovery is enabled.
pub prefix_search_enabled: bool,
}

impl Config {
Expand Down Expand Up @@ -364,6 +367,7 @@ impl Default for Config {
inbound_rate_limiter_config: None,
idontwant_message_size_threshold: DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD,
advertise_false_custody_group_count: None,
prefix_search_enabled: false,
}
}
}
Expand Down
171 changes: 157 additions & 14 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use types::{ChainSpec, EnrForkId, EthSpec};

mod prefix_mapping;
mod subnet_predicate;

use crate::discovery::enr::{NEXT_FORK_DIGEST_ENR_KEY, PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY};
use crate::discovery::prefix_mapping::PrefixMapping;
pub use subnet_predicate::subnet_predicate;
use types::non_zero_usize::new_non_zero_usize;

Expand Down Expand Up @@ -101,6 +104,19 @@ struct SubnetQuery {
subnet: Subnet,
min_ttl: Option<Instant>,
retries: usize,
target: QueryTarget,
}

/// Target for a peer discovery query.
///
/// Specifies which peers to search for during a discovery query.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryTarget {
/// Query for random peers in the network.
Random,
/// Query for peers with specific node ID prefixes.
/// Used for deterministic subnet peer discovery to find peers in specific DHT keyspace regions.
Prefix(Vec<NodeId>),
}

impl SubnetQuery {
Expand All @@ -125,6 +141,7 @@ impl std::fmt::Debug for SubnetQuery {
.field("subnet", &self.subnet)
.field("min_ttl_secs", &min_ttl_secs)
.field("retries", &self.retries)
.field("target", &self.target)
.finish()
}
}
Expand Down Expand Up @@ -195,6 +212,14 @@ pub struct Discovery<E: EthSpec> {
update_ports: UpdatePorts,

spec: Arc<ChainSpec>,

/// Whether prefix-based attestation subnet peer discovery is enabled.
prefix_search_enabled: bool,

/// Mapping from attestation subnet IDs to DHT key prefixes.
/// Used for deterministic subnet peer discovery to target specific regions of the DHT keyspace
/// when searching for subnet peers.
prefix_mapping: PrefixMapping,
}

impl<E: EthSpec> Discovery<E> {
Expand Down Expand Up @@ -328,6 +353,8 @@ impl<E: EthSpec> Discovery<E> {
update_ports,
enr_dir,
spec: Arc::new(spec.clone()),
prefix_search_enabled: config.prefix_search_enabled,
prefix_mapping: PrefixMapping::new(spec.clone()),
})
}

Expand Down Expand Up @@ -358,7 +385,9 @@ impl<E: EthSpec> Discovery<E> {
let target_peers = std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, target_peers);
debug!(target_peers, "Starting a peer discovery request");
self.find_peer_active = true;
self.start_query(QueryType::FindPeers, target_peers, |_| true);
self.start_query(QueryType::FindPeers, target_peers, NodeId::random(), |_| {
true
});
}

/// Processes a request to search for more peers on a subnet.
Expand All @@ -371,8 +400,35 @@ impl<E: EthSpec> Discovery<E> {
subnets = ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>(),
"Starting discovery query for subnets"
);

for subnet in subnets_to_discover {
self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0);
let query_target = if self.prefix_search_enabled
&& let Subnet::Attestation(subnet_id) = &subnet.subnet
{
// Use prefix-based peer discovery for attestation subnets.
match self.prefix_mapping.get_prefixed_node_ids(subnet_id) {
Ok(node_ids) if node_ids.is_empty() => {
warn!(
?subnet_id,
"No NodeIds provided for prefix search, falling back to random search."
);
QueryTarget::Random
}
Ok(node_ids) => QueryTarget::Prefix(node_ids),
Err(error) => {
warn!(
?error,
?subnet_id,
"Failed to get NodeIds for prefix search, falling back to random search."
);
QueryTarget::Random
}
}
} else {
QueryTarget::Random
};

self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0, query_target);
}
}

Expand Down Expand Up @@ -657,7 +713,13 @@ impl<E: EthSpec> Discovery<E> {

/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
/// updates the min_ttl field.
fn add_subnet_query(&mut self, subnet: Subnet, min_ttl: Option<Instant>, retries: usize) {
fn add_subnet_query(
&mut self,
subnet: Subnet,
min_ttl: Option<Instant>,
retries: usize,
target: QueryTarget,
) {
// remove the entry and complete the query if greater than the maximum search count
if retries > MAX_DISCOVERY_RETRY {
debug!("Subnet peer discovery did not find sufficient peers. Reached max retry limit");
Expand Down Expand Up @@ -686,6 +748,7 @@ impl<E: EthSpec> Discovery<E> {
subnet,
min_ttl,
retries,
target,
});
metrics::set_gauge(
&discovery_metrics::DISCOVERY_QUEUE,
Expand Down Expand Up @@ -782,16 +845,75 @@ impl<E: EthSpec> Discovery<E> {

// Only start a discovery query if we have a subnet to look for.
if !filtered_subnet_queries.is_empty() {
// build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
let subnet_predicate = subnet_predicate::<E>(filtered_subnets, self.spec.clone());
// Split queries into random and prefix-based searches.
let (random_search_queries, prefix_search_queries): (Vec<_>, Vec<_>) =
filtered_subnet_queries
.into_iter()
.partition(|q| q.target == QueryTarget::Random);

if !random_search_queries.is_empty() {
self.random_search(random_search_queries);
}

debug!(
subnets = ?filtered_subnet_queries,
"Starting grouped subnet query"
);
if !prefix_search_queries.is_empty() {
self.prefix_search(prefix_search_queries);
}
}
}

/// Starts a grouped discovery query for the given subnets using a random NodeId as the target.
/// This performs a single discovery query that searches for peers across all specified subnets
/// simultaneously, using a random target node in the DHT keyspace.
fn random_search(&mut self, subnet_queries: Vec<SubnetQuery>) {
// build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
let subnet_predicate = subnet_predicate::<E>(
subnet_queries.iter().map(|q| q.subnet).collect::<Vec<_>>(),
self.spec.clone(),
);
debug!(
subnets = ?subnet_queries,
"Starting grouped subnet query"
);
self.start_query(
QueryType::Subnet(subnet_queries),
TARGET_PEERS_FOR_GROUPED_QUERY,
NodeId::random(),
subnet_predicate,
);
}

/// Starts individual discovery queries for each subnet using specific NodeId prefixes as
/// targets.
/// Each query targets a specific region of the DHT keyspace based on the prefix mapping,
/// allowing deterministic subnet peer discovery.
fn prefix_search(&mut self, subnet_queries: Vec<SubnetQuery>) {
for query in subnet_queries {
let target_node = match &query.target {
QueryTarget::Random => {
warn!(
?query,
"Unexpected QueryTarget::Random in prefix_search, using random NodeId instead.",
);
NodeId::random()
}
QueryTarget::Prefix(node_ids) => {
if node_ids.is_empty() {
warn!(
?query,
"Empty node_ids in prefix search, using random NodeId instead.",
);
NodeId::random()
} else {
node_ids[query.retries % node_ids.len()]
}
}
};
let subnet_predicate = subnet_predicate::<E>(vec![query.subnet], self.spec.clone());
debug!(?query, "Starting prefix search query",);
self.start_query(
QueryType::Subnet(filtered_subnet_queries),
QueryType::Subnet(vec![query]),
TARGET_PEERS_FOR_GROUPED_QUERY,
target_node,
subnet_predicate,
);
}
Expand All @@ -806,6 +928,7 @@ impl<E: EthSpec> Discovery<E> {
&mut self,
query: QueryType,
target_peers: usize,
target_node: NodeId,
additional_predicate: impl Fn(&Enr) -> bool + Send + 'static,
) {
let enr_fork_id = match self.local_enr().eth2() {
Expand All @@ -832,7 +955,7 @@ impl<E: EthSpec> Discovery<E> {
let query_future = self
.discv5
// Generate a random target node id.
.find_node_predicate(NodeId::random(), predicate, target_peers)
.find_node_predicate(target_node, predicate, target_peers)
.map(|v| QueryResult {
query_type: query,
result: v,
Expand Down Expand Up @@ -881,7 +1004,12 @@ impl<E: EthSpec> Discovery<E> {
"Grouped subnet discovery query yielded no results."
);
queries.iter().for_each(|query| {
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
self.add_subnet_query(
query.subnet,
query.min_ttl,
query.retries + 1,
query.target.clone(),
);
})
}
Ok(r) => {
Expand Down Expand Up @@ -913,7 +1041,12 @@ impl<E: EthSpec> Discovery<E> {
v.inc();
}
// A subnet query has completed. Add back to the queue, incrementing retries.
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
self.add_subnet_query(
query.subnet,
query.min_ttl,
query.retries + 1,
query.target.clone(),
);

// Check the specific subnet against the enr
let subnet_predicate =
Expand Down Expand Up @@ -1280,17 +1413,24 @@ mod tests {
subnet: Subnet::Attestation(SubnetId::new(1)),
min_ttl: Some(now),
retries: 0,
target: QueryTarget::Random,
};
discovery.add_subnet_query(
subnet_query.subnet,
subnet_query.min_ttl,
subnet_query.retries,
subnet_query.target.clone(),
);
assert_eq!(discovery.queued_queries.back(), Some(&subnet_query));

// New query should replace old query
subnet_query.min_ttl = Some(now + Duration::from_secs(1));
discovery.add_subnet_query(subnet_query.subnet, subnet_query.min_ttl, 1);
discovery.add_subnet_query(
subnet_query.subnet,
subnet_query.min_ttl,
1,
subnet_query.target.clone(),
);

subnet_query.retries += 1;

Expand All @@ -1306,6 +1446,7 @@ mod tests {
subnet_query.subnet,
subnet_query.min_ttl,
MAX_DISCOVERY_RETRY + 1,
subnet_query.target,
);

assert_eq!(discovery.queued_queries.len(), 0);
Expand Down Expand Up @@ -1341,11 +1482,13 @@ mod tests {
subnet: Subnet::Attestation(SubnetId::new(1)),
min_ttl: instant1,
retries: 0,
target: QueryTarget::Random,
},
SubnetQuery {
subnet: Subnet::Attestation(SubnetId::new(2)),
min_ttl: instant2,
retries: 0,
target: QueryTarget::Random,
},
]);

Expand Down
Loading
Loading