Skip to content

Commit 2dc4063

Browse files
committed
Use MaybeFuture
1 parent 0445f6c commit 2dc4063

File tree

3 files changed

+46
-30
lines changed

3 files changed

+46
-30
lines changed

iroh-connection-pool/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ edition = "2024"
55

66
[dependencies]
77
iroh = "0.91.1"
8-
n0-future = "0.1.3"
8+
n0-future = "0.2.0"
99
snafu = "0.8.6"
1010
tokio = "1.45"
1111
tokio-util = { version = "0.7", features = ["time"] }

iroh-connection-pool/src/connection_pool.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use iroh::{
1414
Endpoint, NodeId,
1515
endpoint::{ConnectError, Connection},
1616
};
17-
use n0_future::boxed::BoxFuture;
17+
use n0_future::{MaybeFuture, boxed::BoxFuture};
1818
use snafu::Snafu;
1919
use tokio::{
2020
sync::mpsc,
@@ -91,12 +91,14 @@ async fn run_connection_actor(
9191
Ok(Err(e)) => Err(PoolConnectError::ConnectError(e)),
9292
Err(_) => Err(PoolConnectError::Timeout),
9393
};
94-
if !matches!(state, Ok(_)) {
95-
context.owner.close(node_id).await.ok();
94+
if state.is_err() {
95+
if context.owner.close(node_id).await.is_err() {
96+
return;
97+
}
9698
}
9799

98100
let mut tasks = JoinSet::new();
99-
let idle_timer = tokio::time::sleep(context.options.idle_timeout);
101+
let idle_timer = MaybeFuture::default();
100102
tokio::pin!(idle_timer);
101103

102104
loop {
@@ -107,8 +109,8 @@ async fn run_connection_actor(
107109
handler = rx.recv() => {
108110
match handler {
109111
Some(handler) => {
110-
// Reset idle timer by creating a new one
111-
idle_timer.set(tokio::time::sleep(context.options.idle_timeout));
112+
// clear the idle timer
113+
idle_timer.as_mut().set_none();
112114
tasks.spawn(handler(&state));
113115
}
114116
None => {
@@ -145,14 +147,19 @@ async fn run_connection_actor(
145147
}
146148
}
147149

148-
// If no tasks left and channel is closed, we can exit
149-
if tasks.is_empty() && rx.is_closed() {
150-
break;
150+
// We are idle
151+
if tasks.is_empty() {
152+
// If the channel is closed, we can exit
153+
if rx.is_closed() {
154+
break;
155+
}
156+
// set the idle timer
157+
idle_timer.as_mut().set_future(tokio::time::sleep(context.options.idle_timeout));
151158
}
152159
}
153160

154161
// Idle timeout - request shutdown
155-
_ = &mut idle_timer, if tasks.is_empty() => {
162+
_ = &mut idle_timer => {
156163
tracing::debug!("Connection to {} idle, requesting shutdown", node_id);
157164

158165
context.owner.close(node_id).await.ok();

iroh-connection-pool/src/connection_pool_0rtt.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use iroh::{
1717
Endpoint, NodeId,
1818
endpoint::{ConnectOptions, ConnectWithOptsError, Connection, ConnectionError},
1919
};
20-
use n0_future::boxed::BoxFuture;
20+
use n0_future::{MaybeFuture, boxed::BoxFuture};
2121
use snafu::Snafu;
2222
use tokio::{
2323
sync::{broadcast, mpsc},
@@ -92,15 +92,18 @@ async fn connect(
9292
endpoint: &Endpoint,
9393
node_id: NodeId,
9494
alpn: &[u8],
95-
) -> (PoolConnectResult, Option<impl Future<Output = ()>>) {
95+
) -> (PoolConnectResult, MaybeFuture<impl Future<Output = ()>>) {
9696
let connecting = match endpoint
9797
.connect_with_opts(node_id, alpn, ConnectOptions::default())
9898
.await
9999
{
100100
Ok(connecting) => connecting,
101101
Err(cause) => {
102102
trace!("Failed to connect to node {}: {}", node_id, cause);
103-
return (Err(PoolConnectError::ConnectError(cause)), None);
103+
return (
104+
Err(PoolConnectError::ConnectError(cause)),
105+
MaybeFuture::None,
106+
);
104107
}
105108
};
106109
let (conn, zero_rtt_accepted) = match connecting.into_0rtt() {
@@ -114,14 +117,14 @@ async fn connect(
114117
Err(cause) => Err(PoolConnectError::ConnectionError(cause)),
115118
Ok(connection) => Ok((connection, accepted(true))),
116119
};
117-
return (res, None);
120+
return (res, MaybeFuture::None);
118121
}
119122
};
120123
let (tx, rx) = broadcast::channel(1);
121124
let complete = Box::pin(async move {
122125
tx.send(zero_rtt_accepted.await).ok();
123126
});
124-
(Ok((conn, rx)), Some(complete))
127+
(Ok((conn, rx)), MaybeFuture::Some(complete))
125128
}
126129

127130
/// Run a connection actor for a single node
@@ -131,19 +134,22 @@ async fn run_connection_actor(
131134
context: Arc<Context>,
132135
) {
133136
// Connect to the node
134-
let (mut state, mut forwarder) = match connect(&context.endpoint, node_id, &context.alpn)
137+
let (mut state, forwarder) = match connect(&context.endpoint, node_id, &context.alpn)
135138
.timeout(context.options.connect_timeout)
136139
.await
137140
{
138141
Ok((state, forwarder)) => (state, forwarder),
139-
Err(_) => (Err(PoolConnectError::Timeout), None),
142+
Err(_) => (Err(PoolConnectError::Timeout), MaybeFuture::None),
140143
};
141-
if !matches!(state, Ok((_, _))) {
142-
context.owner.close(node_id).await.ok();
144+
if state.is_err() {
145+
if context.owner.close(node_id).await.is_err() {
146+
return;
147+
}
143148
}
144149
let mut tasks = JoinSet::new();
145-
let idle_timer = tokio::time::sleep(context.options.idle_timeout);
150+
let idle_timer = MaybeFuture::default();
146151
tokio::pin!(idle_timer);
152+
tokio::pin!(forwarder);
147153

148154
loop {
149155
tokio::select! {
@@ -153,8 +159,7 @@ async fn run_connection_actor(
153159
handler = rx.recv() => {
154160
match handler {
155161
Some(handler) => {
156-
// Reset idle timer by creating a new one
157-
idle_timer.set(tokio::time::sleep(context.options.idle_timeout));
162+
idle_timer.as_mut().set_none();
158163
tasks.spawn(handler(&state));
159164
}
160165
None => {
@@ -191,22 +196,26 @@ async fn run_connection_actor(
191196
}
192197
}
193198

194-
// If no tasks left and channel is closed, we can exit
195-
if tasks.is_empty() && rx.is_closed() {
196-
break;
199+
// We are idle
200+
if tasks.is_empty() {
201+
// If the channel is closed, we can exit
202+
if rx.is_closed() {
203+
break;
204+
}
205+
// set the idle timer
206+
idle_timer.as_mut().set_future(tokio::time::sleep(context.options.idle_timeout));
197207
}
198208
}
199209

200210
// Idle timeout - request shutdown
201-
_ = &mut idle_timer, if tasks.is_empty() => {
211+
_ = &mut idle_timer => {
202212
trace!("Connection to {} idle, requesting shutdown", node_id);
203213
context.owner.close(node_id).await.ok();
204214
// Don't break here - wait for main actor to close our channel
205215
}
206216

207-
_ = forwarder.as_mut().unwrap(), if forwarder.is_some() => {
208-
forwarder = None;
209-
}
217+
// Poll the forwarder if we have one
218+
_ = &mut forwarder => {}
210219
}
211220
}
212221

0 commit comments

Comments
 (0)