1010//! connect, and don't clone it out of the future.
1111use std:: {
1212 collections:: { HashMap , VecDeque } ,
13- ops:: Deref ,
14- sync:: {
15- Arc ,
16- atomic:: { AtomicUsize , Ordering } ,
17- } ,
13+ sync:: Arc ,
1814 time:: Duration ,
1915} ;
2016
2117use iroh:: {
2218 Endpoint , NodeId ,
2319 endpoint:: { ConnectError , Connection } ,
2420} ;
25- use n0_future:: { MaybeFuture , Stream , StreamExt } ;
21+ use n0_future:: { MaybeFuture , StreamExt } ;
2622use snafu:: Snafu ;
2723use tokio:: {
2824 sync:: {
29- Notify ,
3025 mpsc:: { self , error:: SendError as TokioSendError } ,
3126 oneshot,
3227 } ,
3328 task:: JoinError ,
3429} ;
3530use tokio_util:: time:: FutureExt as TimeFutureExt ;
36- use tracing:: { debug, error, info, trace} ;
31+ use tracing:: { debug, error, trace} ;
32+
33+ use crate :: { ConnectionCounter , ConnectionRef } ;
3734
3835/// Configuration options for the connection pool
3936#[ derive( Debug , Clone , Copy ) ]
@@ -53,30 +50,6 @@ impl Default for Options {
5350 }
5451}
5552
56- /// A reference to a connection that is owned by a connection pool.
57- #[ derive( Debug ) ]
58- pub struct ConnectionRef {
59- connection : iroh:: endpoint:: Connection ,
60- _permit : OneConnection ,
61- }
62-
63- impl Deref for ConnectionRef {
64- type Target = iroh:: endpoint:: Connection ;
65-
66- fn deref ( & self ) -> & Self :: Target {
67- & self . connection
68- }
69- }
70-
71- impl ConnectionRef {
72- fn new ( connection : iroh:: endpoint:: Connection , counter : OneConnection ) -> Self {
73- Self {
74- connection,
75- _permit : counter,
76- }
77- }
78- }
79-
8053struct Context {
8154 options : Options ,
8255 endpoint : Endpoint ,
@@ -127,67 +100,6 @@ struct RequestRef {
127100 tx : oneshot:: Sender < Result < ConnectionRef , PoolConnectError > > ,
128101}
129102
130- #[ derive( Debug ) ]
131- struct ConnectionCounterInner {
132- count : AtomicUsize ,
133- notify : Notify ,
134- }
135-
136- #[ derive( Debug , Clone ) ]
137- struct ConnectionCounter {
138- inner : Arc < ConnectionCounterInner > ,
139- }
140-
141- impl ConnectionCounter {
142- fn new ( ) -> Self {
143- Self {
144- inner : Arc :: new ( ConnectionCounterInner {
145- count : Default :: default ( ) ,
146- notify : Notify :: new ( ) ,
147- } ) ,
148- }
149- }
150-
151- fn get_one ( & self ) -> OneConnection {
152- self . inner . count . fetch_add ( 1 , Ordering :: SeqCst ) ;
153- OneConnection {
154- inner : self . inner . clone ( ) ,
155- }
156- }
157-
158- fn is_idle ( & self ) -> bool {
159- self . inner . count . load ( Ordering :: SeqCst ) == 0
160- }
161-
162- /// Infinite stream that yields when the connection is briefly idle.
163- ///
164- /// Note that you still have to check if the connection is still idle when
165- /// you get the notification.
166- ///
167- /// Also note that this stream is triggered on [OneConnection::drop], so it
168- /// won't trigger initially even though a [ConnectionCounter] starts up as
169- /// idle.
170- fn idle_stream ( self ) -> impl Stream < Item = ( ) > {
171- n0_future:: stream:: unfold ( self , |c| async move {
172- c. inner . notify . notified ( ) . await ;
173- Some ( ( ( ) , c) )
174- } )
175- }
176- }
177-
178- #[ derive( Debug ) ]
179- struct OneConnection {
180- inner : Arc < ConnectionCounterInner > ,
181- }
182-
183- impl Drop for OneConnection {
184- fn drop ( & mut self ) {
185- if self . inner . count . fetch_sub ( 1 , Ordering :: SeqCst ) == 1 {
186- self . inner . notify . notify_waiters ( ) ;
187- }
188- }
189- }
190-
191103/// Run a connection actor for a single node
192104async fn run_connection_actor (
193105 node_id : NodeId ,
@@ -270,11 +182,7 @@ async fn run_connection_actor(
270182 }
271183
272184 if let Ok ( connection) = state {
273- let reason = if counter. inner . count . load ( Ordering :: SeqCst ) > 0 {
274- b"drop"
275- } else {
276- b"idle"
277- } ;
185+ let reason = if counter. is_idle ( ) { b"idle" } else { b"drop" } ;
278186 connection. close ( 0u32 . into ( ) , reason) ;
279187 }
280188
@@ -411,6 +319,10 @@ impl ConnectionPool {
411319 Self { tx }
412320 }
413321
322+ /// Returns either a fresh connection or a reference to an existing one.
323+ ///
324+ /// This is guaranteed to return after approximately [Options::connect_timeout]
325+ /// with either an error or a connection.
414326 pub async fn connect (
415327 & self ,
416328 id : NodeId ,
0 commit comments