diff --git a/Cargo.lock b/Cargo.lock index bf83ac230e..e46335a8ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2738,9 +2738,9 @@ dependencies = [ [[package]] name = "n0-watcher" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38acf13c1ddafc60eb7316d52213467f8ccb70b6f02b65e7d97f7799b1f50be4" +checksum = "ba717c22ceec021ace0ff7674bf8fd60c9394605740a8201678fc1cb3a7398f6" dependencies = [ "derive_more 2.0.1", "n0-error", @@ -2815,7 +2815,7 @@ dependencies = [ [[package]] name = "netwatch" version = "0.12.0" -source = "git+https://github.com/n0-computer/net-tools?branch=main#2708e3d7b0a6e1bf3322f71033a4b2002ec2339d" +source = "git+https://github.com/n0-computer/net-tools?branch=main#0721bbb6a2c4dd487a378d6ef0f56387680649d1" dependencies = [ "atomic-waker", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 7de2e7463e..89721a6363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,3 +52,10 @@ netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "main" } # iroh-quinn = { path = "../iroh-quinn/quinn" } # iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" } # iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" } + + +[patch."https://github.com/n0-computer/quinn"] + +# iroh-quinn = { path = "../iroh-quinn/quinn" } +# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" } +# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 558ff278d0..aaa00b8039 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -33,7 +33,7 @@ iroh-base = { version = "0.95.1", default-features = false, features = ["key", " iroh-relay = { version = "0.95", path = "../iroh-relay", default-features = false } n0-future = "0.3.0" n0-error = "0.1.0" -n0-watcher = "0.5" +n0-watcher = "0.6" netwatch = { version = "0.12" } pin-project = "1" pkarr = { version = "5", default-features = false, features = ["relays"] } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 38eeed85c5..2a4f6e6602 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -75,9 +75,11 @@ pub(crate) mod endpoint_map; pub(crate) mod mapped_addrs; pub(crate) mod transports; -use mapped_addrs::{EndpointIdMappedAddr, MappedAddr}; - pub use self::{endpoint_map::PathInfo, metrics::Metrics}; +use self::{ + mapped_addrs::{EndpointIdMappedAddr, MappedAddr}, + transports::Addr, +}; // TODO: Use this // /// How long we consider a QAD-derived endpoint valid for. UDP NAT mappings typically @@ -489,18 +491,18 @@ impl MagicSock { #[cfg_attr(windows, allow(dead_code))] fn normalized_local_addr(&self) -> io::Result { - let addrs = self.local_addrs_watch.clone().get(); + let addrs = self.local_addrs_watch.peek(); let mut ipv4_addr = None; for addr in addrs { - let Some(addr) = addr.into_socket_addr() else { - continue; - }; - if addr.is_ipv6() { - return Ok(addr); - } - if addr.is_ipv4() && ipv4_addr.is_none() { - ipv4_addr.replace(addr); + match addr { + Addr::Ip(addr @ SocketAddr::V6(_)) => { + return Ok(*addr); + } + Addr::Ip(addr @ SocketAddr::V4(_)) if ipv4_addr.is_none() => { + ipv4_addr.replace(*addr); + } + _ => {} } } match ipv4_addr { @@ -550,11 +552,12 @@ impl MagicSock { let mut quic_packets_total = 0; - for ((quinn_meta, buf), source_addr) in metas - .iter_mut() - .zip(bufs.iter_mut()) - .zip(source_addrs.iter()) - { + // zip is slow :( + for i in 0..metas.len() { + let quinn_meta = &mut metas[i]; + let buf = &mut bufs[i]; + let source_addr = &source_addrs[i]; + let mut buf_contains_quic_datagrams = false; let mut quic_datagram_count = 0; if quinn_meta.len > quinn_meta.stride { @@ -578,11 +581,9 @@ impl MagicSock { // relies on quinn::EndpointConfig::grease_quic_bit being set to `false`, // which we do in Endpoint::bind. if let Some((sender, sealed_box)) = disco::source_and_box(datagram) { - trace!(src = ?source_addr, len = datagram.len(), "UDP recv: DISCO packet"); self.handle_disco_message(sender, sealed_box, source_addr); datagram[0] = 0u8; } else { - trace!(src = ?source_addr, len = datagram.len(), "UDP recv: QUIC packet"); match source_addr { transports::Addr::Ip(SocketAddr::V4(..)) => { self.metrics diff --git a/iroh/src/magicsock/endpoint_map/endpoint_state.rs b/iroh/src/magicsock/endpoint_map/endpoint_state.rs index aa70a0cae7..f51d18956c 100644 --- a/iroh/src/magicsock/endpoint_map/endpoint_state.rs +++ b/iroh/src/magicsock/endpoint_map/endpoint_state.rs @@ -1183,10 +1183,10 @@ impl ConnectionState { pub(crate) struct PathsWatcher( Box< n0_watcher::Map< - ( + n0_watcher::Tuple< n0_watcher::Direct, n0_watcher::Direct>, - ), + >, PathInfoList, >, >, @@ -1195,8 +1195,12 @@ pub(crate) struct PathsWatcher( impl n0_watcher::Watcher for PathsWatcher { type Value = PathInfoList; - fn get(&mut self) -> Self::Value { - self.0.get() + fn update(&mut self) -> bool { + self.0.update() + } + + fn peek(&self) -> &Self::Value { + self.0.peek() } fn is_connected(&self) -> bool { @@ -1206,7 +1210,7 @@ impl n0_watcher::Watcher for PathsWatcher { fn poll_updated( &mut self, cx: &mut std::task::Context<'_>, - ) -> Poll> { + ) -> Poll> { self.0.poll_updated(cx) } } diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index d9be7bbfa9..5d108b31b8 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -3,7 +3,7 @@ use std::{ io::{self, IoSliceMut}, net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6}, pin::Pin, - sync::{Arc, atomic::AtomicUsize}, + sync::Arc, task::{Context, Poll}, }; @@ -34,18 +34,20 @@ pub(crate) struct Transports { ip: Vec, relay: Vec, - poll_recv_counter: AtomicUsize, + poll_recv_counter: usize, + /// Cache for source addrs, to speed up access + source_addrs: [Addr; quinn_udp::BATCH_SIZE], } #[cfg(not(wasm_browser))] pub(crate) type LocalAddrsWatch = n0_watcher::Map< - ( + n0_watcher::Tuple< n0_watcher::Join>, n0_watcher::Join< Option<(RelayUrl, EndpointId)>, n0_watcher::Map>, Option<(RelayUrl, EndpointId)>>, >, - ), + >, Vec, >; @@ -69,6 +71,7 @@ impl Transports { ip, relay, poll_recv_counter: Default::default(), + source_addrs: Default::default(), } } @@ -80,15 +83,15 @@ impl Transports { msock: &MagicSock, ) -> Poll> { debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas"); + debug_assert!(bufs.len() <= quinn_udp::BATCH_SIZE, "too many buffers"); if msock.is_closing() { return Poll::Pending; } - let mut source_addrs = vec![Addr::default(); metas.len()]; - match self.inner_poll_recv(cx, bufs, metas, &mut source_addrs)? { + match self.inner_poll_recv(cx, bufs, metas)? { Poll::Pending | Poll::Ready(0) => Poll::Pending, Poll::Ready(n) => { - msock.process_datagrams(&mut bufs[..n], &mut metas[..n], &source_addrs[..n]); + msock.process_datagrams(&mut bufs[..n], &mut metas[..n], &self.source_addrs[..n]); Poll::Ready(Ok(n)) } } @@ -100,13 +103,12 @@ impl Transports { cx: &mut Context, bufs: &mut [IoSliceMut<'_>], metas: &mut [quinn_udp::RecvMeta], - source_addrs: &mut [Addr], ) -> Poll> { debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas"); macro_rules! poll_transport { ($socket:expr) => { - match $socket.poll_recv(cx, bufs, metas, source_addrs)? { + match $socket.poll_recv(cx, bufs, metas, &mut self.source_addrs)? { Poll::Pending | Poll::Ready(0) => {} Poll::Ready(n) => { return Poll::Ready(Ok(n)); @@ -117,9 +119,7 @@ impl Transports { // To improve fairness, every other call reverses the ordering of polling. - let counter = self - .poll_recv_counter - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let counter = self.poll_recv_counter.wrapping_add(1); if counter % 2 == 0 { #[cfg(not(wasm_browser))] @@ -156,7 +156,7 @@ impl Transports { let ips = n0_watcher::Join::new(self.ip.iter().map(|t| t.local_addr_watch())); let relays = n0_watcher::Join::new(self.relay.iter().map(|t| t.local_addr_watch())); - (ips, relays).map(|(ips, relays)| { + ips.or(relays).map(|(ips, relays)| { ips.into_iter() .map(Addr::from) .chain(