From b4f3dc389ef7268c063b87cccefab8a78a7b51dd Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 11 Nov 2025 11:31:42 +0100 Subject: [PATCH 1/3] refactor: remove the TransportsSenderActor --- iroh/src/magicsock.rs | 3 +- iroh/src/magicsock/endpoint_map.rs | 89 ++----------------- .../magicsock/endpoint_map/endpoint_state.rs | 40 +++++---- iroh/src/magicsock/transports.rs | 6 +- iroh/src/magicsock/transports/ip.rs | 16 +++- 5 files changed, 48 insertions(+), 106 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 3f417f5b617..11c673c088c 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -1011,15 +1011,14 @@ impl Handle { let (disco, disco_receiver) = DiscoState::new(&secret_key); let endpoint_map = { - let sender = transports.create_sender(); EndpointMap::new( secret_key.public(), // #[cfg(any(test, feature = "test-utils"))] // path_selection, metrics.magicsock.clone(), - sender, direct_addrs.addrs.watch(), disco.clone(), + transports.create_sender(), ) }; diff --git a/iroh/src/magicsock/endpoint_map.rs b/iroh/src/magicsock/endpoint_map.rs index 7c4b80d4fc6..c018e04b70f 100644 --- a/iroh/src/magicsock/endpoint_map.rs +++ b/iroh/src/magicsock/endpoint_map.rs @@ -6,16 +6,15 @@ use std::{ }; use iroh_base::{EndpointAddr, EndpointId, RelayUrl}; -use n0_future::task::{self, AbortOnDropHandle}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use tracing::{Instrument, info_span, trace, warn}; +use tracing::warn; use super::{ DirectAddr, DiscoState, MagicsockMetrics, mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr}, - transports::{self, OwnedTransmit, TransportsSender}, + transports::{self, TransportsSender}, }; use crate::disco::{self}; // #[cfg(any(test, feature = "test-utils"))] @@ -58,10 +57,9 @@ pub(crate) struct EndpointMap { /// The endpoint ID of the local endpoint. local_endpoint_id: EndpointId, metrics: Arc, - /// Handle to an actor that can send over the transports. - transports_handle: TransportsSenderHandle, local_addrs: n0_watcher::Direct>, disco: DiscoState, + sender: TransportsSender, } impl EndpointMap { @@ -71,9 +69,10 @@ impl EndpointMap { // TODO: // #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, metrics: Arc, - sender: TransportsSender, + local_addrs: n0_watcher::Direct>, disco: DiscoState, + sender: TransportsSender, ) -> Self { Self { actor_handles: Mutex::new(FxHashMap::default()), @@ -81,9 +80,9 @@ impl EndpointMap { relay_mapped_addrs: Default::default(), local_endpoint_id, metrics, - transports_handle: TransportsSenderActor::new(sender).start(), local_addrs, disco, + sender, } } @@ -135,18 +134,17 @@ impl EndpointMap { Some(handle) => handle.sender.clone(), None => { // Create a new EndpointStateActor and insert it into the endpoint map. - let sender = self.transports_handle.inbox.clone(); let local_addrs = self.local_addrs.clone(); let disco = self.disco.clone(); let metrics = self.metrics.clone(); let actor = EndpointStateActor::new( eid, self.local_endpoint_id, - sender, local_addrs, disco, self.relay_mapped_addrs.clone(), metrics, + self.sender.clone(), ); let handle = actor.start(); let sender = handle.sender.clone(); @@ -282,79 +280,6 @@ impl IpPort { } } -/// An actor that can send datagrams onto iroh transports. -/// -/// The [`EndpointStateActor`]s want to be able to send datagrams. Because we can not create -/// [`TransportsSender`]s on demand we must share one for the entire [`EndpointMap`], which -/// lives in this actor. -/// -/// [`EndpointStateActor`]: endpoint_state::EndpointStateActor -#[derive(Debug)] -struct TransportsSenderActor { - sender: TransportsSender, -} - -impl TransportsSenderActor { - fn new(sender: TransportsSender) -> Self { - Self { sender } - } - - fn start(self) -> TransportsSenderHandle { - // This actor gets an inbox size of exactly 1. This is the same as if they had the - // underlying sender directly: either you can send or not, or you await until you - // can. No need to introduce extra buffering. - let (tx, rx) = mpsc::channel(1); - - let task = task::spawn( - async move { - self.run(rx).await; - } - .instrument(info_span!("TransportsSenderActor")), - ); - TransportsSenderHandle { - inbox: tx, - _task: AbortOnDropHandle::new(task), - } - } - - async fn run(self, mut inbox: mpsc::Receiver) { - use TransportsSenderMessage::SendDatagram; - - while let Some(SendDatagram(dst, owned_transmit)) = inbox.recv().await { - let transmit = transports::Transmit { - ecn: owned_transmit.ecn, - contents: owned_transmit.contents.as_ref(), - segment_size: owned_transmit.segment_size, - }; - let len = transmit.contents.len(); - match self.sender.send(&dst, None, &transmit).await { - Ok(()) => {} - Err(err) => { - trace!(?dst, %len, "transmit failed to send: {err:#}"); - } - }; - } - trace!("actor terminating"); - } -} - -#[derive(Debug)] -struct TransportsSenderHandle { - inbox: mpsc::Sender, - _task: AbortOnDropHandle<()>, -} - -#[derive(Debug)] -enum TransportsSenderMessage { - SendDatagram(transports::Addr, OwnedTransmit), -} - -impl From<(transports::Addr, OwnedTransmit)> for TransportsSenderMessage { - fn from(source: (transports::Addr, OwnedTransmit)) -> Self { - Self::SendDatagram(source.0, source.1) - } -} - #[cfg(test)] mod tests { diff --git a/iroh/src/magicsock/endpoint_map/endpoint_state.rs b/iroh/src/magicsock/endpoint_map/endpoint_state.rs index e5cb923979a..6eb8030d457 100644 --- a/iroh/src/magicsock/endpoint_map/endpoint_state.rs +++ b/iroh/src/magicsock/endpoint_map/endpoint_state.rs @@ -7,7 +7,6 @@ use std::{ }; use iroh_base::{EndpointAddr, EndpointId, RelayUrl, TransportAddr}; -use n0_error::StdResultExt; use n0_future::{ FuturesUnordered, MergeUnbounded, Stream, StreamExt, task::{self, AbortOnDropHandle}, @@ -23,7 +22,7 @@ use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn}; -use super::{Source, TransportsSenderMessage, path_state::PathState}; +use super::{Source, path_state::PathState}; // TODO: Use this // #[cfg(any(test, feature = "test-utils"))] // use crate::endpoint::PathSelection; @@ -33,7 +32,7 @@ use crate::{ magicsock::{ DiscoState, HEARTBEAT_INTERVAL, MagicsockMetrics, PATH_MAX_IDLE_TIMEOUT, mapped_addrs::{AddrMap, MappedAddr, RelayMappedAddr}, - transports::{self, OwnedTransmit}, + transports::{self, OwnedTransmit, TransportsSender}, }, util::MaybeFuture, }; @@ -95,10 +94,7 @@ pub(super) struct EndpointStateActor { // /// Metrics. metrics: Arc, - /// Allowing us to directly send datagrams. - /// - /// Used for handling [`EndpointStateMessage::SendDatagram`] messages. - transports_sender: mpsc::Sender, + sender: TransportsSender, /// Our local addresses. /// /// These are our local addresses and any reflexive transport addresses. @@ -149,17 +145,16 @@ impl EndpointStateActor { pub(super) fn new( endpoint_id: EndpointId, local_endpoint_id: EndpointId, - transports_sender: mpsc::Sender, local_addrs: n0_watcher::Direct>, disco: DiscoState, relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>, metrics: Arc, + sender: TransportsSender, ) -> Self { Self { endpoint_id, local_endpoint_id, metrics, - transports_sender, local_addrs, relay_mapped_addrs, disco, @@ -172,6 +167,7 @@ impl EndpointStateActor { scheduled_holepunch: None, scheduled_open_path: None, pending_open_paths: VecDeque::new(), + sender, } } @@ -297,26 +293,34 @@ impl EndpointStateActor { Ok(()) } + async fn send_datagram( + &self, + dst: transports::Addr, + owned_transmit: OwnedTransmit, + ) -> n0_error::Result<()> { + let transmit = transports::Transmit { + ecn: owned_transmit.ecn, + contents: owned_transmit.contents.as_ref(), + segment_size: owned_transmit.segment_size, + }; + self.sender.send(&dst, None, &transmit).await?; + Ok(()) + } + /// Handles [`EndpointStateMessage::SendDatagram`]. /// /// Error returns are fatal and kill the actor. async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) -> n0_error::Result<()> { if let Some(addr) = self.selected_path.get() { trace!(?addr, "sending datagram to selected path"); - self.transports_sender - .send((addr, transmit).into()) - .await - .std_context("TransportSenderActor stopped")?; + self.send_datagram(addr, transmit).await?; } else { trace!( paths = ?self.paths.keys().collect::>(), "sending datagram to all known paths", ); for addr in self.paths.keys() { - self.transports_sender - .send((addr.clone(), transmit.clone()).into()) - .await - .std_context("TransportSenerActor stopped")?; + self.send_datagram(addr.clone(), transmit.clone()).await?; } // This message is received *before* a connection is added. So we do // not yet have a connection to holepunch. Instead we trigger @@ -711,7 +715,7 @@ impl EndpointStateActor { transports::Addr::Ip(_) => &self.metrics.send_disco_udp, transports::Addr::Relay(_, _) => &self.metrics.send_disco_relay, }; - match self.transports_sender.send((dst, transmit).into()).await { + match self.send_datagram(dst, transmit).await { Ok(()) => { trace!("sent"); counter.inc(); diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index 8cc031109f9..b57e15d9902 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -412,7 +412,7 @@ impl Addr { } /// A sender that sends to all our transports. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct TransportsSender { #[cfg(not(wasm_browser))] ip: Vec, @@ -602,8 +602,8 @@ impl quinn::AsyncUdpSocket for MagicTransport { /// This is special in that it handles [`MultipathMappedAddr::Mixed`] by delegating to the /// [`MagicSock`] which expands it back to one or more [`Addr`]s and sends it /// using the underlying [`Transports`]. -// TODO: Can I just send the TransportsSender along in the NodeStateMessage::SendDatagram -// message?? That way you don't have to hook up the sender into the NodeMap! +// TODO: Can I just send the TransportsSender along in the EndpointStateMessage::SendDatagram +// message?? That way you don't have to hook up the sender into the EndpointMap! #[derive(Debug)] #[pin_project::pin_project] pub(crate) struct MagicSender { diff --git a/iroh/src/magicsock/transports/ip.rs b/iroh/src/magicsock/transports/ip.rs index a1c2fabad16..35501407583 100644 --- a/iroh/src/magicsock/transports/ip.rs +++ b/iroh/src/magicsock/transports/ip.rs @@ -101,10 +101,12 @@ impl IpTransport { } pub(super) fn create_sender(&self) -> IpSender { - let sender = self.socket.clone().create_sender(); + let socket = self.socket.clone(); + let sender = socket.clone().create_sender(); IpSender { bind_addr: self.bind_addr, sender, + socket, metrics: self.metrics.clone(), } } @@ -136,11 +138,23 @@ impl IpNetworkChangeSender { #[pin_project] pub(super) struct IpSender { bind_addr: SocketAddr, + socket: Arc, #[pin] sender: UdpSender, metrics: Arc, } +impl Clone for IpSender { + fn clone(&self) -> Self { + Self { + bind_addr: self.bind_addr, + socket: self.socket.clone(), + sender: self.socket.clone().create_sender(), + metrics: self.metrics.clone(), + } + } +} + impl IpSender { pub(super) fn is_valid_send_addr(&self, dst: &SocketAddr) -> bool { // Our net-tools crate binds sockets to their specific family. This means an IPv6 From cb510c37a6ad47e63cdd36227a3f50257904c1ee Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 11 Nov 2025 11:34:14 +0100 Subject: [PATCH 2/3] fixup: remove todo --- iroh/src/magicsock/transports.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index b57e15d9902..d9be7bbfa97 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -602,8 +602,6 @@ impl quinn::AsyncUdpSocket for MagicTransport { /// This is special in that it handles [`MultipathMappedAddr::Mixed`] by delegating to the /// [`MagicSock`] which expands it back to one or more [`Addr`]s and sends it /// using the underlying [`Transports`]. -// TODO: Can I just send the TransportsSender along in the EndpointStateMessage::SendDatagram -// message?? That way you don't have to hook up the sender into the EndpointMap! #[derive(Debug)] #[pin_project::pin_project] pub(crate) struct MagicSender { From b9d063fc005aba4afd73f8e55f361c96e3223410 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 11 Nov 2025 13:58:56 +0100 Subject: [PATCH 3/3] fixup: use patched UdpSender --- Cargo.lock | 29 ++++++++++++++--------------- Cargo.toml | 2 ++ iroh/src/magicsock/transports/ip.rs | 18 ++---------------- 3 files changed, 18 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9cf9dca565d..dae53d970a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1121,7 +1121,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1896,7 +1896,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.1", "tokio", "tower-service", "tracing", @@ -2337,7 +2337,7 @@ dependencies = [ "pin-project-lite", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.1", "thiserror 2.0.17", "tokio", "tokio-stream", @@ -2376,9 +2376,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.1", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -2815,8 +2815,7 @@ dependencies = [ [[package]] name = "netwatch" version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26f2acd376ef48b6c326abf3ba23c449e0cb8aa5c2511d189dd8a8a3bfac889b" +source = "git+https://github.com/n0-computer/net-tools?branch=main#2708e3d7b0a6e1bf3322f71033a4b2002ec2339d" dependencies = [ "atomic-waker", "bytes", @@ -2889,7 +2888,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3358,7 +3357,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.1", "thiserror 2.0.17", "tokio", "tracing", @@ -3395,9 +3394,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.1", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -3666,7 +3665,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3789,7 +3788,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs 1.0.3", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4352,7 +4351,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5124,7 +5123,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a1faa46daa3..7de2e7463e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,8 @@ iroh-quinn = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh iroh-quinn-proto = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" } iroh-quinn-udp = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" } +netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "main" } + # iroh-quinn = { path = "../iroh-quinn/quinn" } # iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" } # iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" } diff --git a/iroh/src/magicsock/transports/ip.rs b/iroh/src/magicsock/transports/ip.rs index 35501407583..147cfcec022 100644 --- a/iroh/src/magicsock/transports/ip.rs +++ b/iroh/src/magicsock/transports/ip.rs @@ -101,12 +101,10 @@ impl IpTransport { } pub(super) fn create_sender(&self) -> IpSender { - let socket = self.socket.clone(); - let sender = socket.clone().create_sender(); + let sender = self.socket.clone().create_sender(); IpSender { bind_addr: self.bind_addr, sender, - socket, metrics: self.metrics.clone(), } } @@ -134,27 +132,15 @@ impl IpNetworkChangeSender { } } -#[derive(Debug)] +#[derive(Debug, Clone)] #[pin_project] pub(super) struct IpSender { bind_addr: SocketAddr, - socket: Arc, #[pin] sender: UdpSender, metrics: Arc, } -impl Clone for IpSender { - fn clone(&self) -> Self { - Self { - bind_addr: self.bind_addr, - socket: self.socket.clone(), - sender: self.socket.clone().create_sender(), - metrics: self.metrics.clone(), - } - } -} - impl IpSender { pub(super) fn is_valid_send_addr(&self, dst: &SocketAddr) -> bool { // Our net-tools crate binds sockets to their specific family. This means an IPv6