@@ -37,8 +37,8 @@ use std::{
3737 } ,
3838} ;
3939
40- use anyhow:: { anyhow, Result } ;
41- use backon:: { BackoffBuilder , ExponentialBuilder , Retryable } ;
40+ use anyhow:: { anyhow, Context , Result } ;
41+ use backon:: { Backoff , BackoffBuilder , ExponentialBuilder } ;
4242use bytes:: { Bytes , BytesMut } ;
4343use iroh_base:: { NodeId , PublicKey , RelayUrl , SecretKey } ;
4444use iroh_metrics:: { inc, inc_by} ;
@@ -48,7 +48,6 @@ use iroh_relay::{
4848 PingTracker , MAX_PACKET_SIZE ,
4949} ;
5050use n0_future:: {
51- boxed:: BoxFuture ,
5251 task:: JoinSet ,
5352 time:: { self , Duration , Instant , MissedTickBehavior } ,
5453 FuturesUnorderedBounded , SinkExt , StreamExt ,
@@ -214,6 +213,17 @@ struct RelayConnectionOptions {
214213 insecure_skip_cert_verify : bool ,
215214}
216215
216+ /// Reasons a relay connection can fail, distinguished by the phase that failed.
217+ #[ derive( Debug , thiserror:: Error ) ]
218+ enum RelayConnectionError {
219+ #[ error( "Failed to connect to relay server: {0:#}" ) ]
220+ Connecting ( #[ source] anyhow:: Error ) , // Dialing the relay failed; no connection was obtained.
221+ #[ error( "Failed to handshake with relay server: {0:#}" ) ]
222+ Handshake ( #[ source] anyhow:: Error ) , // Connected, but failed before any pong was received.
223+ #[ error( "Lost connection to relay server: {0:#}" ) ]
224+ Established ( #[ source] anyhow:: Error ) , // Failed after at least one pong was received.
225+ }
226+
217227impl ActiveRelayActor {
218228 fn new ( opts : ActiveRelayActorOptions ) -> Self {
219229 let ActiveRelayActorOptions {
@@ -271,22 +281,25 @@ impl ActiveRelayActor {
271281 /// The main actor run loop.
272282 ///
273283 /// Primarily switches between the dialing and connected states.
274- async fn run ( mut self ) -> anyhow :: Result < ( ) > {
284+ async fn run ( mut self ) -> Result < ( ) > {
275285 inc ! ( MagicsockMetrics , num_relay_conns_added) ;
276286
277- loop {
278- let Some ( client) = self . run_dialing ( ) . instrument ( info_span ! ( "dialing" ) ) . await else {
279- break ;
280- } ;
281- match self
282- . run_connected ( client)
283- . instrument ( info_span ! ( "connected" ) )
284- . await
285- {
286- Ok ( _) => break ,
287- Err ( err) => {
288- debug ! ( "Connection to relay server lost: {err:#}" ) ;
289- continue ;
287+ let mut backoff = Self :: build_backoff ( ) ;
288+
289+ while let Err ( err) = self . run_once ( ) . await {
290+ warn ! ( "{err}" ) ;
291+ match err {
292+ RelayConnectionError :: Connecting ( _) | RelayConnectionError :: Handshake ( _) => {
293+ // If dialing failed, or if the relay connection failed before we received a pong,
294+ // we wait an exponentially increasing time until we attempt to reconnect again.
295+ let delay = backoff. next ( ) . context ( "Retries exceeded" ) ?;
296+ debug ! ( "Retry in {delay:?}" ) ;
297+ time:: sleep ( delay) . await ;
298+ }
299+ RelayConnectionError :: Established ( _) => {
300+ // If the relay connection remained established long enough so that we received a pong
301+ // from the relay server, we reset the backoff and attempt to reconnect immediately.
302+ backoff = Self :: build_backoff ( ) ;
290303 }
291304 }
292305 }
@@ -295,6 +308,31 @@ impl ActiveRelayActor {
295308 Ok ( ( ) )
296309 }
297310
311+ fn build_backoff ( ) -> impl Backoff { // Builds the exponential backoff used between reconnect attempts.
312+ ExponentialBuilder :: new ( )
313+ . with_min_delay ( Duration :: from_millis ( 10 ) ) // Smallest delay: 10ms.
314+ . with_max_delay ( Duration :: from_secs ( 16 ) ) // Delays are capped at 16s.
315+ . with_jitter ( ) // Randomize the delays.
316+ . without_max_times ( ) // No limit on the number of retries.
317+ . build ( )
318+ }
319+
320+ /// Attempt to connect to the relay, and run the connected actor loop.
321+ ///
322+ /// Returns `Ok(())` if the actor loop should shut down. Returns an error if dialing failed,
323+ /// or if the relay connection failed while connected. The error variant records the phase
324+ /// that failed, which the caller uses to decide whether to back off before reconnecting.
325+ async fn run_once ( & mut self ) -> Result < ( ) , RelayConnectionError > {
326+ let client = match self . run_dialing ( ) . instrument ( info_span ! ( "dialing" ) ) . await {
327+ Some ( Ok ( client) ) => client,
328+ Some ( Err ( err) ) => return Err ( RelayConnectionError :: Connecting ( err) ) , // Dialing failed.
329+ None => return Ok ( ( ) ) , // `run_dialing` signalled that the actor needs to shut down.
330+ } ;
331+ self . run_connected ( client)
332+ . instrument ( info_span ! ( "connected" ) )
333+ . await
334+ }
335+
298336 fn reset_inactive_timeout ( & mut self ) {
299337 self . inactive_timeout
300338 . as_mut ( )
@@ -315,9 +353,9 @@ impl ActiveRelayActor {
315353
316354 /// Actor loop when connecting to the relay server.
317355 ///
318- /// Returns `None` if the actor needs to shut down. Returns `Some(client)` when the
319- /// connection is established.
320- async fn run_dialing ( & mut self ) -> Option < iroh_relay:: client:: Client > {
356+ /// Returns `None` if the actor needs to shut down. Returns `Some(Ok(client))` when the
357+ /// connection is established, and `Some(Err(err))` if dialing the relay failed.
358+ async fn run_dialing ( & mut self ) -> Option < Result < iroh_relay:: client:: Client > > {
321359 debug ! ( "Actor loop: connecting to relay." ) ;
322360
323361 // We regularly flush the relay_datagrams_send queue so it is not full of stale
@@ -330,7 +368,8 @@ impl ActiveRelayActor {
330368 send_datagram_flush. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
331369 send_datagram_flush. reset ( ) ; // Skip the immediate interval
332370
333- let mut dialing_fut = self . dial_relay ( ) ;
371+ let dialing_fut = self . dial_relay ( ) ;
372+ tokio:: pin!( dialing_fut) ;
334373 loop {
335374 tokio:: select! {
336375 biased;
@@ -352,11 +391,10 @@ impl ActiveRelayActor {
352391 res = & mut dialing_fut => {
353392 match res {
354393 Ok ( client) => {
355- break Some ( client) ;
394+ break Some ( Ok ( client) ) ;
356395 }
357396 Err ( err) => {
358- warn!( "Client failed to connect: {err:#}" ) ;
359- dialing_fut = self . dial_relay( ) ;
397+ break Some ( Err ( err) ) ;
360398 }
361399 }
362400 }
@@ -403,54 +441,26 @@ impl ActiveRelayActor {
403441 /// The future makes a single connection attempt, bounded by `CONNECT_TIMEOUT`. It
404442 /// resolves to `Err` if connecting fails or times out; it does not retry on its own,
405443 /// retries with backoff are driven by the `run` loop.
406- fn dial_relay ( & self ) -> BoxFuture < Result < Client > > {
407- let backoff = ExponentialBuilder :: new ( )
408- . with_min_delay ( Duration :: from_millis ( 10 ) )
409- . with_max_delay ( Duration :: from_secs ( 5 ) )
410- . without_max_times ( )
411- . build ( ) ;
412- let connect_fn = {
413- let client_builder = self . relay_client_builder . clone ( ) ;
414- move || {
415- let client_builder = client_builder. clone ( ) ;
416- async move {
417- match time:: timeout ( CONNECT_TIMEOUT , client_builder. connect ( ) ) . await {
418- Ok ( Ok ( client) ) => Ok ( client) ,
419- Ok ( Err ( err) ) => {
420- warn ! ( "Relay connection failed: {err:#}" ) ;
421- Err ( err)
422- }
423- Err ( _) => {
424- warn ! ( ?CONNECT_TIMEOUT , "Timeout connecting to relay" ) ;
425- Err ( anyhow ! ( "Timeout" ) )
426- }
427- }
428- }
429- }
430- } ;
431-
432- // We implement our own `Sleeper` here, so that we can use the `backon`
433- // crate with our own implementation of `time::sleep` (from `n0_future`)
434- // that works in browsers.
435- struct Sleeper ;
436-
437- impl backon:: Sleeper for Sleeper {
438- type Sleep = time:: Sleep ;
439-
440- fn sleep ( & self , dur : Duration ) -> Self :: Sleep {
441- time:: sleep ( dur)
444+ // This is using `impl Future` to return a future without a reference to self.
445+ fn dial_relay ( & self ) -> impl Future < Output = Result < Client > > {
446+ let client_builder = self . relay_client_builder . clone ( ) ; // Cloned so the `async move` block owns it instead of borrowing `self`.
447+ async move {
448+ match time:: timeout ( CONNECT_TIMEOUT , client_builder. connect ( ) ) . await {
449+ Ok ( Ok ( client) ) => Ok ( client) , // Connected within the timeout.
450+ Ok ( Err ( err) ) => Err ( err) , // The connection attempt itself failed.
451+ Err ( _) => Err ( anyhow ! ( "Connecting timed out after {CONNECT_TIMEOUT:?}" ) ) , // The timeout elapsed first.
442452 }
443453 }
444-
445- let retry_fut = connect_fn. retry ( backoff) . sleep ( Sleeper ) ;
446- Box :: pin ( retry_fut)
447454 }
448455
449456 /// Runs the actor loop when connected to a relay server.
450457 ///
451458 /// Returns `Ok` if the actor needs to shut down. `Err` is returned if the connection
452459 /// to the relay server is lost.
453- async fn run_connected ( & mut self , client : iroh_relay:: client:: Client ) -> Result < ( ) > {
460+ async fn run_connected (
461+ & mut self ,
462+ client : iroh_relay:: client:: Client ,
463+ ) -> Result < ( ) , RelayConnectionError > {
454464 debug ! ( "Actor loop: connected to relay" ) ;
455465 event ! (
456466 target: "iroh::_events::relay::connected" ,
@@ -466,6 +476,7 @@ impl ActiveRelayActor {
466476 nodes_present : BTreeSet :: new ( ) ,
467477 last_packet_src : None ,
468478 pong_pending : None ,
479+ established : false ,
469480 #[ cfg( test) ]
470481 test_pong : None ,
471482 } ;
@@ -474,9 +485,9 @@ impl ActiveRelayActor {
474485 let mut send_datagrams_buf = Vec :: with_capacity ( SEND_DATAGRAM_BATCH_SIZE ) ;
475486
476487 // Regularly send pings so we know the connection is healthy.
488+ // The first ping will be sent immediately.
477489 let mut ping_interval = time:: interval ( PING_INTERVAL ) ;
478490 ping_interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
479- ping_interval. reset ( ) ; // skip the ping at current time.
480491
481492 let res = loop {
482493 if let Some ( data) = state. pong_pending . take ( ) {
@@ -576,7 +587,7 @@ impl ActiveRelayActor {
576587 }
577588 msg = client_stream. next( ) => {
578589 let Some ( msg) = msg else {
579- break Err ( anyhow!( "Client stream finished " ) ) ;
590+ break Err ( anyhow!( "Stream closed by server. " ) ) ;
580591 } ;
581592 match msg {
582593 Ok ( msg) => {
@@ -593,10 +604,14 @@ impl ActiveRelayActor {
593604 }
594605 }
595606 } ;
607+
596608 if res. is_ok ( ) {
597- client_sink. close ( ) . await ?;
609+ if let Err ( err) = client_sink. close ( ) . await {
610+ debug ! ( "Failed to close client sink gracefully: {err:#}" ) ;
611+ }
598612 }
599- res
613+
614+ res. map_err ( |err| state. map_err ( err) )
600615 }
601616
602617 fn handle_relay_msg ( & mut self , msg : ReceivedMessage , state : & mut ConnectedRelayState ) {
@@ -642,11 +657,16 @@ impl ActiveRelayActor {
642657 }
643658 }
644659 }
645- state. ping_tracker . pong_received ( data)
660+ state. ping_tracker . pong_received ( data) ;
661+ state. established = true ;
662+ }
663+ ReceivedMessage :: Health { problem } => {
664+ let problem = problem. as_deref ( ) . unwrap_or ( "unknown" ) ;
665+ warn ! ( "Relay server reports problem: {problem}" ) ;
666+ }
667+ ReceivedMessage :: KeepAlive | ReceivedMessage :: ServerRestarting { .. } => {
668+ trace ! ( "Ignoring {msg:?}" )
646669 }
647- ReceivedMessage :: KeepAlive
648- | ReceivedMessage :: Health { .. }
649- | ReceivedMessage :: ServerRestarting { .. } => trace ! ( "Ignoring {msg:?}" ) ,
650670 }
651671 }
652672
@@ -666,13 +686,13 @@ impl ActiveRelayActor {
666686 sending_fut : impl Future < Output = Result < T , E > > ,
667687 state : & mut ConnectedRelayState ,
668688 client_stream : & mut iroh_relay:: client:: ClientStream ,
669- ) -> Result < ( ) > {
689+ ) -> Result < ( ) , RelayConnectionError > {
670690 // we use the same time as for our ping interval
671691 let send_timeout = PING_INTERVAL ;
672692
673693 let mut timeout = pin ! ( time:: sleep( send_timeout) ) ;
674694 let mut sending_fut = pin ! ( sending_fut) ;
675- loop {
695+ let res = loop {
676696 tokio:: select! {
677697 biased;
678698 _ = self . stop_token. cancelled( ) => {
@@ -705,7 +725,7 @@ impl ActiveRelayActor {
705725 // No need to read the inbox or datagrams to send.
706726 msg = client_stream. next( ) => {
707727 let Some ( msg) = msg else {
708- break Err ( anyhow!( "Client stream finished " ) ) ;
728+ break Err ( anyhow!( "Stream closed by server. " ) ) ;
709729 } ;
710730 match msg {
711731 Ok ( msg) => self . handle_relay_msg( msg, state) ,
@@ -717,7 +737,8 @@ impl ActiveRelayActor {
717737 break Ok ( ( ) ) ;
718738 }
719739 }
720- }
740+ } ;
741+ res. map_err ( |err| state. map_err ( err) )
721742 }
722743}
723744
@@ -738,10 +759,24 @@ struct ConnectedRelayState {
738759 last_packet_src : Option < NodeId > ,
739760 /// A pong we need to send ASAP.
740761 pong_pending : Option < [ u8 ; 8 ] > ,
762+ /// Whether the connection is to be considered established.
763+ ///
764+ /// This is set to `true` once a pong was received from the server.
765+ established : bool ,
741766 #[ cfg( test) ]
742767 test_pong : Option < ( [ u8 ; 8 ] , oneshot:: Sender < ( ) > ) > ,
743768}
744769
770+ impl ConnectedRelayState {
771+ fn map_err ( & self , error : anyhow:: Error ) -> RelayConnectionError { // Wraps `error` in the variant matching the current connection phase.
772+ if self . established { // `established` is set once a pong was received from the server.
773+ RelayConnectionError :: Established ( error)
774+ } else {
775+ RelayConnectionError :: Handshake ( error)
776+ }
777+ }
778+ }
779+
745780pub ( super ) enum RelayActorMessage {
746781 MaybeCloseRelaysOnRebind ( Vec < IpAddr > ) ,
747782 SetHome { url : RelayUrl } ,
0 commit comments