Skip to content

Commit 75dd479

Browse files
sanityclaude
andauthored
fix: topology manager uses actual connection count instead of filtered count (#1963)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent fffb2c3 commit 75dd479

File tree

2 files changed

+43
-17
lines changed

2 files changed

+43
-17
lines changed

crates/core/src/ring/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ impl Ring {
513513
&neighbor_locations,
514514
&self.connection_manager.own_location().location,
515515
Instant::now(),
516+
current_connections,
516517
);
517518

518519
tracing::info!(

crates/core/src/topology/mod.rs

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ impl TopologyManager {
283283
neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
284284
my_location: &Option<Location>,
285285
at_time: Instant,
286+
current_connections: usize,
286287
) -> TopologyAdjustment {
287288
#[cfg(debug_assertions)]
288289
{
@@ -294,21 +295,22 @@ impl TopologyManager {
294295
{
295296
LAST_LOG.with(|last_log| {
296297
tracing::trace!(
297-
"Adjusting topology at {:?}. Current neighbors: {:?}",
298+
"Adjusting topology at {:?}. Current connections: {}, Filtered neighbors: {}",
298299
at_time,
300+
current_connections,
299301
neighbor_locations.len()
300302
);
301303
*last_log.borrow_mut() = Instant::now();
302304
});
303305
}
304306
}
305307

306-
if neighbor_locations.len() < self.limits.min_connections {
308+
if current_connections < self.limits.min_connections {
307309
let mut locations = Vec::new();
308-
let below_threshold = self.limits.min_connections - neighbor_locations.len();
310+
let below_threshold = self.limits.min_connections - current_connections;
309311
if below_threshold > 0 {
310312
// If we have no connections at all, bootstrap by targeting own location
311-
if neighbor_locations.is_empty() {
313+
if current_connections == 0 {
312314
match my_location {
313315
Some(location) => {
314316
// The first connect message should target the peer's own
@@ -331,7 +333,7 @@ impl TopologyManager {
331333
LAST_LOG.with(|last_log| {
332334
tracing::trace!(
333335
minimum_num_peers_hard_limit = self.limits.min_connections,
334-
num_peers = neighbor_locations.len(),
336+
num_peers = current_connections,
335337
to_add = below_threshold,
336338
"Bootstrap: adding first connection at own location"
337339
);
@@ -341,7 +343,7 @@ impl TopologyManager {
341343
}
342344
}
343345
// If we have 1-4 connections, use random locations for diversity
344-
else if neighbor_locations.len() < 5 {
346+
else if current_connections < 5 {
345347
for _i in 0..below_threshold {
346348
locations.push(Location::random());
347349
}
@@ -356,7 +358,7 @@ impl TopologyManager {
356358
LAST_LOG.with(|last_log| {
357359
tracing::trace!(
358360
minimum_num_peers_hard_limit = self.limits.min_connections,
359-
num_peers = neighbor_locations.len(),
361+
num_peers = current_connections,
360362
to_add = below_threshold,
361363
"Early stage: adding connections at random locations for diversity"
362364
);
@@ -381,6 +383,16 @@ impl TopologyManager {
381383
return TopologyAdjustment::AddConnections(locations);
382384
}
383385

386+
// Skip resource-based removal in very small networks to avoid destabilizing them
387+
// During startup or in small test networks, we need stability more than optimization
388+
if current_connections < 5 {
389+
debug!(
390+
"Skipping resource-based topology adjustment for small network (connections: {})",
391+
current_connections
392+
);
393+
return TopologyAdjustment::NoChange;
394+
}
395+
384396
let increase_usage_if_below: RateProportion =
385397
RateProportion::new(MINIMUM_DESIRED_RESOURCE_USAGE_PROPORTION);
386398
let decrease_usage_if_above: RateProportion =
@@ -392,11 +404,10 @@ impl TopologyManager {
392404
debug!(?usage_proportion, "Resource usage information");
393405

394406
let adjustment: anyhow::Result<TopologyAdjustment> =
395-
if neighbor_locations.len() > self.limits.max_connections {
407+
if current_connections > self.limits.max_connections {
396408
debug!(
397-
"Number of neighbors ({:?}) is above maximum ({:?}), removing connections",
398-
neighbor_locations.len(),
399-
self.limits.max_connections
409+
"Number of connections ({:?}) is above maximum ({:?}), removing connections",
410+
current_connections, self.limits.max_connections
400411
);
401412

402413
self.update_connection_acquisition_strategy(ConnectionAcquisitionStrategy::Slow);
@@ -721,8 +732,12 @@ mod tests {
721732
neighbor_locations.insert(peer.location.unwrap(), vec![]);
722733
}
723734

724-
let adjustment =
725-
resource_manager.adjust_topology(&neighbor_locations, &None, Instant::now());
735+
let adjustment = resource_manager.adjust_topology(
736+
&neighbor_locations,
737+
&None,
738+
Instant::now(),
739+
peers.len(),
740+
);
726741
match adjustment {
727742
TopologyAdjustment::RemoveConnections(peers) => {
728743
assert_eq!(peers.len(), 1);
@@ -766,8 +781,12 @@ mod tests {
766781
neighbor_locations.insert(peer.location.unwrap(), vec![]);
767782
}
768783

769-
let adjustment =
770-
resource_manager.adjust_topology(&neighbor_locations, &None, Instant::now());
784+
let adjustment = resource_manager.adjust_topology(
785+
&neighbor_locations,
786+
&None,
787+
Instant::now(),
788+
peers.len(),
789+
);
771790

772791
match adjustment {
773792
TopologyAdjustment::AddConnections(locations) => {
@@ -807,8 +826,12 @@ mod tests {
807826
neighbor_locations.insert(peer.location.unwrap(), vec![]);
808827
}
809828

810-
let adjustment =
811-
resource_manager.adjust_topology(&neighbor_locations, &None, report_time);
829+
let adjustment = resource_manager.adjust_topology(
830+
&neighbor_locations,
831+
&None,
832+
report_time,
833+
peers.len(),
834+
);
812835

813836
match adjustment {
814837
TopologyAdjustment::NoChange => {}
@@ -848,6 +871,7 @@ mod tests {
848871
&neighbor_locations,
849872
&Some(my_location),
850873
report_time,
874+
peers.len(),
851875
);
852876

853877
match adjustment {
@@ -992,6 +1016,7 @@ mod tests {
9921016
&neighbor_locations,
9931017
&Some(Location::new(0.5)),
9941018
Instant::now(),
1019+
1, // 1 current connection
9951020
);
9961021

9971022
match adjustment {

0 commit comments

Comments
 (0)