Skip to content

Commit 2177cad

Browse files
committed
Add prefix_mapping to Discovery
1 parent 115416e commit 2177cad

File tree

2 files changed

+84
-7
lines changed

2 files changed

+84
-7
lines changed

beacon_node/lighthouse_network/src/discovery/mod.rs

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ mod prefix_mapping;
5252
mod subnet_predicate;
5353

5454
use crate::discovery::enr::{NEXT_FORK_DIGEST_ENR_KEY, PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY};
55+
use crate::discovery::prefix_mapping::PrefixMapping;
5556
pub use subnet_predicate::subnet_predicate;
5657
use types::non_zero_usize::new_non_zero_usize;
5758

@@ -103,6 +104,19 @@ struct SubnetQuery {
103104
subnet: Subnet,
104105
min_ttl: Option<Instant>,
105106
retries: usize,
107+
target: QueryTarget,
108+
}
109+
110+
/// Target for a peer discovery query.
111+
///
112+
/// Specifies which peers to search for during a discovery query.
113+
#[derive(Debug, Clone, PartialEq)]
114+
pub enum QueryTarget {
115+
/// Query for random peers in the network.
116+
Random,
117+
/// Query for peers with specific node ID prefixes.
118+
/// Used for deterministic subnet peer discovery to find peers in specific DHT keyspace regions.
119+
Prefix(Vec<NodeId>),
106120
}
107121

108122
impl SubnetQuery {
@@ -127,6 +141,7 @@ impl std::fmt::Debug for SubnetQuery {
127141
.field("subnet", &self.subnet)
128142
.field("min_ttl_secs", &min_ttl_secs)
129143
.field("retries", &self.retries)
144+
.field("target", &self.target)
130145
.finish()
131146
}
132147
}
@@ -197,6 +212,11 @@ pub struct Discovery<E: EthSpec> {
197212
update_ports: UpdatePorts,
198213

199214
spec: Arc<ChainSpec>,
215+
216+
/// Mapping from attestation subnet IDs to DHT key prefixes.
217+
/// Used for deterministic subnet peer discovery to target specific regions of the DHT keyspace
218+
/// when searching for subnet peers.
219+
prefix_mapping: PrefixMapping,
200220
}
201221

202222
impl<E: EthSpec> Discovery<E> {
@@ -330,6 +350,7 @@ impl<E: EthSpec> Discovery<E> {
330350
update_ports,
331351
enr_dir,
332352
spec: Arc::new(spec.clone()),
353+
prefix_mapping: PrefixMapping::new(spec.clone()),
333354
})
334355
}
335356

@@ -375,7 +396,30 @@ impl<E: EthSpec> Discovery<E> {
375396
);
376397

377398
for subnet in subnets_to_discover {
378-
self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0);
399+
let query_target = match subnet.subnet {
400+
Subnet::Attestation(subnet_id) => {
401+
match self.prefix_mapping.get_prefixed_node_ids(&subnet_id) {
402+
Ok(node_ids) if node_ids.is_empty() => {
403+
warn!(
404+
?subnet_id,
405+
"No NodeIds given for prefix search, falling back to random search."
406+
);
407+
QueryTarget::Random
408+
}
409+
Ok(node_ids) => QueryTarget::Prefix(node_ids),
410+
Err(error) => {
411+
warn!(
412+
?error,
413+
?subnet_id,
414+
"Failed to get NodeIds for prefix search, falling back to random search."
415+
);
416+
QueryTarget::Random
417+
}
418+
}
419+
}
420+
Subnet::SyncCommittee(_) | Subnet::DataColumn(_) => QueryTarget::Random,
421+
};
422+
self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0, query_target);
379423
}
380424
}
381425

@@ -660,7 +704,13 @@ impl<E: EthSpec> Discovery<E> {
660704

661705
/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
662706
/// updates the min_ttl field.
663-
fn add_subnet_query(&mut self, subnet: Subnet, min_ttl: Option<Instant>, retries: usize) {
707+
fn add_subnet_query(
708+
&mut self,
709+
subnet: Subnet,
710+
min_ttl: Option<Instant>,
711+
retries: usize,
712+
target: QueryTarget,
713+
) {
664714
// remove the entry and complete the query if greater than the maximum search count
665715
if retries > MAX_DISCOVERY_RETRY {
666716
debug!("Subnet peer discovery did not find sufficient peers. Reached max retry limit");
@@ -689,6 +739,7 @@ impl<E: EthSpec> Discovery<E> {
689739
subnet,
690740
min_ttl,
691741
retries,
742+
target,
692743
});
693744
metrics::set_gauge(
694745
&discovery_metrics::DISCOVERY_QUEUE,
@@ -884,7 +935,12 @@ impl<E: EthSpec> Discovery<E> {
884935
"Grouped subnet discovery query yielded no results."
885936
);
886937
queries.iter().for_each(|query| {
887-
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
938+
self.add_subnet_query(
939+
query.subnet,
940+
query.min_ttl,
941+
query.retries + 1,
942+
query.target.clone(),
943+
);
888944
})
889945
}
890946
Ok(r) => {
@@ -916,7 +972,12 @@ impl<E: EthSpec> Discovery<E> {
916972
v.inc();
917973
}
918974
// A subnet query has completed. Add back to the queue, incrementing retries.
919-
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
975+
self.add_subnet_query(
976+
query.subnet,
977+
query.min_ttl,
978+
query.retries + 1,
979+
query.target.clone(),
980+
);
920981

921982
// Check the specific subnet against the enr
922983
let subnet_predicate =
@@ -1283,17 +1344,24 @@ mod tests {
12831344
subnet: Subnet::Attestation(SubnetId::new(1)),
12841345
min_ttl: Some(now),
12851346
retries: 0,
1347+
target: QueryTarget::Random,
12861348
};
12871349
discovery.add_subnet_query(
12881350
subnet_query.subnet,
12891351
subnet_query.min_ttl,
12901352
subnet_query.retries,
1353+
subnet_query.target.clone(),
12911354
);
12921355
assert_eq!(discovery.queued_queries.back(), Some(&subnet_query));
12931356

12941357
// New query should replace old query
12951358
subnet_query.min_ttl = Some(now + Duration::from_secs(1));
1296-
discovery.add_subnet_query(subnet_query.subnet, subnet_query.min_ttl, 1);
1359+
discovery.add_subnet_query(
1360+
subnet_query.subnet,
1361+
subnet_query.min_ttl,
1362+
1,
1363+
subnet_query.target.clone(),
1364+
);
12971365

12981366
subnet_query.retries += 1;
12991367

@@ -1309,6 +1377,7 @@ mod tests {
13091377
subnet_query.subnet,
13101378
subnet_query.min_ttl,
13111379
MAX_DISCOVERY_RETRY + 1,
1380+
subnet_query.target,
13121381
);
13131382

13141383
assert_eq!(discovery.queued_queries.len(), 0);
@@ -1344,11 +1413,13 @@ mod tests {
13441413
subnet: Subnet::Attestation(SubnetId::new(1)),
13451414
min_ttl: instant1,
13461415
retries: 0,
1416+
target: QueryTarget::Random,
13471417
},
13481418
SubnetQuery {
13491419
subnet: Subnet::Attestation(SubnetId::new(2)),
13501420
min_ttl: instant2,
13511421
retries: 0,
1422+
target: QueryTarget::Random,
13521423
},
13531424
]);
13541425

beacon_node/lighthouse_network/src/discovery/prefix_mapping.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use alloy_primitives::U256;
22
use discv5::enr::NodeId;
3+
use rand::prelude::SliceRandom;
34
use std::collections::HashMap;
45
use types::{ChainSpec, SubnetId};
56

@@ -29,7 +30,7 @@ impl PrefixMapping {
2930
// prefix bits.
3031
let mask = U256::from(2_i32.pow(prefix_bits) - 1) << (256 - prefix_bits);
3132

32-
Ok(self
33+
let mut node_ids = self
3334
.mapping
3435
.get(subnet_id)
3536
.ok_or("No prefix mapping for subnet_id")?
@@ -46,7 +47,12 @@ impl PrefixMapping {
4647

4748
NodeId::from(raw_node_id)
4849
})
49-
.collect::<Vec<_>>())
50+
.collect::<Vec<_>>();
51+
// Shuffle the order of `NodeId`s to avoid always querying the same prefixes first and to
52+
// distribute discovery queries more evenly across the keyspace.
53+
node_ids.shuffle(&mut rand::rng());
54+
55+
Ok(node_ids)
5056
}
5157
}
5258

0 commit comments

Comments
 (0)