diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 0f9aa46bdd..939ed7abfb 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -893,12 +893,10 @@ mod tests { let (ep2, _guard2) = new_endpoint(&mut rng, |ep| disco_shared.create_discovery(ep.id())).await; - let ep1_wrong_addr = EndpointAddr { - id: ep1.id(), - addrs: [TransportAddr::Ip("240.0.0.1:1000".parse().unwrap())] - .into_iter() - .collect(), - }; + let ep1_wrong_addr = EndpointAddr::from_parts( + ep1.id(), + [TransportAddr::Ip("240.0.0.1:1000".parse().unwrap())], + ); let _conn = ep2.connect(ep1_wrong_addr, TEST_ALPN).await?; Ok(()) } @@ -1043,10 +1041,7 @@ mod test_dns_pkarr { .await?; println!("resolved {resolved:?}"); - let expected_addr = EndpointAddr { - id: endpoint_id, - addrs: relay_url.into_iter().collect(), - }; + let expected_addr = EndpointAddr::from_parts(endpoint_id, relay_url); assert_eq!(resolved.to_endpoint_addr(), expected_addr); assert_eq!(resolved.user_data(), Some(&user_data)); diff --git a/iroh/src/discovery/static_provider.rs b/iroh/src/discovery/static_provider.rs index 05e16b85d6..edb0c601c0 100644 --- a/iroh/src/discovery/static_provider.rs +++ b/iroh/src/discovery/static_provider.rs @@ -248,12 +248,10 @@ mod tests { .await?; let key = SecretKey::from_bytes(&[0u8; 32]); - let addr = EndpointAddr { - id: key.public(), - addrs: [TransportAddr::Relay("https://example.com".parse()?)] - .into_iter() - .collect(), - }; + let addr = EndpointAddr::from_parts( + key.public(), + [TransportAddr::Relay("https://example.com".parse()?)], + ); let user_data = Some("foobar".parse().unwrap()); let endpoint_info = EndpointInfo::from(addr.clone()).with_user_data(user_data.clone()); discovery.add_endpoint_info(endpoint_info.clone()); @@ -280,12 +278,10 @@ mod tests { async fn test_provenance() -> Result { let discovery = StaticProvider::with_provenance("foo"); let key = SecretKey::from_bytes(&[0u8; 32]); - let addr = EndpointAddr { - id: key.public(), - addrs: [TransportAddr::Relay("https://example.com".parse()?)] - .into_iter() - .collect(), - }; + let addr = EndpointAddr::from_parts( + key.public(), + [TransportAddr::Relay("https://example.com".parse()?)], + ); discovery.add_endpoint_info(addr); let mut stream = discovery.resolve(key.public()).unwrap(); let item = stream.next().await.unwrap()?; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 54a8058f45..e2fabf9e3f 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -69,8 +69,8 @@ pub use quinn_proto::{ pub use self::connection::{ Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection, - Incoming, IncomingZeroRttConnection, OutgoingZeroRttConnection, RemoteEndpointIdError, - ZeroRttStatus, + ConnectionState, HandshakeCompleted, Incoming, IncomingZeroRtt, IncomingZeroRttConnection, + OutgoingZeroRtt, OutgoingZeroRttConnection, RemoteEndpointIdError, ZeroRttStatus, }; pub use crate::magicsock::transports::TransportConfig; diff --git a/iroh/src/endpoint/connection.rs b/iroh/src/endpoint/connection.rs index e1840dfa05..456ff8e3f8 100644 --- a/iroh/src/endpoint/connection.rs +++ b/iroh/src/endpoint/connection.rs @@ -232,7 +232,7 @@ fn conn_from_quinn_conn( impl Future> + Send + 'static, ConnectingError, > { - let (remote_id, alpn) = match static_info_from_conn(&conn) { + let info = match static_info_from_conn(&conn) { Ok(val) => val, Err(auth_err) => { // If the authentication error raced with a connection error, the connection @@ -246,24 +246,22 @@ fn conn_from_quinn_conn( }; // Register this connection with the magicsock. - let fut = ep.msock.register_connection(remote_id, conn.weak_handle()); + let fut = ep + .msock + .register_connection(info.endpoint_id, conn.weak_handle()); Ok(async move { let paths = fut.await?; Ok(Connection { - paths, - remote_id, - alpn, + data: HandshakeCompletedData { info, paths }, inner: conn, }) }) } -fn static_info_from_conn( - conn: &quinn::Connection, -) -> Result<(EndpointId, Vec), AuthenticationError> { - let remote_id = remote_id_from_quinn_conn(conn)?; +fn static_info_from_conn(conn: &quinn::Connection) -> Result { + let endpoint_id = remote_id_from_quinn_conn(conn)?; let alpn = alpn_from_quinn_conn(conn).ok_or_else(|| e!(AuthenticationError::NoAlpn))?; - Ok((remote_id, alpn)) + Ok(StaticInfo { endpoint_id, alpn }) } /// Returns the [`EndpointId`] from the peer's TLS certificate. @@ -429,7 +427,7 @@ impl Connecting { pub fn into_0rtt(self) -> Result { match self.inner.into_0rtt() { Ok((quinn_conn, zrtt_accepted)) => { - let handshake_completed_fut: BoxFuture<_> = Box::pin({ + let accepted: BoxFuture<_> = Box::pin({ let quinn_conn = quinn_conn.clone(); async move { let accepted = zrtt_accepted.await; @@ -441,9 +439,10 @@ impl Connecting { }) } }); - Ok(OutgoingZeroRttConnection { + let accepted = accepted.shared(); + Ok(Connection { inner: quinn_conn, - handshake_completed_fut: handshake_completed_fut.shared(), + data: OutgoingZeroRttData { accepted }, }) } Err(inner) => Err(Self { inner, ..self }), @@ -523,7 +522,7 @@ impl Accepting { .inner .into_0rtt() .expect("incoming connections can always be converted to 0-RTT"); - let handshake_completed_fut: BoxFuture<_> = Box::pin({ + let accepted: BoxFuture<_> = Box::pin({ let quinn_conn = quinn_conn.clone(); async move { zrtt_accepted.await; @@ -531,9 +530,10 @@ impl Accepting { Ok(conn) } }); + let accepted = accepted.shared(); IncomingZeroRttConnection { inner: quinn_conn, - handshake_completed_fut: handshake_completed_fut.shared(), + data: IncomingZeroRttData { accepted }, } } @@ -577,11 +577,7 @@ impl Future for Accepting { /// /// Look at the [`OutgoingZeroRttConnection::handshake_completed`] method for /// more details. -#[derive(Debug, Clone)] -pub struct OutgoingZeroRttConnection { - inner: quinn::Connection, - handshake_completed_fut: Shared>>, -} +pub type OutgoingZeroRttConnection = Connection; /// Returned from [`OutgoingZeroRttConnection::handshake_completed`]. #[derive(Debug, Clone)] @@ -595,30 +591,116 @@ pub enum ZeroRttStatus { Rejected(Connection), } -impl OutgoingZeroRttConnection { - /// Waits until the full handshake occurs and returns a [`ZeroRttStatus`]. - /// - /// If `ZeroRttStatus::Accepted` is returned, than any streams created before - /// the handshake has completed can still be used. - /// - /// If `ZeroRttStatus::Rejected` is returned, than any streams created before - /// the handshake will error and any data sent should be re-sent on a - /// new stream. - /// - /// This may fail with [`ConnectingError::ConnectionError`], if there was - /// some general failure with the connection, such as a network timeout since - /// we initiated the connection. - /// - /// This may fail with [`ConnectingError::HandshakeFailure`], if the other side - /// doesn't use the right TLS authentication, which usually every iroh endpoint - /// uses and requires. - /// - /// Thus, those errors should only occur if someone connects to you with a - /// modified iroh endpoint or with a plain QUIC client. - pub async fn handshake_completed(self) -> Result { - self.handshake_completed_fut.await - } +/// A QUIC connection on the server-side that can possibly accept 0-RTT data. +/// +/// It is very similar to a `Connection`, but the `IncomingZeroRttConnection::remote_id` +/// and `IncomingZeroRttConnection::alpn` may not be set yet, since the handshake has +/// not necessarily occurred yet. +/// +/// If the `IncomingZeroRttConnection` has rejected 0-RTT or does not have enough information +/// to accept 0-RTT, any received 0-RTT packets will simply be dropped before +/// reaching any receive streams. +/// +/// Any streams that are created to send or receive data can continue to be used +/// even after the handshake has completed and we are no longer in a 0-RTT +/// situation. +/// +/// Use the [`IncomingZeroRttConnection::handshake_completed`] method to get a [`Connection`] from a +/// `IncomingZeroRttConnection`. This waits until 0-RTT connection has completed +/// the handshake and can now confidently derive the ALPN and the +/// [`EndpointId`] of the remote endpoint. +pub type IncomingZeroRttConnection = Connection; + +/// A QUIC connection. +/// +/// If all references to a connection (including every clone of the Connection handle, +/// streams of incoming streams, and the various stream types) have been dropped, then the +/// connection will be automatically closed with an error_code of 0 and an empty reason. You +/// can also close the connection explicitly by calling [`Connection::close`]. +/// +/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon +/// receiving CONNECTION_CLOSE the peer may drop any stream data not yet delivered to the +/// application. [`Connection::close`] describes in more detail how to gracefully close a +/// connection without losing application data. +/// +/// May be cloned to obtain another handle to the same connection. +#[derive(Debug, Clone)] +pub struct Connection { + inner: quinn::Connection, + /// State-specific information + data: State::Data, +} + +#[doc(hidden)] +#[derive(Debug, Clone)] +pub struct HandshakeCompletedData { + info: StaticInfo, + paths: PathsWatcher, +} + +/// Static info from a completed TLS handshake. +#[derive(Debug, Clone)] +struct StaticInfo { + endpoint_id: EndpointId, + alpn: Vec, +} + +#[doc(hidden)] +#[derive(Debug, Clone)] +pub struct IncomingZeroRttData { + accepted: Shared>>, +} + +#[doc(hidden)] +#[derive(Debug, Clone)] +pub struct OutgoingZeroRttData { + accepted: Shared>>, +} + +mod sealed { + pub trait Sealed {} +} + +/// Trait to track the state of a [`Connection`] at compile time. +pub trait ConnectionState: sealed::Sealed { + /// State-specific data stored in the [`Connection`]. + type Data: std::fmt::Debug + Clone; +} + +/// Marker type for a connection that has completed the handshake. +#[derive(Debug, Clone)] +pub struct HandshakeCompleted; + +/// Marker type for a connection that is in the incoming 0-RTT state. +#[derive(Debug, Clone)] +pub struct IncomingZeroRtt; + +/// Marker type for a connection that is in the outgoing 0-RTT state. +#[derive(Debug, Clone)] +pub struct OutgoingZeroRtt; + +impl sealed::Sealed for HandshakeCompleted {} +impl ConnectionState for HandshakeCompleted { + type Data = HandshakeCompletedData; +} + +impl sealed::Sealed for IncomingZeroRtt {} +impl ConnectionState for IncomingZeroRtt { + type Data = IncomingZeroRttData; +} + +impl sealed::Sealed for OutgoingZeroRtt {} +impl ConnectionState for OutgoingZeroRtt { + type Data = OutgoingZeroRttData; +} + +#[allow(missing_docs)] +#[stack_error(add_meta, derive)] +#[error("Protocol error: no remote id available")] +#[derive(Clone)] +pub struct RemoteEndpointIdError; +impl Connection { /// Initiates a new outgoing unidirectional stream. /// /// Streams are cheap and instantaneous to open unless blocked by flow control. As a @@ -806,11 +888,6 @@ impl OutgoingZeroRttConnection { self.inner.handshake_data() } - /// Extracts the ALPN protocol from the peer's handshake data. - pub fn alpn(&self) -> Option> { - alpn_from_quinn_conn(&self.inner) - } - /// Cryptographic identity of the peer. /// /// The dynamic type returned is determined by the configured [`Session`]. For the @@ -824,18 +901,6 @@ impl OutgoingZeroRttConnection { self.inner.peer_identity() } - /// Returns the [`EndpointId`] from the peer's TLS certificate. - /// - /// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is - /// included in the TLS certificate presented during the handshake when connecting. - /// This function allows you to get the [`EndpointId`] of the remote endpoint of this - /// connection. - /// - /// [`PublicKey`]: iroh_base::PublicKey - pub fn remote_id(&self) -> Result { - remote_id_from_quinn_conn(&self.inner) - } - /// A stable identifier for this connection. /// /// Peer addresses and connection IDs can change, but this value will remain fixed for @@ -888,627 +953,92 @@ impl OutgoingZeroRttConnection { } } -/// A QUIC connection on the server-side that can possibly accept 0-RTT data. -/// -/// It is very similar to a `Connection`, but the `IncomingZeroRttConnection::remote_id` -/// and `IncomingZeroRttConnection::alpn` may not be set yet, since the handshake has -/// not necessarily occurred yet. -/// -/// If the `IncomingZeroRttConnection` has rejected 0-RTT or does not have enough information -/// to accept 0-RTT, any received 0-RTT packets will simply be dropped before -/// reaching any receive streams. -/// -/// Any streams that are created to send or receive data can continue to be used -/// even after the handshake has completed and we are no longer in a 0-RTT -/// situation. -/// -/// Use the [`IncomingZeroRttConnection::handshake_completed`] method to get a [`Connection`] from a -/// `IncomingZeroRttConnection`. This waits until 0-RTT connection has completed -/// the handshake and can now confidently derive the ALPN and the -/// [`EndpointId`] of the remote endpoint. -#[derive(Debug)] -pub struct IncomingZeroRttConnection { - inner: quinn::Connection, - handshake_completed_fut: Shared>>, -} +impl Connection { + /// Extracts the ALPN protocol from the peer's handshake data. + pub fn alpn(&self) -> &[u8] { + &self.data.info.alpn + } -impl IncomingZeroRttConnection { - /// Waits until the full handshake occurs and then returns a [`Connection`]. - /// - /// This may fail with [`ConnectingError::ConnectionError`], if there was - /// some general failure with the connection, such as a network timeout since - /// we accepted the connection. + /// Returns the [`EndpointId`] from the peer's TLS certificate. /// - /// This may fail with [`ConnectingError::HandshakeFailure`], if the other side - /// doesn't use the right TLS authentication, which usually every iroh endpoint - /// uses and requires. + /// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is + /// included in the TLS certificate presented during the handshake when connecting. + /// This function allows you to get the [`EndpointId`] of the remote endpoint of this + /// connection. /// - /// Thus, those errors should only occur if someone connects to you with a - /// modified iroh endpoint or with a plain QUIC client. - pub async fn handshake_completed(self) -> Result { - self.handshake_completed_fut.await + /// [`PublicKey`]: iroh_base::PublicKey + pub fn remote_id(&self) -> EndpointId { + self.data.info.endpoint_id } - /// Initiates a new outgoing unidirectional stream. + /// Returns a [`Watcher`] for the network paths of this connection. /// - /// Streams are cheap and instantaneous to open unless blocked by flow control. As a - /// consequence, the peer won’t be notified that a stream has been opened until the - /// stream is actually used. - #[inline] - pub fn open_uni(&self) -> OpenUni<'_> { - self.inner.open_uni() - } - - /// Initiates a new outgoing bidirectional stream. + /// A connection can have several network paths to the remote endpoint, commonly there + /// will be a path via the relay server and a holepunched path. /// - /// Streams are cheap and instantaneous to open unless blocked by flow control. As a - /// consequence, the peer won't be notified that a stream has been opened until the - /// stream is actually used. Calling [`open_bi`] then waiting on the [`RecvStream`] - /// without writing anything to [`SendStream`] will never succeed. + /// The watcher is updated whenever a path is opened or closed, or when the path selected + /// for transmission changes (see [`PathInfo::is_selected`]). /// - /// [`open_bi`]: Connection::open_bi - /// [`SendStream`]: quinn::SendStream - /// [`RecvStream`]: quinn::RecvStream - #[inline] - pub fn open_bi(&self) -> OpenBi<'_> { - self.inner.open_bi() + /// The [`PathInfoList`] returned from the watcher contains a [`PathInfo`] for each + /// transmission path. + /// + /// [`PathInfo::is_selected`]: crate::magicsock::PathInfo::is_selected + /// [`PathInfo`]: crate::magicsock::PathInfo + pub fn paths(&self) -> impl Watcher + Unpin + Send + Sync + 'static { + self.data.paths.clone() } +} - /// Accepts the next incoming uni-directional stream. - #[inline] - pub fn accept_uni(&self) -> AcceptUni<'_> { - self.inner.accept_uni() +impl Connection { + /// Extracts the ALPN protocol from the peer's handshake data. + pub fn alpn(&self) -> Option> { + alpn_from_quinn_conn(&self.inner) } - /// Accept the next incoming bidirectional stream. + /// Waits until the full handshake occurs and then returns a [`Connection`]. /// - /// **Important Note**: The peer that calls [`open_bi`] must write to its [`SendStream`] - /// before the peer `Connection` is able to accept the stream using - /// `accept_bi()`. Calling [`open_bi`] then waiting on the [`RecvStream`] without - /// writing anything to the connected [`SendStream`] will never succeed. + /// This may fail with [`ConnectingError::ConnectionError`], if there was + /// some general failure with the connection, such as a network timeout since + /// we accepted the connection. /// - /// [`open_bi`]: Connection::open_bi - /// [`SendStream`]: quinn::SendStream - /// [`RecvStream`]: quinn::RecvStream - #[inline] - pub fn accept_bi(&self) -> AcceptBi<'_> { - self.inner.accept_bi() - } - - /// Receives an application datagram. - #[inline] - pub fn read_datagram(&self) -> ReadDatagram<'_> { - self.inner.read_datagram() - } - - /// Wait for the connection to be closed for any reason. + /// This may fail with [`ConnectingError::HandshakeFailure`], if the other side + /// doesn't use the right TLS authentication, which usually every iroh endpoint + /// uses and requires. /// - /// Despite the return type's name, closed connections are often not an error condition - /// at the application layer. Cases that might be routine include - /// [`ConnectionError::LocallyClosed`] and [`ConnectionError::ApplicationClosed`]. - #[inline] - pub async fn closed(&self) -> ConnectionError { - self.inner.closed().await + /// Thus, those errors should only occur if someone connects to you with a + /// modified iroh endpoint or with a plain QUIC client. + pub async fn handshake_completed(&self) -> Result { + self.data.accepted.clone().await } +} - /// If the connection is closed, the reason why. - /// - /// Returns `None` if the connection is still open. - #[inline] - pub fn close_reason(&self) -> Option { - self.inner.close_reason() +impl Connection { + /// Extracts the ALPN protocol from the peer's handshake data. + pub fn alpn(&self) -> Option> { + alpn_from_quinn_conn(&self.inner) } - /// Closes the connection immediately. + /// Waits until the full handshake occurs and returns a [`ZeroRttStatus`]. /// - /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No - /// more data is sent to the peer and the peer may drop buffered data upon receiving the - /// CONNECTION_CLOSE frame. + /// If `ZeroRttStatus::Accepted` is returned, than any streams created before + /// the handshake has completed can still be used. /// - /// `error_code` and `reason` are not interpreted, and are provided directly to the - /// peer. + /// If `ZeroRttStatus::Rejected` is returned, than any streams created before + /// the handshake will error and any data sent should be re-sent on a + /// new stream. /// - /// `reason` will be truncated to fit in a single packet with overhead; to improve odds - /// that it is preserved in full, it should be kept under 1KiB. + /// This may fail with [`ConnectingError::ConnectionError`], if there was + /// some general failure with the connection, such as a network timeout since + /// we initiated the connection. /// - /// # Gracefully closing a connection + /// This may fail with [`ConnectingError::HandshakeFailure`], if the other side + /// doesn't use the right TLS authentication, which usually every iroh endpoint + /// uses and requires. /// - /// Only the peer last receiving application data can be certain that all data is - /// delivered. The only reliable action it can then take is to close the connection, - /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE - /// frame is very likely if both endpoints stay online long enough, calling - /// [`Endpoint::close`] will wait to provide sufficient time. Otherwise, the remote peer - /// will time out the connection, provided that the idle timeout is not disabled. - /// - /// The sending side can not guarantee all stream data is delivered to the remote - /// application. It only knows the data is delivered to the QUIC stack of the remote - /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling - /// [`close`] the remote endpoint may drop any data it received but is as yet - /// undelivered to the application, including data that was acknowledged as received to - /// the local endpoint. - /// - /// [`close`]: Connection::close - #[inline] - pub fn close(&self, error_code: VarInt, reason: &[u8]) { - self.inner.close(error_code, reason) - } - - /// Transmits `data` as an unreliable, unordered application datagram. - /// - /// Application datagrams are a low-level primitive. They may be lost or delivered out - /// of order, and `data` must both fit inside a single QUIC packet and be smaller than - /// the maximum dictated by the peer. - #[inline] - pub fn send_datagram(&self, data: bytes::Bytes) -> Result<(), SendDatagramError> { - self.inner.send_datagram(data) - } - - // TODO: It seems `SendDatagram` is not yet exposed by quinn. This has been fixed - // upstream and will be in the next release. - // /// Transmits `data` as an unreliable, unordered application datagram - // /// - // /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion - // /// conditions, which effectively prioritizes old datagrams over new datagrams. - // /// - // /// See [`send_datagram()`] for details. - // /// - // /// [`send_datagram()`]: Connection::send_datagram - // #[inline] - // pub fn send_datagram_wait(&self, data: bytes::Bytes) -> SendDatagram<'_> { - // self.inner.send_datagram_wait(data) - // } - - /// Computes the maximum size of datagrams that may be passed to [`send_datagram`]. - /// - /// Returns `None` if datagrams are unsupported by the peer or disabled locally. - /// - /// This may change over the lifetime of a connection according to variation in the path - /// MTU estimate. The peer can also enforce an arbitrarily small fixed limit, but if the - /// peer's limit is large this is guaranteed to be a little over a kilobyte at minimum. - /// - /// Not necessarily the maximum size of received datagrams. - /// - /// [`send_datagram`]: Self::send_datagram - #[inline] - pub fn max_datagram_size(&self) -> Option { - self.inner.max_datagram_size() - } - - /// Bytes available in the outgoing datagram buffer. - /// - /// When greater than zero, calling [`send_datagram`] with a - /// datagram of at most this size is guaranteed not to cause older datagrams to be - /// dropped. - /// - /// [`send_datagram`]: Self::send_datagram - #[inline] - pub fn datagram_send_buffer_space(&self) -> usize { - self.inner.datagram_send_buffer_space() - } - - /// Current best estimate of this connection's latency (round-trip-time). - #[inline] - pub fn rtt(&self) -> Duration { - self.inner.rtt() - } - - /// Returns connection statistics. - #[inline] - pub fn stats(&self) -> ConnectionStats { - self.inner.stats() - } - - /// Current state of the congestion control algorithm, for debugging purposes. - #[inline] - pub fn congestion_state(&self) -> Box { - self.inner.congestion_state() - } - - /// Parameters negotiated during the handshake. - /// - /// Guaranteed to return `Some` on fully established connections or after - /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for - /// details on the returned value. - /// - /// [`Connection::handshake_data()`]: crate::endpoint::Connecting::handshake_data - #[inline] - pub fn handshake_data(&self) -> Option> { - self.inner.handshake_data() - } - - /// Extracts the ALPN protocol from the peer's handshake data. - pub fn alpn(&self) -> Option> { - alpn_from_quinn_conn(&self.inner) - } - - /// Cryptographic identity of the peer. - /// - /// The dynamic type returned is determined by the configured [`Session`]. For the - /// default `rustls` session, the return value can be [`downcast`] to a - /// Vec<[rustls::pki_types::CertificateDer]> - /// - /// [`Session`]: quinn_proto::crypto::Session - /// [`downcast`]: Box::downcast - #[inline] - pub fn peer_identity(&self) -> Option> { - self.inner.peer_identity() - } - - /// Returns the [`EndpointId`] from the peer's TLS certificate. - /// - /// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is - /// included in the TLS certificate presented during the handshake when connecting. - /// This function allows you to get the [`EndpointId`] of the remote endpoint of this - /// connection. - /// - /// [`PublicKey`]: iroh_base::PublicKey - pub fn remote_id(&self) -> Result { - remote_id_from_quinn_conn(&self.inner) - } - - /// A stable identifier for this connection. - /// - /// Peer addresses and connection IDs can change, but this value will remain fixed for - /// the lifetime of the connection. - #[inline] - pub fn stable_id(&self) -> usize { - self.inner.stable_id() - } - - /// Derives keying material from this connection's TLS session secrets. - /// - /// When both peers call this method with the same `label` and `context` - /// arguments and `output` buffers of equal length, they will get the - /// same sequence of bytes in `output`. These bytes are cryptographically - /// strong and pseudorandom, and are suitable for use as keying material. - /// - /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information. - #[inline] - pub fn export_keying_material( - &self, - output: &mut [u8], - label: &[u8], - context: &[u8], - ) -> Result<(), quinn_proto::crypto::ExportKeyingMaterialError> { - self.inner.export_keying_material(output, label, context) - } - - /// Modifies the number of unidirectional streams that may be concurrently opened. - /// - /// No streams may be opened by the peer unless fewer than `count` are already - /// open. Large `count`s increase both minimum and worst-case memory consumption. - #[inline] - pub fn set_max_concurrent_uni_streams(&self, count: VarInt) { - self.inner.set_max_concurrent_uni_streams(count) - } - - /// See [`quinn_proto::TransportConfig::receive_window`]. - #[inline] - pub fn set_receive_window(&self, receive_window: VarInt) { - self.inner.set_receive_window(receive_window) - } - - /// Modifies the number of bidirectional streams that may be concurrently opened. - /// - /// No streams may be opened by the peer unless fewer than `count` are already - /// open. Large `count`s increase both minimum and worst-case memory consumption. - #[inline] - pub fn set_max_concurrent_bi_streams(&self, count: VarInt) { - self.inner.set_max_concurrent_bi_streams(count) - } -} - -/// A QUIC connection. -/// -/// If all references to a connection (including every clone of the Connection handle, -/// streams of incoming streams, and the various stream types) have been dropped, then the -/// connection will be automatically closed with an error_code of 0 and an empty reason. You -/// can also close the connection explicitly by calling [`Connection::close`]. -/// -/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon -/// receiving CONNECTION_CLOSE the peer may drop any stream data not yet delivered to the -/// application. [`Connection::close`] describes in more detail how to gracefully close a -/// connection without losing application data. -/// -/// May be cloned to obtain another handle to the same connection. -#[derive(derive_more::Debug, Clone)] -pub struct Connection { - inner: quinn::Connection, - remote_id: EndpointId, - alpn: Vec, - paths: PathsWatcher, -} - -#[allow(missing_docs)] -#[stack_error(add_meta, derive)] -#[error("Protocol error: no remote id available")] -#[derive(Clone)] -pub struct RemoteEndpointIdError; - -impl Connection { - /// Initiates a new outgoing unidirectional stream. - /// - /// Streams are cheap and instantaneous to open unless blocked by flow control. As a - /// consequence, the peer won’t be notified that a stream has been opened until the - /// stream is actually used. - #[inline] - pub fn open_uni(&self) -> OpenUni<'_> { - self.inner.open_uni() - } - - /// Initiates a new outgoing bidirectional stream. - /// - /// Streams are cheap and instantaneous to open unless blocked by flow control. As a - /// consequence, the peer won't be notified that a stream has been opened until the - /// stream is actually used. Calling [`open_bi`] then waiting on the [`RecvStream`] - /// without writing anything to [`SendStream`] will never succeed. - /// - /// [`open_bi`]: Connection::open_bi - /// [`SendStream`]: quinn::SendStream - /// [`RecvStream`]: quinn::RecvStream - #[inline] - pub fn open_bi(&self) -> OpenBi<'_> { - self.inner.open_bi() - } - - /// Accepts the next incoming uni-directional stream. - #[inline] - pub fn accept_uni(&self) -> AcceptUni<'_> { - self.inner.accept_uni() - } - - /// Accept the next incoming bidirectional stream. - /// - /// **Important Note**: The peer that calls [`open_bi`] must write to its [`SendStream`] - /// before the peer `Connection` is able to accept the stream using - /// `accept_bi()`. Calling [`open_bi`] then waiting on the [`RecvStream`] without - /// writing anything to the connected [`SendStream`] will never succeed. - /// - /// [`open_bi`]: Connection::open_bi - /// [`SendStream`]: quinn::SendStream - /// [`RecvStream`]: quinn::RecvStream - #[inline] - pub fn accept_bi(&self) -> AcceptBi<'_> { - self.inner.accept_bi() - } - - /// Receives an application datagram. - #[inline] - pub fn read_datagram(&self) -> ReadDatagram<'_> { - self.inner.read_datagram() - } - - /// Wait for the connection to be closed for any reason. - /// - /// Despite the return type's name, closed connections are often not an error condition - /// at the application layer. Cases that might be routine include - /// [`ConnectionError::LocallyClosed`] and [`ConnectionError::ApplicationClosed`]. - #[inline] - pub async fn closed(&self) -> ConnectionError { - self.inner.closed().await - } - - /// If the connection is closed, the reason why. - /// - /// Returns `None` if the connection is still open. - #[inline] - pub fn close_reason(&self) -> Option { - self.inner.close_reason() - } - - /// Closes the connection immediately. - /// - /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No - /// more data is sent to the peer and the peer may drop buffered data upon receiving the - /// CONNECTION_CLOSE frame. - /// - /// `error_code` and `reason` are not interpreted, and are provided directly to the - /// peer. - /// - /// `reason` will be truncated to fit in a single packet with overhead; to improve odds - /// that it is preserved in full, it should be kept under 1KiB. - /// - /// # Gracefully closing a connection - /// - /// Only the peer last receiving application data can be certain that all data is - /// delivered. The only reliable action it can then take is to close the connection, - /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE - /// frame is very likely if both endpoints stay online long enough, calling - /// [`Endpoint::close`] will wait to provide sufficient time. Otherwise, the remote peer - /// will time out the connection, provided that the idle timeout is not disabled. - /// - /// The sending side can not guarantee all stream data is delivered to the remote - /// application. It only knows the data is delivered to the QUIC stack of the remote - /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling - /// [`close`] the remote endpoint may drop any data it received but is as yet - /// undelivered to the application, including data that was acknowledged as received to - /// the local endpoint. - /// - /// [`close`]: Connection::close - #[inline] - pub fn close(&self, error_code: VarInt, reason: &[u8]) { - self.inner.close(error_code, reason) - } - - /// Transmits `data` as an unreliable, unordered application datagram. - /// - /// Application datagrams are a low-level primitive. They may be lost or delivered out - /// of order, and `data` must both fit inside a single QUIC packet and be smaller than - /// the maximum dictated by the peer. - #[inline] - pub fn send_datagram(&self, data: bytes::Bytes) -> Result<(), SendDatagramError> { - self.inner.send_datagram(data) - } - - // TODO: It seems `SendDatagram` is not yet exposed by quinn. This has been fixed - // upstream and will be in the next release. - // /// Transmits `data` as an unreliable, unordered application datagram - // /// - // /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion - // /// conditions, which effectively prioritizes old datagrams over new datagrams. - // /// - // /// See [`send_datagram()`] for details. - // /// - // /// [`send_datagram()`]: Connection::send_datagram - // #[inline] - // pub fn send_datagram_wait(&self, data: bytes::Bytes) -> SendDatagram<'_> { - // self.inner.send_datagram_wait(data) - // } - - /// Computes the maximum size of datagrams that may be passed to [`send_datagram`]. - /// - /// Returns `None` if datagrams are unsupported by the peer or disabled locally. - /// - /// This may change over the lifetime of a connection according to variation in the path - /// MTU estimate. The peer can also enforce an arbitrarily small fixed limit, but if the - /// peer's limit is large this is guaranteed to be a little over a kilobyte at minimum. - /// - /// Not necessarily the maximum size of received datagrams. - /// - /// [`send_datagram`]: Self::send_datagram - #[inline] - pub fn max_datagram_size(&self) -> Option { - self.inner.max_datagram_size() - } - - /// Bytes available in the outgoing datagram buffer. - /// - /// When greater than zero, calling [`send_datagram`] with a - /// datagram of at most this size is guaranteed not to cause older datagrams to be - /// dropped. - /// - /// [`send_datagram`]: Self::send_datagram - #[inline] - pub fn datagram_send_buffer_space(&self) -> usize { - self.inner.datagram_send_buffer_space() - } - - /// Current best estimate of this connection's latency (round-trip-time). - #[inline] - pub fn rtt(&self) -> Duration { - self.inner.rtt() - } - - /// Returns connection statistics. - #[inline] - pub fn stats(&self) -> ConnectionStats { - self.inner.stats() - } - - /// Current state of the congestion control algorithm, for debugging purposes. - #[inline] - pub fn congestion_state(&self) -> Box { - self.inner.congestion_state() - } - - /// Parameters negotiated during the handshake. - /// - /// Guaranteed to return `Some` on fully established connections or after - /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for - /// details on the returned value. - /// - /// [`Connection::handshake_data()`]: crate::endpoint::Connecting::handshake_data - #[inline] - pub fn handshake_data(&self) -> Option> { - self.inner.handshake_data() - } - - /// Extracts the ALPN protocol from the peer's handshake data. - pub fn alpn(&self) -> &[u8] { - &self.alpn - } - - /// Cryptographic identity of the peer. - /// - /// The dynamic type returned is determined by the configured [`Session`]. For the - /// default `rustls` session, the return value can be [`downcast`] to a - /// Vec<[rustls::pki_types::CertificateDer]> - /// - /// [`Session`]: quinn_proto::crypto::Session - /// [`downcast`]: Box::downcast - #[inline] - pub fn peer_identity(&self) -> Option> { - self.inner.peer_identity() - } - - /// Returns the [`EndpointId`] from the peer's TLS certificate. - /// - /// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is - /// included in the TLS certificate presented during the handshake when connecting. - /// This function allows you to get the [`EndpointId`] of the remote endpoint of this - /// connection. - /// - /// [`PublicKey`]: iroh_base::PublicKey - pub fn remote_id(&self) -> EndpointId { - self.remote_id - } - - /// A stable identifier for this connection. - /// - /// Peer addresses and connection IDs can change, but this value will remain fixed for - /// the lifetime of the connection. - #[inline] - pub fn stable_id(&self) -> usize { - self.inner.stable_id() - } - - /// Returns a [`Watcher`] for the network paths of this connection. - /// - /// A connection can have several network paths to the remote endpoint, commonly there - /// will be a path via the relay server and a holepunched path. - /// - /// The watcher is updated whenever a path is opened or closed, or when the path selected - /// for transmission changes (see [`PathInfo::is_selected`]). - /// - /// The [`PathInfoList`] returned from the watcher contains a [`PathInfo`] for each - /// transmission path. - /// - /// [`PathInfo::is_selected`]: crate::magicsock::PathInfo::is_selected - /// [`PathInfo`]: crate::magicsock::PathInfo - pub fn paths(&self) -> impl Watcher + Unpin + Send + Sync + 'static { - self.paths.clone() - } - - /// Derives keying material from this connection's TLS session secrets. - /// - /// When both peers call this method with the same `label` and `context` - /// arguments and `output` buffers of equal length, they will get the - /// same sequence of bytes in `output`. These bytes are cryptographically - /// strong and pseudorandom, and are suitable for use as keying material. - /// - /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information. - #[inline] - pub fn export_keying_material( - &self, - output: &mut [u8], - label: &[u8], - context: &[u8], - ) -> Result<(), quinn_proto::crypto::ExportKeyingMaterialError> { - self.inner.export_keying_material(output, label, context) - } - - /// Modifies the number of unidirectional streams that may be concurrently opened. - /// - /// No streams may be opened by the peer unless fewer than `count` are already - /// open. Large `count`s increase both minimum and worst-case memory consumption. - #[inline] - pub fn set_max_concurrent_uni_streams(&self, count: VarInt) { - self.inner.set_max_concurrent_uni_streams(count) - } - - /// See [`quinn_proto::TransportConfig::receive_window`]. - #[inline] - pub fn set_receive_window(&self, receive_window: VarInt) { - self.inner.set_receive_window(receive_window) - } - - /// Modifies the number of bidirectional streams that may be concurrently opened. - /// - /// No streams may be opened by the peer unless fewer than `count` are already - /// open. Large `count`s increase both minimum and worst-case memory consumption. - #[inline] - pub fn set_max_concurrent_bi_streams(&self, count: VarInt) { - self.inner.set_max_concurrent_bi_streams(count) + /// Thus, those errors should only occur if someone connects to you with a + /// modified iroh endpoint or with a plain QUIC client. + pub async fn handshake_completed(&self) -> Result { + self.data.accepted.clone().await } } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index cdc1111b1f..33ca0ccf55 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -2430,12 +2430,8 @@ mod tests { .ip_addrs() .get() .into_iter() - .map(|x| TransportAddr::Ip(x.addr)) - .collect(); - let endpoint_addr_2 = EndpointAddr { - id: endpoint_id_2, - addrs, - }; + .map(|x| TransportAddr::Ip(x.addr)); + let endpoint_addr_2 = EndpointAddr::from_parts(endpoint_id_2, addrs); msock_1 .add_endpoint_addr( endpoint_addr_2, @@ -2509,10 +2505,7 @@ mod tests { msock_1 .remote_map .add_endpoint_addr( - EndpointAddr { - id: endpoint_id_2, - addrs: Default::default(), - }, + EndpointAddr::from_parts(endpoint_id_2, []), Source::NamedApp { name: "test".into(), }, @@ -2544,18 +2537,15 @@ mod tests { info!("first connect timed out as expected"); // Provide correct addressing information + let addrs = msock_2 + .ip_addrs() + .get() + .into_iter() + .map(|x| TransportAddr::Ip(x.addr)); msock_1 .remote_map .add_endpoint_addr( - EndpointAddr { - id: endpoint_id_2, - addrs: msock_2 - .ip_addrs() - .get() - .into_iter() - .map(|x| TransportAddr::Ip(x.addr)) - .collect(), - }, + EndpointAddr::from_parts(endpoint_id_2, addrs), Source::NamedApp { name: "test".into(), }, diff --git a/iroh/src/magicsock/transports/relay/actor.rs b/iroh/src/magicsock/transports/relay/actor.rs index 55b908cef8..f26b3d7333 100644 --- a/iroh/src/magicsock/transports/relay/actor.rs +++ b/iroh/src/magicsock/transports/relay/actor.rs @@ -1459,6 +1459,7 @@ mod tests { #[tokio::test] #[traced_test] + #[ignore = "flaky"] async fn test_active_relay_inactive() -> Result { let (_relay_map, relay_url, _server) = test_utils::run_relay_server().await?;