Skip to content

Commit 0edac5c

Browse files
committed
Add config object
1 parent 806fa45 commit 0edac5c

File tree

1 file changed

+27
-8
lines changed

1 file changed

+27
-8
lines changed

src/util/connection_pool.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use tokio::sync::{
2929
oneshot, Notify,
3030
};
3131
use tokio_util::time::FutureExt as TimeFutureExt;
32-
use tracing::{debug, error, trace};
32+
use tracing::{debug, error, info, trace};
3333

3434
/// Configuration options for the connection pool
3535
#[derive(Debug, Clone, Copy)]
@@ -142,17 +142,26 @@ impl Context {
142142
.await
143143
.map_err(|_| PoolConnectError::Timeout)
144144
.and_then(|r| r.map_err(PoolConnectError::from));
145-
if let Err(e) = &state {
146-
debug!(%node_id, "Failed to connect {e:?}, requesting shutdown");
147-
if context.owner.close(node_id).await.is_err() {
148-
return;
145+
let conn_close = match &state {
146+
Ok(conn) => {
147+
let conn = conn.clone();
148+
MaybeFuture::Some(async move { conn.closed().await })
149149
}
150-
}
150+
Err(e) => {
151+
debug!(%node_id, "Failed to connect {e:?}, requesting shutdown");
152+
tokio::time::sleep(Duration::from_secs(1)).await;
153+
if context.owner.close(node_id).await.is_err() {
154+
return;
155+
}
156+
MaybeFuture::None
157+
}
158+
};
159+
151160
let counter = ConnectionCounter::new();
152161
let idle_timer = MaybeFuture::default();
153162
let idle_stream = counter.clone().idle_stream();
154163

155-
tokio::pin!(idle_timer, idle_stream);
164+
tokio::pin!(idle_timer, idle_stream, conn_close);
156165

157166
loop {
158167
tokio::select! {
@@ -166,6 +175,7 @@ impl Context {
166175
match &state {
167176
Ok(state) => {
168177
let res = ConnectionRef::new(state.clone(), counter.get_one());
178+
info!(%node_id, "Handing out ConnectionRef {}", counter.current());
169179

170180
// clear the idle timer
171181
idle_timer.as_mut().set_none();
@@ -177,12 +187,17 @@ impl Context {
177187
}
178188
}
179189
None => {
180-
// Channel closed - finish remaining tasks and exit
190+
// Channel closed - exit
181191
break;
182192
}
183193
}
184194
}
185195

196+
_ = &mut conn_close => {
197+
// connection was closed by somebody, notify owner that we should be removed
198+
context.owner.close(node_id).await.ok();
199+
}
200+
186201
_ = idle_stream.next() => {
187202
if !counter.is_idle() {
188203
continue;
@@ -417,6 +432,10 @@ impl ConnectionCounter {
417432
}
418433
}
419434

435+
fn current(&self) -> usize {
436+
self.inner.count.load(Ordering::SeqCst)
437+
}
438+
420439
/// Increase the connection count and return a guard for the new connection
421440
fn get_one(&self) -> OneConnection {
422441
self.inner.count.fetch_add(1, Ordering::SeqCst);

0 commit comments

Comments
 (0)