diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 00cb52c958..5f37ddce87 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -522,13 +522,21 @@ fn parse_byte_size(s: &str) -> std::result::Result { } fn watch_conn_type(endpoint: &Endpoint, endpoint_id: EndpointId) -> AbortOnDropHandle<()> { - let mut stream = endpoint.conn_type(endpoint_id).unwrap().stream(); + let info = endpoint.remote_info(endpoint_id).unwrap(); + let mut stream = info.selected_path().stream(); let task = tokio::task::spawn(async move { - while let Some(conn_type) = stream.next().await { - println!( - "[{}] Connection type changed to: {conn_type}", - endpoint_id.fmt_short() - ); + while let Some(selected_path) = stream.next().await { + if let Some(selected_path) = selected_path { + let label = match selected_path { + TransportAddr::Ip(addr) => format!("direct ({addr})"), + TransportAddr::Relay(url) => format!("relay ({url})"), + _ => format!("unknown transport"), + }; + println!( + "[{}] Connection type changed to: {label}", + endpoint_id.fmt_short() + ); + } } }); AbortOnDropHandle::new(task) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index b9695673af..b04b549b60 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -26,7 +26,7 @@ use url::Url; pub use super::magicsock::{ AddEndpointAddrError, ConnectionType, DirectAddr, DirectAddrType, PathInfo, PathsInfo, - endpoint_map::Source, + RemoteInfo, endpoint_map::Source, }; #[cfg(wasm_browser)] use crate::discovery::pkarr::PkarrResolver; @@ -55,8 +55,8 @@ pub use quinn::{ AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream, ConnectionClose, ConnectionError, ConnectionStats, MtuDiscoveryConfig, OpenBi, OpenUni, ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError, RetryError, - SendDatagramError, SendStream, ServerConfig, StoppedError, StreamId, TransportConfig, VarInt, - WeakConnectionHandle, WriteError, + SendDatagramError, 
SendStream, ServerConfig, Side, StoppedError, StreamId, TransportConfig, + VarInt, WeakConnectionHandle, WriteError, }; pub use quinn_proto::{ FrameStats, PathStats, TransportError, TransportErrorCode, UdpStats, Written, @@ -69,8 +69,8 @@ pub use quinn_proto::{ pub use self::connection::{ Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection, - Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection, RemoteEndpointIdError, - ZeroRttStatus, + ConnectionInfo, Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection, + RemoteEndpointIdError, ZeroRttStatus, }; /// The delay to fall back to discovery when direct addresses fail. @@ -98,7 +98,7 @@ pub enum PathSelection { /// new [`EndpointId`]. /// /// To create the [`Endpoint`] call [`Builder::bind`]. -#[derive(Debug)] +#[derive(derive_more::Debug)] pub struct Builder { secret_key: Option, relay_mode: RelayMode, @@ -963,38 +963,24 @@ impl Endpoint { // // Partially they return things passed into the builder. - /// Returns a [`Watcher`] that reports the current connection type and any changes for - /// given remote endpoint. - /// - /// This watcher allows observing a stream of [`ConnectionType`] items by calling - /// [`Watcher::stream()`]. If the underlying connection to a remote endpoint changes, it will - /// yield a new item. These connection changes are when the connection switches between - /// using the Relay server and a direct connection. - /// - /// Note that this does not guarantee each connection change is yielded in the stream. - /// If the connection type changes several times before this stream is polled, only the - /// last recorded state is returned. This can be observed e.g. right at the start of a - /// connection when the switch from a relayed to a direct connection can be so fast that - /// the relayed state is never exposed. + /// Information about a remote endpoint. 
/// - /// If there is currently a connection with the remote endpoint, then using [`Watcher::get`] - /// will immediately return either [`ConnectionType::Relay`], [`ConnectionType::Direct`] - /// or [`ConnectionType::Mixed`]. + /// From the [`RemoteInfo`] you can watch which path is selected, get the current + /// round-trip time (latency), and get a list of [`ConnectionInfo`]. /// - /// It is possible for the connection type to be [`ConnectionType::None`] if you've - /// recently connected to this endpoint id but previous methods of reaching the endpoint have - /// become inaccessible. - /// - /// Will return `None` if we do not have any address information for the given `endpoint_id`. - pub fn conn_type(&self, endpoint_id: EndpointId) -> Option> { - self.msock.conn_type(endpoint_id) + /// Returns `None` if we don't have any state for this remote. + pub fn remote_info(&self, endpoint_id: EndpointId) -> Option { + self.msock.endpoint_map.remote_info(endpoint_id) } - /// Returns the currently lowest latency for this endpoint. + /// Returns a list of all remote endpoints that this endpoint is dealing with. + /// + /// This includes all endpoints to which we have active connections. It also may include endpoints + /// to which we are in the process of connecting, or have recently been connected to. /// - /// Will return `None` if we do not have any address information for the given `endpoint_id`. - pub async fn latency(&self, endpoint_id: EndpointId) -> Option { - self.msock.latency(endpoint_id).await + /// TODO: Expand docs. + pub fn remotes(&self) -> Vec { + self.msock.endpoint_map.remotes() } /// Returns the DNS resolver used in this [`Endpoint`]. 
diff --git a/iroh/src/endpoint/connection.rs b/iroh/src/endpoint/connection.rs index e1e46d34b6..3692799766 100644 --- a/iroh/src/endpoint/connection.rs +++ b/iroh/src/endpoint/connection.rs @@ -35,7 +35,7 @@ use n0_watcher::{Watchable, Watcher}; use pin_project::pin_project; use quinn::{ AcceptBi, AcceptUni, ConnectionError, ConnectionStats, OpenBi, OpenUni, ReadDatagram, - RetryError, SendDatagramError, ServerConfig, VarInt, + RetryError, SendDatagramError, ServerConfig, Side, VarInt, WeakConnectionHandle, }; use quinn_proto::PathId; use tracing::warn; @@ -241,15 +241,16 @@ fn conn_from_quinn_conn( let alpn = alpn_from_quinn_conn(&conn).ok_or_else(|| e!(AuthenticationError::NoAlpn))?; let paths_info_watchable = init_paths_info_watcher(&conn, ep); let paths_info = paths_info_watchable.watch(); - // Register this connection with the magicsock. - ep.msock - .register_connection(remote_id, &conn, paths_info_watchable.clone()); - Ok(Connection { + let conn = Connection { remote_id, alpn, inner: conn, paths_info, - }) + }; + let info = conn.to_info(); + // Register this connection with the magicsock. + ep.msock.register_connection(info, paths_info_watchable); + Ok(conn) } fn init_paths_info_watcher(conn: &quinn::Connection, ep: &Endpoint) -> Watchable { @@ -1530,6 +1531,83 @@ impl Connection { pub fn set_max_concurrent_bi_streams(&self, count: VarInt) { self.inner.set_max_concurrent_bi_streams(count) } + + /// Returns the side of the connection (client or server). + pub fn side(&self) -> Side { + self.inner.side() + } + + /// Returns a [`ConnectionInfo`], which is a weak handle to the connection + /// that does not keep the connection alive, but does allow access to some information + /// about the connection, and allows waiting for the connection to be closed.
+ pub fn to_info(&self) -> ConnectionInfo { + ConnectionInfo { + alpn: self.alpn.clone(), + remote_id: self.remote_id, + inner: self.inner.weak_handle(), + paths_info: self.paths_info.clone(), + side: self.side(), + } + } +} + +/// A [`ConnectionInfo`] is a weak handle to a connection that exposes some information about the connection, +/// but does not keep the connection alive. +#[derive(Debug, Clone)] +pub struct ConnectionInfo { + pub(crate) side: Side, + pub(crate) alpn: Vec, + pub(crate) remote_id: EndpointId, + pub(crate) inner: WeakConnectionHandle, + pub(crate) paths_info: n0_watcher::Direct, +} + +#[allow(missing_docs)] +impl ConnectionInfo { + pub fn alpn(&self) -> &[u8] { + &self.alpn + } + + pub fn remote_id(&self) -> &EndpointId { + &self.remote_id + } + + pub fn is_alive(&self) -> bool { + self.inner.upgrade().is_some() + } + + /// Returns information about the network paths in use by this connection. + /// + /// A connection can have several network paths to the remote endpoint, commonly there + /// will be a path via the relay server and a holepunched path. This returns all the + /// paths in use by this connection. + pub fn paths_info(&self) -> &impl Watcher { + &self.paths_info + } + + // We could add such util methods here, not sure. + pub fn has_direct_path(&mut self) -> bool { + self.paths_info.get().keys().any(|addr| addr.is_ip()) + } + + /// Current best estimate of this connection's latency (round-trip-time) + /// + /// Returns `None` if the connection has been dropped. + pub fn rtt(&self) -> Option { + self.inner.upgrade().map(|conn| conn.rtt()) + } + + /// Returns connection statistics. + /// + /// Returns `None` if the connection has been dropped. + pub fn stats(&self) -> Option { + self.inner.upgrade().map(|conn| conn.stats()) + } + + /// Returns the side of the connection (client or server). 
+ pub fn side(&self) -> Side { + self.side + } } #[cfg(test)] diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index b43f3905d9..8456e59c90 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -63,6 +63,7 @@ use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, SendAddr}, discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData}, + endpoint::ConnectionInfo, key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box}, metrics::EndpointMetrics, net_report::{self, IfStateDetails, Report}, @@ -77,7 +78,7 @@ pub(crate) mod transports; use mapped_addrs::{EndpointIdMappedAddr, MappedAddr}; pub use self::{ - endpoint_map::{ConnectionType, PathInfo, PathsInfo}, + endpoint_map::{ConnectionType, PathInfo, PathsInfo, RemoteInfo}, metrics::Metrics, }; @@ -174,7 +175,7 @@ pub(crate) struct Handle { /// It is usually only necessary to use a single [`MagicSock`] instance in an application, it /// means any QUIC endpoints on top will be sharing as much information about endpoints as /// possible. -#[derive(Debug)] +#[derive(derive_more::Debug)] pub(crate) struct MagicSock { /// Channel to send to the internal actor. actor_sender: mpsc::Sender, @@ -270,8 +271,7 @@ impl MagicSock { /// connection. pub(crate) fn register_connection( &self, - remote: EndpointId, - conn: &quinn::Connection, + conn: ConnectionInfo, paths_info: n0_watcher::Watchable, ) { // TODO: Spawning tasks like this is obviously bad. But it is solvable: @@ -289,9 +289,8 @@ impl MagicSock { // have a ZrttConnection::into_connection() function which can be async and actually // send this. Before the handshake has completed we don't have anything useful to // do with this connection inside of the EndpointStateActor anyway. 
- let weak_handle = conn.weak_handle(); - let endpoint_state = self.endpoint_map.endpoint_state_actor(remote); - let msg = EndpointStateMessage::AddConnection(weak_handle, paths_info); + let endpoint_state = self.endpoint_map.endpoint_state_actor(conn.remote_id); + let msg = EndpointStateMessage::AddConnection(conn, paths_info); task::spawn(async move { endpoint_state.send(msg).await.ok(); @@ -397,32 +396,6 @@ impl MagicSock { }) } - /// Returns a [`n0_watcher::Direct`] that reports the [`ConnectionType`] we have to the - /// given `endpoint_id`. - /// - /// This gets us a copy of the [`n0_watcher::Direct`] for the [`Watchable`] with a - /// [`ConnectionType`] that the `EndpointMap` stores for each `endpoint_id`'s endpoint. - /// - /// # Errors - /// - /// Will return `None` if there is no address information known about the - /// given `endpoint_id`. - pub(crate) fn conn_type(&self, eid: EndpointId) -> Option> { - self.endpoint_map.conn_type(eid) - } - - // TODO: Build better info to expose to the user about remote nodes. We probably want - // to expose this as part of path information instead. - pub(crate) async fn latency(&self, eid: EndpointId) -> Option { - let endpoint_state = self.endpoint_map.endpoint_state_actor(eid); - let (tx, rx) = oneshot::channel(); - endpoint_state - .send(EndpointStateMessage::Latency(tx)) - .await - .ok(); - rx.await.unwrap_or_default() - } - /// Returns the socket address which can be used by the QUIC layer to dial this endpoint. 
pub(crate) fn get_endpoint_mapped_addr(&self, eid: EndpointId) -> EndpointIdMappedAddr { self.endpoint_map.endpoint_mapped_addr(eid) diff --git a/iroh/src/magicsock/endpoint_map.rs b/iroh/src/magicsock/endpoint_map.rs index ea33e4770c..727ffe3828 100644 --- a/iroh/src/magicsock/endpoint_map.rs +++ b/iroh/src/magicsock/endpoint_map.rs @@ -17,7 +17,7 @@ use super::{ mapped_addrs::{AddrMap, EndpointIdMappedAddr, MultipathMappedAddr, RelayMappedAddr}, transports::{self, OwnedTransmit, TransportsSender}, }; -use crate::disco::{self}; +use crate::disco; // #[cfg(any(test, feature = "test-utils"))] // use crate::endpoint::PathSelection; @@ -25,7 +25,7 @@ mod endpoint_state; mod path_state; pub(super) use endpoint_state::EndpointStateMessage; -pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo}; +pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo, RemoteInfo}; use endpoint_state::{EndpointStateActor, EndpointStateHandle}; // TODO: use this @@ -122,17 +122,23 @@ impl EndpointMap { } } - /// Returns a [`n0_watcher::Direct`] for given endpoint's [`ConnectionType`]. - /// - /// # Errors - /// - /// Will return `None` if there is not an entry in the [`EndpointMap`] for - /// the `endpoint_id` - pub(super) fn conn_type( - &self, - _endpoint_id: EndpointId, - ) -> Option> { - todo!(); + pub(crate) fn remote_info(&self, eid: EndpointId) -> Option { + self.actor_handles + .lock() + .expect("poisoned") + .get(&eid) + .map(|handle| RemoteInfo::new(eid, handle.sender.clone(), handle.selected_path.watch())) + } + + pub(crate) fn remotes(&self) -> Vec { + self.actor_handles + .lock() + .expect("poisoned") + .iter() + .map(|(eid, handle)| { + RemoteInfo::new(*eid, handle.sender.clone(), handle.selected_path.watch()) + }) + .collect() } /// Returns the sender for the [`EndpointStateActor`]. 
diff --git a/iroh/src/magicsock/endpoint_map/endpoint_state.rs b/iroh/src/magicsock/endpoint_map/endpoint_state.rs index 8800f997d6..2bf8b1cc37 100644 --- a/iroh/src/magicsock/endpoint_map/endpoint_state.rs +++ b/iroh/src/magicsock/endpoint_map/endpoint_state.rs @@ -13,7 +13,7 @@ use n0_future::{ time::{self, Duration, Instant}, }; use n0_watcher::{Watchable, Watcher}; -use quinn::WeakConnectionHandle; +use quinn::{Side, WeakConnectionHandle}; use quinn_proto::{PathError, PathEvent, PathId, PathStatus}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -27,7 +27,7 @@ use super::{Source, TransportsSenderMessage, path_state::PathState}; // use crate::endpoint::PathSelection; use crate::{ disco::{self}, - endpoint::DirectAddr, + endpoint::{ConnectionInfo, DirectAddr}, magicsock::{ DiscoState, HEARTBEAT_INTERVAL, MagicsockMetrics, PATH_MAX_IDLE_TIMEOUT, mapped_addrs::{AddrMap, MappedAddr, MultipathMappedAddr, RelayMappedAddr}, @@ -131,6 +131,7 @@ pub(super) struct EndpointStateActor { /// /// We only select a path once the path is functional in Quinn. selected_path: Option, + pub_selected_path: Watchable>, /// Time at which we should schedule the next holepunch attempt. scheduled_holepunch: Option, /// When to next attempt opening paths in [`Self::pending_open_paths`]. @@ -167,6 +168,7 @@ impl EndpointStateActor { scheduled_holepunch: None, scheduled_open_path: None, pending_open_paths: VecDeque::new(), + pub_selected_path: Default::default(), } } @@ -174,6 +176,7 @@ impl EndpointStateActor { let (tx, rx) = mpsc::channel(16); let me = self.local_endpoint_id; let endpoint_id = self.endpoint_id; + let selected_path = self.pub_selected_path.clone(); // Ideally we'd use the endpoint span as parent. We'd have to plug that span into // here somehow. Instead we have no parent and explicitly set the me attribute. 
If @@ -195,6 +198,7 @@ impl EndpointStateActor { ); EndpointStateHandle { sender: tx, + selected_path, _task: AbortOnDropHandle::new(task), } } @@ -264,8 +268,8 @@ impl EndpointStateActor { EndpointStateMessage::SendDatagram(transmit) => { self.handle_msg_send_datagram(transmit).await?; } - EndpointStateMessage::AddConnection(handle, paths_info) => { - self.handle_msg_add_connection(handle, paths_info).await; + EndpointStateMessage::AddConnection(info, paths_info) => { + self.handle_msg_add_connection(info, paths_info).await; } EndpointStateMessage::AddEndpointAddr(addr, source) => { self.handle_msg_add_endpoint_addr(addr, source); @@ -285,6 +289,9 @@ impl EndpointStateActor { EndpointStateMessage::Latency(tx) => { self.handle_msg_latency(tx); } + EndpointStateMessage::ConnectionInfos(tx) => { + self.handle_msg_connection_infos(tx); + } } Ok(()) } @@ -322,10 +329,10 @@ impl EndpointStateActor { /// Error returns are fatal and kill the actor. async fn handle_msg_add_connection( &mut self, - handle: WeakConnectionHandle, + info: ConnectionInfo, paths_info: Watchable, ) { - if let Some(conn) = handle.upgrade() { + if let Some(conn) = info.inner.upgrade() { // Remove any conflicting stable_ids from the local state. let conn_id = ConnId(conn.stable_id()); self.connections.remove(&conn_id); @@ -340,7 +347,9 @@ impl EndpointStateActor { self.connections.insert( conn_id, ConnectionState { - handle: handle.clone(), + handle: info.inner.clone(), + alpn: info.alpn, + side: info.side, pub_path_info: paths_info, paths: Default::default(), open_paths: Default::default(), @@ -350,7 +359,10 @@ impl EndpointStateActor { // Store PathId(0), set path_status and select best path, check if holepunching // is needed. - if let Some(conn) = handle.upgrade() { + // TODO(frando): The code here upgraded the handle again (which we already upgraded a couple of lines above) + // Was there a reason for that? 
+ // if let Some(conn) = handle.upgrade() { + { if let Some(path) = conn.path(PathId::ZERO) { if let Some(path_remote) = path .remote_address() @@ -515,12 +527,12 @@ impl EndpointStateActor { if !conn_state.open_paths.contains_key(path_id) { continue; } - if let Some(stats) = conn_state + if let Some(rtt) = conn_state .handle .upgrade() - .and_then(|conn| conn.stats().paths.get(path_id).copied()) + .and_then(|conn| conn.stats().paths.get(path_id).map(|stats| stats.rtt)) { - return Some(stats.rtt); + return Some(rtt); } } None @@ -528,6 +540,21 @@ impl EndpointStateActor { tx.send(rtt).ok(); } + fn handle_msg_connection_infos(&self, tx: oneshot::Sender>) { + let infos: Vec = self + .connections + .values() + .map(|state| ConnectionInfo { + remote_id: self.endpoint_id, + alpn: state.alpn.clone(), + side: state.side, + inner: state.handle.clone(), + paths_info: state.pub_path_info.watch(), + }) + .collect(); + tx.send(infos).ok(); + } + /// Triggers holepunching to the remote endpoint. /// /// This will manage the entire process of holepunching with the remote endpoint. @@ -934,6 +961,7 @@ impl EndpointStateActor { } self.open_path(addr); self.close_redundant_paths(addr); + self.pub_selected_path.set(Some(addr.clone().into())).ok(); } } @@ -999,7 +1027,7 @@ pub(crate) enum EndpointStateMessage { /// needed, any new paths discovered via holepunching will be added. And closed paths /// will be removed etc. #[debug("AddConnection(..)")] - AddConnection(WeakConnectionHandle, Watchable), + AddConnection(ConnectionInfo, Watchable), /// Adds a [`EndpointAddr`] with locations where the endpoint might be reachable. AddEndpointAddr(EndpointAddr, Source), /// Process a received DISCO CallMeMaybe message. @@ -1020,6 +1048,9 @@ pub(crate) enum EndpointStateMessage { /// TODO: This is more of a placeholder message currently. Check MagicSock::latency. #[debug("Latency(..)")] Latency(oneshot::Sender>), + /// Returns info about currently open connections to the remote endpoint. 
+ #[debug("ConnectionInfos(..)")] + ConnectionInfos(oneshot::Sender>), } /// A handle to a [`EndpointStateActor`]. @@ -1028,6 +1059,7 @@ pub(crate) enum EndpointStateMessage { #[derive(Debug)] pub(super) struct EndpointStateHandle { pub(super) sender: mpsc::Sender, + pub(super) selected_path: Watchable>, _task: AbortOnDropHandle<()>, } @@ -1085,6 +1117,14 @@ struct ConnectionState { handle: WeakConnectionHandle, /// The information we publish to users about the paths used in this connection. pub_path_info: Watchable, + /// The ALPN of the connection. + /// + /// We store this to be able to create a [`ConnectionInfo`] from the [`ConnectionState`]. + alpn: Vec, + /// The side (client or server) + /// + /// We store this to be able to create a [`ConnectionInfo`] from the [`ConnectionState`]. + side: Side, /// The paths that exist on this connection. /// /// This could be in any state, e.g. while still validating the path or already closed @@ -1180,3 +1220,65 @@ fn now_or_never>(fut: F) -> Option { std::task::Poll::Pending => None, } } + +/// Information about a remote endpoint. +#[derive(derive_more::Debug, Clone)] +pub struct RemoteInfo { + endpoint_id: EndpointId, + #[debug("Sender")] + handle: mpsc::Sender, + selected_path: n0_watcher::Direct>, +} + +impl RemoteInfo { + pub(crate) fn new( + endpoint_id: EndpointId, + handle: mpsc::Sender, + selected_path: n0_watcher::Direct>, + ) -> Self { + Self { + endpoint_id, + handle, + selected_path, + } + } + + /// Returns the endpoint id of the remote. + pub fn endpoint_id(&self) -> EndpointId { + self.endpoint_id + } + + /// Returns the round-trip time of the currently selected path. + /// + /// Returns `None` if no path is active. + pub async fn rtt(&self) -> Option { + let (tx, rx) = oneshot::channel(); + if let Err(_) = self.handle.send(EndpointStateMessage::Latency(tx)).await { + return None; + }; + rx.await.ok()? + } + + /// Returns a list of connections that are active with this remote endpoint.
+ pub async fn connections(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + if let Err(_) = self + .handle + .send(EndpointStateMessage::ConnectionInfos(tx)) + .await + { + return Vec::new(); + }; + rx.await.ok().unwrap_or_default() + } + + /// Returns a watcher over the currently selected main path to the remote. + /// + /// TODO: More docs. + /// * `get` and `stream` + /// * None means no currently active path + /// * Check TransportAddr::is_ip / TransportAddr::is_relay to see if the path is direct or relayed + pub fn selected_path(&self) -> impl Watcher> + use<> { + self.selected_path.clone() + } +}