1111use std:: {
1212 collections:: { HashMap , VecDeque } ,
1313 ops:: Deref ,
14- sync:: Arc ,
14+ sync:: {
15+ Arc ,
16+ atomic:: { AtomicUsize , Ordering } ,
17+ } ,
1518 time:: Duration ,
1619} ;
1720
1821use iroh:: {
1922 Endpoint , NodeId ,
2023 endpoint:: { ConnectError , Connection } ,
2124} ;
22- use n0_future:: MaybeFuture ;
25+ use n0_future:: { MaybeFuture , Stream , StreamExt } ;
2326use snafu:: Snafu ;
2427use tokio:: {
2528 sync:: {
26- OwnedSemaphorePermit ,
29+ Notify ,
2730 mpsc:: { self , error:: SendError as TokioSendError } ,
2831 oneshot,
2932 } ,
3033 task:: JoinError ,
3134} ;
32- use tokio_util:: time:: FutureExt ;
33- use tracing:: { debug, error, trace} ;
35+ use tokio_util:: time:: FutureExt as TimeFutureExt ;
36+ use tracing:: { debug, error, info , trace} ;
3437
3538/// Configuration options for the connection pool
3639#[ derive( Debug , Clone , Copy ) ]
@@ -54,7 +57,7 @@ impl Default for Options {
5457#[ derive( Debug ) ]
5558pub struct ConnectionRef {
5659 connection : iroh:: endpoint:: Connection ,
57- _permit : OwnedSemaphorePermit ,
60+ _permit : OneConnection ,
5861}
5962
6063impl Deref for ConnectionRef {
@@ -66,10 +69,10 @@ impl Deref for ConnectionRef {
6669}
6770
6871impl ConnectionRef {
69- fn new ( connection : iroh:: endpoint:: Connection , permit : OwnedSemaphorePermit ) -> Self {
72+ fn new ( connection : iroh:: endpoint:: Connection , counter : OneConnection ) -> Self {
7073 Self {
7174 connection,
72- _permit : permit ,
75+ _permit : counter ,
7376 }
7477 }
7578}
@@ -124,6 +127,67 @@ struct RequestRef {
124127 tx : oneshot:: Sender < Result < ConnectionRef , PoolConnectError > > ,
125128}
126129
130+ #[ derive( Debug ) ]
131+ struct ConnectionCounterInner {
132+ count : AtomicUsize ,
133+ notify : Notify ,
134+ }
135+
136+ #[ derive( Debug , Clone ) ]
137+ struct ConnectionCounter {
138+ inner : Arc < ConnectionCounterInner > ,
139+ }
140+
141+ impl ConnectionCounter {
142+ fn new ( ) -> Self {
143+ Self {
144+ inner : Arc :: new ( ConnectionCounterInner {
145+ count : Default :: default ( ) ,
146+ notify : Notify :: new ( ) ,
147+ } ) ,
148+ }
149+ }
150+
151+ fn get_one ( & self ) -> OneConnection {
152+ self . inner . count . fetch_add ( 1 , Ordering :: SeqCst ) ;
153+ OneConnection {
154+ inner : self . inner . clone ( ) ,
155+ }
156+ }
157+
158+ fn is_idle ( & self ) -> bool {
159+ self . inner . count . load ( Ordering :: SeqCst ) == 0
160+ }
161+
162+ /// Infinite stream that yields when the connection is briefly idle.
163+ ///
164+ /// Note that you still have to check if the connection is still idle when
165+ /// you get the notification.
166+ ///
167+ /// Also note that this stream is triggered on [OneConnection::drop], so it
168+ /// won't trigger initially even though a [ConnectionCounter] starts up as
169+ /// idle.
170+ fn idle_stream ( self ) -> impl Stream < Item = ( ) > {
171+ n0_future:: stream:: unfold ( self , |c| async move {
172+ c. inner . notify . notified ( ) . await ;
173+ Some ( ( ( ) , c) )
174+ } )
175+ }
176+ }
177+
178+ #[ derive( Debug ) ]
179+ struct OneConnection {
180+ inner : Arc < ConnectionCounterInner > ,
181+ }
182+
183+ impl Drop for OneConnection {
184+ fn drop ( & mut self ) {
185+ if self . inner . count . fetch_sub ( 1 , Ordering :: SeqCst ) == 1 {
186+ self . inner . notify . notify_waiters ( ) ;
187+ }
188+ }
189+ }
190+
127191/// Run a connection actor for a single node
128192async fn run_connection_actor (
129193 node_id : NodeId ,
@@ -147,10 +211,11 @@ async fn run_connection_actor(
147211 return ;
148212 }
149213 }
150- let semaphore = Arc :: new ( tokio :: sync :: Semaphore :: new ( u32 :: MAX as usize ) ) ;
214+ let counter = ConnectionCounter :: new ( ) ;
151215 let idle_timer = MaybeFuture :: default ( ) ;
152- let idle_fut = MaybeFuture :: default ( ) ;
153- tokio:: pin!( idle_timer, idle_fut) ;
216+ let idle_stream = counter. clone ( ) . idle_stream ( ) ;
217+
218+ tokio:: pin!( idle_timer, idle_stream) ;
154219
155220 loop {
156221 tokio:: select! {
@@ -161,15 +226,9 @@ async fn run_connection_actor(
161226 match handler {
162227 Some ( RequestRef { id, tx } ) => {
163228 assert!( id == node_id, "Not for me!" ) ;
164- trace!( %node_id, "Received new request" ) ;
165229 match & state {
166230 Ok ( state) => {
167- // first acquire a permit for the op, then aquire all permits for idle
168- let permit = semaphore. clone( ) . acquire_owned( ) . await . expect( "semaphore closed" ) ;
169- let res = ConnectionRef :: new( state. clone( ) , permit) ;
170- if idle_fut. is_none( ) {
171- idle_fut. as_mut( ) . set_future( semaphore. clone( ) . acquire_many_owned( u32 :: MAX ) ) ;
172- }
231+ let res = ConnectionRef :: new( state. clone( ) , counter. get_one( ) ) ;
173232
174233 // clear the idle timer
175234 idle_timer. as_mut( ) . set_none( ) ;
@@ -187,7 +246,10 @@ async fn run_connection_actor(
187246 }
188247 }
189248
190- _ = & mut idle_fut => {
249+ _ = idle_stream. next( ) => {
250+ if !counter. is_idle( ) {
251+ continue ;
252+ } ;
191253 // notify the pool that we are idle.
192254 trace!( %node_id, "Idle" ) ;
193255 if context. owner. idle( node_id) . await . is_err( ) {
@@ -200,23 +262,23 @@ async fn run_connection_actor(
200262
201263 // Idle timeout - request shutdown
202264 _ = & mut idle_timer => {
203- debug !( %node_id, "Connection idle , requesting shutdown" ) ;
265+ trace !( %node_id, "Idle timer expired , requesting shutdown" ) ;
204266 context. owner. close( node_id) . await . ok( ) ;
205267 // Don't break here - wait for main actor to close our channel
206268 }
207269 }
208270 }
209271
210272 if let Ok ( connection) = state {
211- let reason = if semaphore . available_permits ( ) == u32 :: MAX as usize {
212- "idle "
273+ let reason = if counter . inner . count . load ( Ordering :: SeqCst ) > 0 {
274+ b"drop "
213275 } else {
214- "drop "
276+ b"idle "
215277 } ;
216- connection. close ( 0u32 . into ( ) , reason. as_bytes ( ) ) ;
278+ connection. close ( 0u32 . into ( ) , reason) ;
217279 }
218280
219- debug ! ( %node_id, "Connection actor shutting down" ) ;
281+ trace ! ( %node_id, "Connection actor shutting down" ) ;
220282}
221283
222284struct Actor {
0 commit comments