Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,10 @@ netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "main" }
# iroh-quinn = { path = "../iroh-quinn/quinn" }
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }


[patch."https://github.com/n0-computer/quinn"]

# iroh-quinn = { path = "../iroh-quinn/quinn" }
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ iroh-base = { version = "0.95.1", default-features = false, features = ["key", "
iroh-relay = { version = "0.95", path = "../iroh-relay", default-features = false }
n0-future = "0.3.0"
n0-error = "0.1.0"
n0-watcher = "0.5"
n0-watcher = "0.6"
netwatch = { version = "0.12" }
pin-project = "1"
pkarr = { version = "5", default-features = false, features = ["relays"] }
Expand Down
37 changes: 19 additions & 18 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ pub(crate) mod endpoint_map;
pub(crate) mod mapped_addrs;
pub(crate) mod transports;

use mapped_addrs::{EndpointIdMappedAddr, MappedAddr};

pub use self::{endpoint_map::PathInfo, metrics::Metrics};
use self::{
mapped_addrs::{EndpointIdMappedAddr, MappedAddr},
transports::Addr,
};

// TODO: Use this
// /// How long we consider a QAD-derived endpoint valid for. UDP NAT mappings typically
Expand Down Expand Up @@ -489,18 +491,18 @@ impl MagicSock {

#[cfg_attr(windows, allow(dead_code))]
fn normalized_local_addr(&self) -> io::Result<SocketAddr> {
let addrs = self.local_addrs_watch.clone().get();
let addrs = self.local_addrs_watch.peek();

let mut ipv4_addr = None;
for addr in addrs {
let Some(addr) = addr.into_socket_addr() else {
continue;
};
if addr.is_ipv6() {
return Ok(addr);
}
if addr.is_ipv4() && ipv4_addr.is_none() {
ipv4_addr.replace(addr);
match addr {
Addr::Ip(addr @ SocketAddr::V6(_)) => {
return Ok(*addr);
}
Addr::Ip(addr @ SocketAddr::V4(_)) if ipv4_addr.is_none() => {
ipv4_addr.replace(*addr);
}
_ => {}
}
}
match ipv4_addr {
Expand Down Expand Up @@ -550,11 +552,12 @@ impl MagicSock {

let mut quic_packets_total = 0;

for ((quinn_meta, buf), source_addr) in metas
.iter_mut()
.zip(bufs.iter_mut())
.zip(source_addrs.iter())
{
// zip is slow :(
for i in 0..metas.len() {
Comment on lines +555 to +556
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it is because of the double zip. The QNT branch only has a single zip here I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does seem like zip generally optimizes badly: rust-lang/rust#143966 (comment)

let quinn_meta = &mut metas[i];
let buf = &mut bufs[i];
let source_addr = &source_addrs[i];

let mut buf_contains_quic_datagrams = false;
let mut quic_datagram_count = 0;
if quinn_meta.len > quinn_meta.stride {
Expand All @@ -578,11 +581,9 @@ impl MagicSock {
// relies on quinn::EndpointConfig::grease_quic_bit being set to `false`,
// which we do in Endpoint::bind.
if let Some((sender, sealed_box)) = disco::source_and_box(datagram) {
trace!(src = ?source_addr, len = datagram.len(), "UDP recv: DISCO packet");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these show up in perf?

But yeah - perhaps it's fair to remove these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they do 😭

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they're removed in the QNT branch too :)

But really, you should be benching with the tracing static verbosity levels off, no? i.e. use max_level_off and/or release_max_level_off: https://docs.rs/tracing/latest/tracing/level_filters/index.html#compile-time-filters

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, I didn't know about this, but in practice this will show up for folks though

self.handle_disco_message(sender, sealed_box, source_addr);
datagram[0] = 0u8;
} else {
trace!(src = ?source_addr, len = datagram.len(), "UDP recv: QUIC packet");
match source_addr {
transports::Addr::Ip(SocketAddr::V4(..)) => {
self.metrics
Expand Down
14 changes: 9 additions & 5 deletions iroh/src/magicsock/endpoint_map/endpoint_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,10 +1183,10 @@ impl ConnectionState {
pub(crate) struct PathsWatcher(
Box<
n0_watcher::Map<
(
n0_watcher::Tuple<
n0_watcher::Direct<PathAddrList>,
n0_watcher::Direct<Option<transports::Addr>>,
),
>,
PathInfoList,
>,
>,
Expand All @@ -1195,8 +1195,12 @@ pub(crate) struct PathsWatcher(
impl n0_watcher::Watcher for PathsWatcher {
type Value = PathInfoList;

fn get(&mut self) -> Self::Value {
self.0.get()
fn update(&mut self) -> bool {
self.0.update()
}

fn peek(&self) -> &Self::Value {
self.0.peek()
}

fn is_connected(&self) -> bool {
Expand All @@ -1206,7 +1210,7 @@ impl n0_watcher::Watcher for PathsWatcher {
fn poll_updated(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<Self::Value, n0_watcher::Disconnected>> {
) -> Poll<Result<(), n0_watcher::Disconnected>> {
self.0.poll_updated(cx)
}
}
Expand Down
26 changes: 13 additions & 13 deletions iroh/src/magicsock/transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
io::{self, IoSliceMut},
net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6},
pin::Pin,
sync::{Arc, atomic::AtomicUsize},
sync::Arc,
task::{Context, Poll},
};

Expand Down Expand Up @@ -34,18 +34,20 @@ pub(crate) struct Transports {
ip: Vec<IpTransport>,
relay: Vec<RelayTransport>,

poll_recv_counter: AtomicUsize,
poll_recv_counter: usize,
/// Cache for source addrs, to speed up access
source_addrs: [Addr; quinn_udp::BATCH_SIZE],
}

#[cfg(not(wasm_browser))]
pub(crate) type LocalAddrsWatch = n0_watcher::Map<
(
n0_watcher::Tuple<
n0_watcher::Join<SocketAddr, n0_watcher::Direct<SocketAddr>>,
n0_watcher::Join<
Option<(RelayUrl, EndpointId)>,
n0_watcher::Map<n0_watcher::Direct<Option<RelayUrl>>, Option<(RelayUrl, EndpointId)>>,
>,
),
>,
Vec<Addr>,
>;

Expand All @@ -69,6 +71,7 @@ impl Transports {
ip,
relay,
poll_recv_counter: Default::default(),
source_addrs: Default::default(),
}
}

Expand All @@ -80,15 +83,15 @@ impl Transports {
msock: &MagicSock,
) -> Poll<io::Result<usize>> {
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
debug_assert!(bufs.len() <= quinn_udp::BATCH_SIZE, "too many buffers");
if msock.is_closing() {
return Poll::Pending;
}

let mut source_addrs = vec![Addr::default(); metas.len()];
match self.inner_poll_recv(cx, bufs, metas, &mut source_addrs)? {
match self.inner_poll_recv(cx, bufs, metas)? {
Poll::Pending | Poll::Ready(0) => Poll::Pending,
Poll::Ready(n) => {
msock.process_datagrams(&mut bufs[..n], &mut metas[..n], &source_addrs[..n]);
msock.process_datagrams(&mut bufs[..n], &mut metas[..n], &self.source_addrs[..n]);
Poll::Ready(Ok(n))
}
}
Expand All @@ -100,13 +103,12 @@ impl Transports {
cx: &mut Context,
bufs: &mut [IoSliceMut<'_>],
metas: &mut [quinn_udp::RecvMeta],
source_addrs: &mut [Addr],
) -> Poll<io::Result<usize>> {
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");

macro_rules! poll_transport {
($socket:expr) => {
match $socket.poll_recv(cx, bufs, metas, source_addrs)? {
match $socket.poll_recv(cx, bufs, metas, &mut self.source_addrs)? {
Poll::Pending | Poll::Ready(0) => {}
Poll::Ready(n) => {
return Poll::Ready(Ok(n));
Expand All @@ -117,9 +119,7 @@ impl Transports {

// To improve fairness, every other call reverses the ordering of polling.

let counter = self
.poll_recv_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let counter = self.poll_recv_counter.wrapping_add(1);

if counter % 2 == 0 {
#[cfg(not(wasm_browser))]
Expand Down Expand Up @@ -156,7 +156,7 @@ impl Transports {
let ips = n0_watcher::Join::new(self.ip.iter().map(|t| t.local_addr_watch()));
let relays = n0_watcher::Join::new(self.relay.iter().map(|t| t.local_addr_watch()));

(ips, relays).map(|(ips, relays)| {
ips.or(relays).map(|(ips, relays)| {
ips.into_iter()
.map(Addr::from)
.chain(
Expand Down
Loading