From da22ce6a611a20e2ec078f8ba347ca9abf5aaff0 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 19 Aug 2025 11:06:35 +0200 Subject: [PATCH 1/4] WIP Use latest connection pool Somehow hangs still! --- Cargo.lock | 2 +- src/lib.rs | 113 ++++++++++++++++++--------------------------------- src/tests.rs | 14 ++----- 3 files changed, 44 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f67a78..59fdcdb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "iroh-connection-pool" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh-experiments?branch=handler-pool#2ba54259497187ef8d7e6a4488882045e1366ee1" +source = "git+https://github.com/n0-computer/iroh-experiments?branch=handler-pool#b6b7dee154623bb03b30dcb4647a4acddad4fae2" dependencies = [ "iroh", "n0-future 0.2.0", diff --git a/src/lib.rs b/src/lib.rs index d996b52..95abb00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,6 @@ use irpc::channel::{mpsc, oneshot}; use n0_future::{BufferedStreamExt, MaybeFuture, StreamExt, stream}; use rand::{Rng, SeedableRng, rngs::StdRng, seq::index::sample}; use serde::{Deserialize, Serialize}; -use snafu::Snafu; use tokio::task::JoinSet; #[cfg(test)] mod tests; @@ -775,7 +774,7 @@ mod routing { use crate::{ api::{ApiMessage, Lookup, NetworkGet, NetworkPut}, - pool::{ClientPool, PoolError}, + pool::ClientPool, routing::{ALPHA, BUCKET_COUNT, Buckets, Distance, K, NodeInfo, RoutingTable}, rpc::{Id, Kind, RpcClient, RpcMessage, SetResponse, Value}, u256::U256, @@ -988,11 +987,13 @@ pub mod pool { //! This can be implemented with an in-memory implementation for tests, //! and with a proper iroh connection pool for real use. + use std::sync::Arc; + use iroh::{ Endpoint, NodeAddr, NodeId, - endpoint::{Connection, RecvStream, SendStream}, + endpoint::{RecvStream, SendStream}, }; - use iroh_connection_pool::connection_pool::ConnectionPool; + use iroh_connection_pool::connection_pool::{ConnectionPool, ConnectionRef}; use snafu::Snafu; use tracing::trace; @@ -1023,16 +1024,7 @@ pub mod pool { /// /// You must not clone the client out of the closure. If you do, this client /// can become unusable at any time! - fn with_client( - &self, - id: NodeId, - f: F, - ) -> impl Future> + Send - where - F: FnOnce(RpcClient) -> Fut + Send + 'static, - Fut: Future> + Send + 'static, - R: Send + 'static, - E: From + Send + 'static; + fn client(&self, id: NodeId) -> impl Future> + Send; } /// Error when a pool can not obtain a client. @@ -1054,7 +1046,7 @@ pub mod pool { } #[derive(Debug, Clone)] - struct IrohConnection(Connection); + struct IrohConnection(Arc); impl irpc::rpc::RemoteConnection for IrohConnection { fn clone_boxed(&self) -> Box { @@ -1105,28 +1097,15 @@ pub mod pool { self.endpoint.add_node_addr_with_source(addr, "").ok(); } - async fn with_client(&self, node_id: NodeId, f: F) -> Result - where - F: FnOnce(RpcClient) -> Fut + Send + 'static, - Fut: Future> + Send + 'static, - R: Send + 'static, - E: From + Send + 'static, - { + async fn client(&self, node_id: NodeId) -> Result { trace!(%node_id, "Connecting"); - let res = self + let connection = self .inner - .with_connection(node_id, move |conn| async move { - trace!(%node_id, "Got connection"); - let client = RpcClient(irpc::Client::boxed(IrohConnection(conn))); - f(client).await - }) - .await; - res.map_err(|_| PoolError { - message: "Connection pool shut down".into(), - })? - .map_err(|e| PoolError { - message: format!("Failed to obtain client: {}", e), - })? + .connect(node_id) + .await + .map_err(|e| format!("Failed to connect: {}", e))?; + let client = RpcClient(irpc::Client::boxed(IrohConnection(Arc::new(connection)))); + Ok(client) } } } @@ -1320,22 +1299,6 @@ impl Default for Config { } } -#[derive(Debug, Snafu)] -enum Error { - #[snafu(transparent)] - Client { source: PoolError }, - #[snafu(transparent)] - Irpc { source: irpc::Error }, -} - -impl From for Error { - fn from(source: irpc::channel::SendError) -> Self { - Self::Irpc { - source: irpc::Error::from(source), - } - } -} - impl

Actor

where P: ClientPool, @@ -1634,14 +1597,13 @@ impl State

{ let value = msg.value.clone(); let tx = tx.clone(); async move { - pool.with_client(id, move |client| async move { - if client.set(msg.id, value).await.is_ok() { - tx.send(id).await?; - } - std::result::Result::<(), Error>::Ok(()) - }) - .await - .ok(); + let Ok(client) = pool.client(id).await else { + return; + }; + if client.set(msg.id, value).await.is_ok() { + tx.send(id).await.ok(); + } + drop(client); } }) .await; @@ -1665,16 +1627,19 @@ impl State

{ n: msg.n, }; async move { - pool.with_client(id, move |client| async move { - // Get all values of the specified kind for the key - let mut rx = client.get_all(msg.id, msg.kind, msg.seed, msg.n).await?; - while let Ok(Some(value)) = rx.recv().await { - tx.send((id, value)).await?; + let Ok(client) = pool.client(id).await else { + return; + }; + // Get all values of the specified kind for the key + let Ok(mut rx) = client.get_all(msg.id, msg.kind, msg.seed, msg.n).await else { + return; + }; + while let Ok(Some(value)) = rx.recv().await { + if tx.send((id, value)).await.is_err() { + break; } - std::result::Result::<(), Error>::Ok(()) - }) - .await - .ok(); + } + drop(client); } }) .await; @@ -1686,14 +1651,16 @@ impl State

{ } else { Some(self.pool.id()) }; - let infos = self + let client = self .pool - .with_client(id, move |client| async move { - let ids = client.find_node(target, requester).await?; - std::result::Result::, Error>::Ok(ids) - }) + .client(id) + .await + .map_err(|_| "Failed to get client")?; + let infos = client + .find_node(target, requester) .await .map_err(|_| "Failed to query node")?; + drop(client); let ids = infos.iter().map(|info| info.node_id).collect(); for info in infos { self.pool.add_node_addr(info); diff --git a/src/tests.rs b/src/tests.rs index 645915f..4faca05 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -23,23 +23,15 @@ struct TestPool { } impl ClientPool for TestPool { - async fn with_client(&self, id: NodeId, f: F) -> Result - where - F: FnOnce(RpcClient) -> Fut + Send + 'static, - Fut: Future> + Send + 'static, - R: Send + 'static, - E: From, - { + async fn client(&self, id: NodeId) -> Result { let client = self .clients .lock() .unwrap() .get(&id) .cloned() - .ok_or(E::from(PoolError { - message: "client not found".into(), - }))?; - f(client).await + .ok_or_else(|| format!("client not found: {}", id))?; + Ok(client) } fn id(&self) -> NodeId { From e05ceb4e76779feb89e475be64c91865affe5ae3 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 19 Aug 2025 11:48:08 +0200 Subject: [PATCH 2/4] WIP add println! --- src/lib.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 95abb00..63acf45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,7 +69,7 @@ //! [irpc blog post]: https://www.iroh.computer/blog/irpc/ use std::{ collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, - time::{Duration, UNIX_EPOCH}, + time::{Duration, Instant, UNIX_EPOCH}, }; use futures_buffered::FuturesUnordered; @@ -848,6 +848,7 @@ mod u256 { } /// Get the number of leading zeros + #[allow(dead_code)] pub fn leading_zeros(&self) -> u32 { let mut count = 0; for &byte in self.0.iter().rev() { @@ -1651,16 +1652,29 @@ impl State

{ } else { Some(self.pool.id()) }; + println!("Querying node {} for target {}", id, target); + let t0 = Instant::now(); let client = self .pool .client(id) .await - .map_err(|_| "Failed to get client")?; + .map_err(|_| "Failed to get client"); + if let Err(e) = &client { + println!("Failed to get client: {e}"); + return Err("Failed to get client"); + } + let client = client?; let infos = client .find_node(target, requester) .await - .map_err(|_| "Failed to query node")?; + .map_err(|_| "Failed to query node"); + if let Err(e) = &infos { + println!("Failed to query node: {e}"); + return Err("Failed to query node"); + } + let infos = infos?; drop(client); + println!("Done with client after {:?}", t0.elapsed()); let ids = infos.iter().map(|info| info.node_id).collect(); for info in infos { self.pool.add_node_addr(info); From ac8fe9eae664f2bf295b1f67ab343075e916ddc4 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 19 Aug 2025 12:01:29 +0200 Subject: [PATCH 3/4] More logging --- src/lib.rs | 10 +++++----- src/tests.rs | 10 +++++++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 63acf45..edccd53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -543,7 +543,7 @@ pub mod api { } } pub use api::ApiClient; -use tracing::{error, warn}; +use tracing::{error, info, warn}; mod routing { use std::{ @@ -1652,7 +1652,7 @@ impl State

{ } else { Some(self.pool.id()) }; - println!("Querying node {} for target {}", id, target); + info!(%id, %target, "Querying node"); let t0 = Instant::now(); let client = self .pool @@ -1660,7 +1660,7 @@ impl State

{ .await .map_err(|_| "Failed to get client"); if let Err(e) = &client { - println!("Failed to get client: {e}"); + info!(%id, "Failed to get client: {e}"); return Err("Failed to get client"); } let client = client?; @@ -1669,12 +1669,12 @@ impl State

{ .await .map_err(|_| "Failed to query node"); if let Err(e) = &infos { - println!("Failed to query node: {e}"); + info!(%id, "Failed to query node: {e}"); return Err("Failed to query node"); } let infos = infos?; drop(client); - println!("Done with client after {:?}", t0.elapsed()); + info!(%id, "Done with client after {:?}", t0.elapsed()); let ids = infos.iter().map(|info| info.node_id).collect(); for info in infos { self.pool.add_node_addr(info); diff --git a/src/tests.rs b/src/tests.rs index 4faca05..e4c6db1 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -197,8 +197,8 @@ async fn store_random_values(nodes: &Nodes, n: usize) -> irpc::Result<()> { let mut common_count = vec![0usize; n]; #[allow(clippy::needless_range_loop)] for i in 0..n { - if nodes.len() > 10000 { - println!("{i}"); + if nodes.len() > 1 { + println!("Value {i}"); } let text = format!("Item {i}"); let expected_ids = expected_ids(&ids, Id::blake3_hash(text.as_bytes()), 20); @@ -483,7 +483,11 @@ async fn iroh_create_nodes( .await?; let addr = endpoint.node_addr().initialized().await; discovery.add_node_info(addr.clone()); - let pool = ConnectionPool::new(endpoint.clone(), DHT_TEST_ALPN, Default::default()); + let pool = ConnectionPool::new(endpoint.clone(), DHT_TEST_ALPN, iroh_connection_pool::connection_pool::Options { + max_connections: 500, + idle_timeout: Duration::from_secs(1), + connect_timeout: Duration::from_secs(1), + }); let pool = IrohPool::new(endpoint.clone(), pool); let bootstrap = (0..n_bootstrap) .map(|i| node_ids[(offfset + i + 1) % n]) From 39bf105c8e0d90396b36f64b139a2787319a1120 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 19 Aug 2025 16:35:09 +0200 Subject: [PATCH 4/4] Use new conn pool with ConnectionRef --- Cargo.lock | 2 +- src/lib.rs | 165 +++++++++++++++++++++++++++++++++++++++------------ src/tests.rs | 53 ++++++++++------- 3 files changed, 159 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 59fdcdb..085ffd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "iroh-connection-pool" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh-experiments?branch=handler-pool#b6b7dee154623bb03b30dcb4647a4acddad4fae2" +source = "git+https://github.com/n0-computer/iroh-experiments?branch=handler-pool#048052f7a9d85e7f6e9756824679c5603a19b7fd" dependencies = [ "iroh", "n0-future 0.2.0", diff --git a/src/lib.rs b/src/lib.rs index edccd53..b65019a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,7 +69,8 @@ //! [irpc blog post]: https://www.iroh.computer/blog/irpc/ use std::{ collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, - time::{Duration, Instant, UNIX_EPOCH}, + sync::Arc, + time::{Duration, UNIX_EPOCH}, }; use futures_buffered::FuturesUnordered; @@ -94,7 +95,12 @@ pub mod rpc { //! message type [`RpcMessage`]. //! //! The entry point is [`RpcClient`]. - use std::{fmt, num::NonZeroU64, ops::Deref}; + use std::{ + fmt, + num::NonZeroU64, + ops::Deref, + sync::{Arc, Weak}, + }; use iroh::{Endpoint, NodeAddr, NodeId, PublicKey}; use iroh_base::SignatureError; @@ -303,17 +309,26 @@ pub mod rpc { } #[derive(Debug, Clone)] - pub struct RpcClient(pub(crate) irpc::Client); + pub struct RpcClient(pub(crate) Arc>); + + #[derive(Debug, Clone)] + pub struct WeakRpcClient(pub(crate) Weak>); + + impl WeakRpcClient { + pub fn upgrade(&self) -> Option { + self.0.upgrade().map(RpcClient) + } + } impl RpcClient { pub fn remote(endpoint: Endpoint, id: Id) -> std::result::Result { let id = iroh::NodeId::from_bytes(&id)?; let client = irpc_iroh::client(endpoint, id, ALPN); - Ok(Self(client)) + Ok(Self::new(client)) } pub fn new(client: irpc::Client) -> Self { - Self(client) + Self(Arc::new(client)) } pub async fn set(&self, key: Id, value: Value) -> irpc::Result { @@ -339,6 +354,10 @@ pub mod rpc { ) -> irpc::Result> { self.0.rpc(FindNode { id, requester }).await } + + pub fn downgrade(&self) -> WeakRpcClient { + WeakRpcClient(Arc::downgrade(&self.0)) + } } } @@ -351,7 +370,12 @@ pub mod api { //! message type [`ApiMessage`]. //! //! The entry point is [`ApiClient`]. - use std::{collections::BTreeMap, num::NonZeroU64, time::Duration}; + use std::{ + collections::BTreeMap, + num::NonZeroU64, + sync::{Arc, Weak}, + time::Duration, + }; use iroh::NodeId; use irpc::{ @@ -409,7 +433,7 @@ pub mod api { } #[derive(Debug, Clone)] - pub struct ApiClient(pub(crate) irpc::Client); + pub struct ApiClient(pub(crate) Arc>); impl ApiClient { /// notify the node that we have just seen these nodes. @@ -512,32 +536,65 @@ pub mod api { self.0.rpc(SelfLookup).await.ok(); } + pub async fn random_lookup(&self) { + self.0.rpc(RandomLookup).await.ok(); + } + + pub async fn candidate_lookup(&self) { + self.0.rpc(CandidateLookup).await.ok(); + } + + pub fn downgrade(&self) -> WeakApiClient { + WeakApiClient(Arc::downgrade(&self.0)) + } + } + + #[derive(Debug, Clone)] + pub struct WeakApiClient(pub(crate) Weak>); + + impl WeakApiClient { + pub fn upgrade(&self) -> irpc::Result { + self.0 + .upgrade() + .map(ApiClient) + .ok_or(irpc::Error::Send(irpc::channel::SendError::ReceiverClosed)) + } + + pub async fn nodes_dead(&self, ids: &[NodeId]) -> irpc::Result<()> { + self.upgrade()?.nodes_dead(ids).await + } + + pub async fn nodes_seen(&self, ids: &[NodeId]) -> irpc::Result<()> { + self.upgrade()?.nodes_seen(ids).await + } + pub(crate) async fn self_lookup_periodic(self, interval: Duration) { loop { tokio::time::sleep(interval).await; - self.self_lookup().await; + let Ok(api) = self.upgrade() else { + return; + }; + api.self_lookup().await; } } - pub async fn random_lookup(&self) { - self.0.rpc(RandomLookup).await.ok(); - } - pub(crate) async fn random_lookup_periodic(self, interval: Duration) { loop { tokio::time::sleep(interval).await; - self.random_lookup().await; + let Ok(api) = self.upgrade() else { + return; + }; + api.random_lookup().await; } } - pub async fn candidate_lookup(&self) { - self.0.rpc(CandidateLookup).await.ok(); - } - pub(crate) async fn candidate_lookup_periodic(self, interval: Duration) { loop { - self.candidate_lookup().await; tokio::time::sleep(interval).await; + let Ok(api) = self.upgrade() else { + return; + }; + api.candidate_lookup().await; } } } @@ -773,7 +830,7 @@ mod routing { } use crate::{ - api::{ApiMessage, Lookup, NetworkGet, NetworkPut}, + api::{ApiMessage, Lookup, NetworkGet, NetworkPut, WeakApiClient}, pool::ClientPool, routing::{ALPHA, BUCKET_COUNT, Buckets, Distance, K, NodeInfo, RoutingTable}, rpc::{Id, Kind, RpcClient, RpcMessage, SetResponse, Value}, @@ -988,7 +1045,7 @@ pub mod pool { //! This can be implemented with an in-memory implementation for tests, //! and with a proper iroh connection pool for real use. - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use iroh::{ Endpoint, NodeAddr, NodeId, @@ -996,9 +1053,9 @@ pub mod pool { }; use iroh_connection_pool::connection_pool::{ConnectionPool, ConnectionRef}; use snafu::Snafu; - use tracing::trace; + use tracing::error; - use crate::rpc::RpcClient; + use crate::rpc::{RpcClient, WeakRpcClient}; /// A pool that can efficiently provide clients given a node id, and knows its /// own identity. @@ -1038,11 +1095,28 @@ pub mod pool { pub struct IrohPool { endpoint: Endpoint, inner: ConnectionPool, + self_client: Arc>>, } impl IrohPool { pub fn new(endpoint: Endpoint, inner: ConnectionPool) -> Self { - Self { endpoint, inner } + Self { + endpoint, + inner, + self_client: Arc::new(RwLock::new(None)), + } + } + + /// Iroh connections to self are not allowed. But when storing or getting values, + /// it is perfectly reasonable to have the own id as the best location. + /// + /// To support this seamlessly, this allows creating a client to self. + /// + /// This has to be a weak client since we don't want the pool, which is owned by the + /// node actor, to keep the node actor alive. + pub fn set_self_client(&self, client: Option) { + let mut self_client = self.self_client.write().unwrap(); + *self_client = client; } } @@ -1099,13 +1173,24 @@ pub mod pool { } async fn client(&self, node_id: NodeId) -> Result { - trace!(%node_id, "Connecting"); + if node_id == self.id() { + // If we are trying to connect to ourselves, return the self client if available. + if let Some(client) = self.self_client.read().unwrap().clone() { + return client + .upgrade() + .ok_or_else(|| "Self client is no longer available".to_string()); + } else { + error!("Self client not set"); + return Err("Self client not set".to_string()); + } + } let connection = self .inner .connect(node_id) .await - .map_err(|e| format!("Failed to connect: {}", e))?; - let client = RpcClient(irpc::Client::boxed(IrohConnection(Arc::new(connection)))); + .map_err(|e| format!("Failed to connect: {}", e)); + let connection = connection?; + let client = RpcClient::new(irpc::Client::boxed(IrohConnection(Arc::new(connection)))); Ok(client) } } @@ -1115,7 +1200,7 @@ pub mod pool { #[derive(Debug, Clone)] struct State

{ /// ability to send messages to ourselves, e.g. to update the routing table - api: ApiClient, + api: WeakApiClient, /// client pool pool: P, /// configuration @@ -1311,10 +1396,10 @@ where config: Config, ) -> (Self, ApiClient) { let (api_tx, internal_rx) = tokio::sync::mpsc::channel(32); - let api = ApiClient(api_tx.into()); + let api = ApiClient(Arc::new(api_tx.into())); let mut tasks = JoinSet::new(); let state = State { - api: api.clone(), + api: api.downgrade(), pool, config: config.clone(), }; @@ -1433,6 +1518,9 @@ where // the self lookup won't be very useful. let api = self.state.api.clone(); self.tasks.spawn(async move { + let Ok(api) = api.upgrade() else { + return; + }; api.lookup(id, None).await.ok(); msg.tx.send(()).await.ok(); }); @@ -1452,6 +1540,9 @@ where }; let api = self.state.api.clone(); self.tasks.spawn(async move { + let Ok(api) = api.upgrade() else { + return; + }; api.lookup(id, None).await.ok(); msg.tx.send(()).await.ok(); }); @@ -1480,6 +1571,9 @@ where }) .collect::>(); self.tasks.spawn(async move { + let Ok(api) = api.upgrade() else { + return; + }; // this will check if they exist. If yes, they will be added to the routing table. for (id, ids) in groups { api.lookup(id, Some(ids)).await.ok(); @@ -1652,18 +1746,12 @@ impl State

{ } else { Some(self.pool.id()) }; - info!(%id, %target, "Querying node"); - let t0 = Instant::now(); + let client = self .pool .client(id) .await - .map_err(|_| "Failed to get client"); - if let Err(e) = &client { - info!(%id, "Failed to get client: {e}"); - return Err("Failed to get client"); - } - let client = client?; + .map_err(|_| "Error getting client")?; let infos = client .find_node(target, requester) .await @@ -1674,7 +1762,6 @@ impl State

{ } let infos = infos?; drop(client); - info!(%id, "Done with client after {:?}", t0.elapsed()); let ids = infos.iter().map(|info| info.node_id).collect(); for info in infos { self.pool.add_node_addr(info); @@ -1837,5 +1924,5 @@ fn create_node_impl( let (tx, rx) = tokio::sync::mpsc::channel(32); let (actor, api) = Actor::

::new(node, rx, pool, config); tokio::spawn(actor.run()); - (RpcClient(irpc::Client::local(tx)), api) + (RpcClient::new(irpc::Client::local(tx)), api) } diff --git a/src/tests.rs b/src/tests.rs index e4c6db1..e1df958 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -197,7 +197,7 @@ async fn store_random_values(nodes: &Nodes, n: usize) -> irpc::Result<()> { let mut common_count = vec![0usize; n]; #[allow(clippy::needless_range_loop)] for i in 0..n { - if nodes.len() > 1 { + if nodes.len() > 10000 { println!("Value {i}"); } let text = format!("Item {i}"); @@ -483,25 +483,28 @@ async fn iroh_create_nodes( .await?; let addr = endpoint.node_addr().initialized().await; discovery.add_node_info(addr.clone()); - let pool = ConnectionPool::new(endpoint.clone(), DHT_TEST_ALPN, iroh_connection_pool::connection_pool::Options { - max_connections: 500, - idle_timeout: Duration::from_secs(1), - connect_timeout: Duration::from_secs(1), - }); + let pool = ConnectionPool::new( + endpoint.clone(), + DHT_TEST_ALPN, + iroh_connection_pool::connection_pool::Options { + max_connections: 32, + idle_timeout: Duration::from_secs(1), + connect_timeout: Duration::from_secs(1), + }, + ); let pool = IrohPool::new(endpoint.clone(), pool); let bootstrap = (0..n_bootstrap) .map(|i| node_ids[(offfset + i + 1) % n]) .collect::>(); - Ok(( - endpoint, - create_node_impl( - *node_id, - pool, - bootstrap, - (*buckets).clone(), - Default::default(), - ), - )) + let (rpc, api) = create_node_impl( + *node_id, + pool.clone(), + bootstrap, + (*buckets).clone(), + Default::default(), + ); + pool.set_self_client(Some(rpc.downgrade())); + Ok((endpoint, (rpc, api))) } }) .buffered_unordered(32) @@ -538,11 +541,7 @@ fn spawn_routers(iroh_nodes: &IrohNodes) -> Vec { }) .collect() } - -#[tokio::test(flavor = "multi_thread")] -async fn iroh_perfect_routing_tables_500() -> TestResult<()> { - tracing_subscriber::fmt::try_init().ok(); - let n = 500; +async fn iroh_perfect_routing_tables(n: usize) -> TestResult<()> { let seed = 0; let bootstrap = 0; let secrets = create_secrets(seed, n); @@ -562,6 +561,18 @@ async fn iroh_perfect_routing_tables_500() -> TestResult<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn iroh_perfect_routing_tables_500() -> TestResult<()> { + iroh_perfect_routing_tables(500).await +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore = "runs very long and takes a lot of mem"] +async fn iroh_perfect_routing_tables_10k() -> TestResult<()> { + iroh_perfect_routing_tables(10000).await +} + + #[tokio::test(flavor = "multi_thread")] async fn random_lookup_strategy() { let n = 1000;