Skip to content

Commit b6b7dee

Browse files
committed
flatten error
1 parent 035284f commit b6b7dee

File tree

2 files changed

+38
-46
lines changed

2 files changed

+38
-46
lines changed

iroh-connection-pool/src/connection_pool.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
//! It is important that you use the connection only in the future passed to
1010
//! connect, and don't clone it out of the future.
1111
use std::{
12-
collections::{HashMap, VecDeque}, ops::Deref, sync::Arc, time::Duration
12+
collections::{HashMap, VecDeque},
13+
ops::Deref,
14+
sync::Arc,
15+
time::Duration,
1316
};
1417

1518
use iroh::{
@@ -19,7 +22,11 @@ use iroh::{
1922
use n0_future::MaybeFuture;
2023
use snafu::Snafu;
2124
use tokio::{
22-
sync::{mpsc::{self, error::SendError as TokioSendError}, oneshot, OwnedSemaphorePermit},
25+
sync::{
26+
OwnedSemaphorePermit,
27+
mpsc::{self, error::SendError as TokioSendError},
28+
oneshot,
29+
},
2330
task::JoinError,
2431
};
2532
use tokio_util::time::FutureExt;
@@ -80,25 +87,25 @@ struct Context {
8087
/// errors such as timeouts and connection limits.
8188
#[derive(Debug, Clone)]
8289
pub enum PoolConnectError {
90+
/// Connection pool is shut down
91+
Shutdown,
8392
/// Timeout during connect
8493
Timeout,
8594
/// Too many connections
8695
TooManyConnections,
8796
/// Error during connect
8897
ConnectError(Arc<ConnectError>),
89-
/// Error during last execute
90-
ExecuteError(Arc<ExecuteError>),
9198
/// Handler actor panicked
9299
JoinError(Arc<JoinError>),
93100
}
94101

95102
impl std::fmt::Display for PoolConnectError {
96103
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97104
match self {
105+
PoolConnectError::Shutdown => write!(f, "Connection pool is shut down"),
98106
PoolConnectError::Timeout => write!(f, "Connection timed out"),
99107
PoolConnectError::TooManyConnections => write!(f, "Too many connections"),
100108
PoolConnectError::ConnectError(e) => write!(f, "Connection error: {}", e),
101-
PoolConnectError::ExecuteError(e) => write!(f, "Execution error: {}", e),
102109
PoolConnectError::JoinError(e) => write!(f, "Join error: {}", e),
103110
}
104111
}
@@ -286,8 +293,7 @@ impl Actor {
286293
trace!("removing oldest idle connection {}", idle);
287294
self.connections.remove(&idle);
288295
} else {
289-
msg.tx.send(Err(PoolConnectError::TooManyConnections))
290-
.ok();
296+
msg.tx.send(Err(PoolConnectError::TooManyConnections)).ok();
291297
continue;
292298
}
293299
}
@@ -327,19 +333,6 @@ pub enum ConnectionPoolError {
327333
Shutdown,
328334
}
329335

330-
/// An error during the usage of the connection.
331-
///
332-
/// The connection pool will recreate the connection if a handler returns this
333-
/// error. If you don't want this, swallow the error in the handler.
334-
#[derive(Debug, Snafu)]
335-
pub struct ExecuteError;
336-
337-
impl From<PoolConnectError> for ExecuteError {
338-
fn from(_: PoolConnectError) -> Self {
339-
ExecuteError
340-
}
341-
}
342-
343336
/// A connection pool
344337
#[derive(Debug, Clone)]
345338
pub struct ConnectionPool {
@@ -359,14 +352,13 @@ impl ConnectionPool {
359352
pub async fn connect(
360353
&self,
361354
id: NodeId,
362-
) -> std::result::Result<std::result::Result<ConnectionRef, PoolConnectError>, ConnectionPoolError>
363-
{
355+
) -> std::result::Result<ConnectionRef, PoolConnectError> {
364356
let (tx, rx) = oneshot::channel();
365357
self.tx
366358
.send(ActorMessage::RequestRef(RequestRef { id, tx }))
367359
.await
368-
.map_err(|_| ConnectionPoolError::Shutdown)?;
369-
Ok(rx.await.map_err(|_| ConnectionPoolError::Shutdown)?)
360+
.map_err(|_| PoolConnectError::Shutdown)?;
361+
Ok(rx.await.map_err(|_| PoolConnectError::Shutdown)??)
370362
}
371363

372364
/// Close an existing connection, if it exists

iroh-connection-pool/src/tests.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use n0_snafu::ResultExt;
1111
use testresult::TestResult;
1212
use tracing::trace;
1313

14-
use crate::connection_pool::{ConnectionPool, ConnectionPoolError, Options, PoolConnectError};
14+
use crate::connection_pool::{ConnectionPool, Options, PoolConnectError};
1515

1616
const ECHO_ALPN: &[u8] = b"echo";
1717

@@ -92,27 +92,23 @@ impl EchoClient {
9292
&self,
9393
id: NodeId,
9494
text: Vec<u8>,
95-
) -> Result<
96-
Result<Result<(usize, Vec<u8>), n0_snafu::Error>, PoolConnectError>,
97-
ConnectionPoolError,
98-
> {
95+
) -> Result<Result<(usize, Vec<u8>), n0_snafu::Error>, PoolConnectError> {
9996
let conn = self.pool.connect(id).await?;
100-
let conn = match conn {
101-
Ok(conn) => conn,
102-
Err(e) => return Ok(Err(e)),
103-
};
10497
let id = conn.stable_id();
10598
match echo_client(&conn, &text).await {
106-
Ok(res) => Ok(Ok(Ok((id, res)))),
107-
Err(e) => Ok(Ok(Err(e))),
99+
Ok(res) => Ok(Ok((id, res))),
100+
Err(e) => Ok(Err(e)),
108101
}
109102
}
110103
}
111104

112105
#[tokio::test]
113106
async fn connection_pool_errors() -> TestResult<()> {
114107
let filter = tracing_subscriber::EnvFilter::from_default_env();
115-
tracing_subscriber::fmt().with_env_filter(filter).try_init().ok();
108+
tracing_subscriber::fmt()
109+
.with_env_filter(filter)
110+
.try_init()
111+
.ok();
116112
// set up static discovery for all addrs
117113
let discovery = StaticProvider::new();
118114
let endpoint = iroh::Endpoint::builder()
@@ -123,7 +119,7 @@ async fn connection_pool_errors() -> TestResult<()> {
123119
let client = EchoClient { pool };
124120
{
125121
let non_existing = SecretKey::from_bytes(&[0; 32]).public();
126-
let res = client.echo(non_existing, b"Hello, world!".to_vec()).await?;
122+
let res = client.echo(non_existing, b"Hello, world!".to_vec()).await;
127123
// trying to connect to a non-existing id will fail with ConnectError
128124
// because we don't have any information about the node
129125
assert!(matches!(res, Err(PoolConnectError::ConnectError(_))));
@@ -140,9 +136,7 @@ async fn connection_pool_errors() -> TestResult<()> {
140136
});
141137
// trying to connect to an id for which we have info, but the other
142138
// end is not listening, will lead to a timeout.
143-
let res = client
144-
.echo(non_listening, b"Hello, world!".to_vec())
145-
.await?;
139+
let res = client.echo(non_listening, b"Hello, world!".to_vec()).await;
146140
assert!(matches!(res, Err(PoolConnectError::Timeout)));
147141
}
148142
Ok(())
@@ -151,7 +145,10 @@ async fn connection_pool_errors() -> TestResult<()> {
151145
#[tokio::test]
152146
async fn connection_pool_smoke() -> TestResult<()> {
153147
let filter = tracing_subscriber::EnvFilter::from_default_env();
154-
tracing_subscriber::fmt().with_env_filter(filter).try_init().ok();
148+
tracing_subscriber::fmt()
149+
.with_env_filter(filter)
150+
.try_init()
151+
.ok();
155152
let n = 32;
156153
let nodes = echo_servers(n).await?;
157154
let ids = nodes
@@ -170,17 +167,17 @@ async fn connection_pool_smoke() -> TestResult<()> {
170167
let mut connection_ids = BTreeMap::new();
171168
let msg = b"Hello, world!".to_vec();
172169
for id in &ids {
173-
let (cid1, res) = client.echo(*id, msg.clone()).await???;
170+
let (cid1, res) = client.echo(*id, msg.clone()).await??;
174171
assert_eq!(res, msg);
175-
let (cid2, res) = client.echo(*id, msg.clone()).await???;
172+
let (cid2, res) = client.echo(*id, msg.clone()).await??;
176173
assert_eq!(res, msg);
177174
assert_eq!(cid1, cid2);
178175
connection_ids.insert(id, cid1);
179176
}
180177
tokio::time::sleep(Duration::from_millis(1000)).await;
181178
for id in &ids {
182179
let cid1 = *connection_ids.get(id).expect("Connection ID not found");
183-
let (cid2, res) = client.echo(*id, msg.clone()).await???;
180+
let (cid2, res) = client.echo(*id, msg.clone()).await??;
184181
assert_eq!(res, msg);
185182
assert_ne!(cid1, cid2);
186183
}
@@ -192,7 +189,10 @@ async fn connection_pool_smoke() -> TestResult<()> {
192189
#[tokio::test]
193190
async fn connection_pool_idle() -> TestResult<()> {
194191
let filter = tracing_subscriber::EnvFilter::from_default_env();
195-
tracing_subscriber::fmt().with_env_filter(filter).try_init().ok();
192+
tracing_subscriber::fmt()
193+
.with_env_filter(filter)
194+
.try_init()
195+
.ok();
196196
let n = 32;
197197
let nodes = echo_servers(n).await?;
198198
let ids = nodes
@@ -218,7 +218,7 @@ async fn connection_pool_idle() -> TestResult<()> {
218218
let client = EchoClient { pool };
219219
let msg = b"Hello, world!".to_vec();
220220
for id in &ids {
221-
let (_, res) = client.echo(*id, msg.clone()).await???;
221+
let (_, res) = client.echo(*id, msg.clone()).await??;
222222
assert_eq!(res, msg);
223223
}
224224
Ok(())

0 commit comments

Comments
 (0)