Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ iroh-quinn = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh
iroh-quinn-proto = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
iroh-quinn-udp = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }

netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "main" }

# iroh-quinn = { path = "../iroh-quinn/quinn" }
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }
3 changes: 1 addition & 2 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,15 +1011,14 @@ impl Handle {
let (disco, disco_receiver) = DiscoState::new(&secret_key);

let endpoint_map = {
let sender = transports.create_sender();
EndpointMap::new(
secret_key.public(),
// #[cfg(any(test, feature = "test-utils"))]
// path_selection,
metrics.magicsock.clone(),
sender,
direct_addrs.addrs.watch(),
disco.clone(),
transports.create_sender(),
)
};

Expand Down
89 changes: 7 additions & 82 deletions iroh/src/magicsock/endpoint_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ use std::{
};

use iroh_base::{EndpointAddr, EndpointId, RelayUrl};
use n0_future::task::{self, AbortOnDropHandle};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::{Instrument, info_span, trace, warn};
use tracing::warn;

use super::{
DirectAddr, DiscoState, MagicsockMetrics,
mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr},
transports::{self, OwnedTransmit, TransportsSender},
transports::{self, TransportsSender},
};
use crate::disco::{self};
// #[cfg(any(test, feature = "test-utils"))]
Expand Down Expand Up @@ -58,10 +57,9 @@ pub(crate) struct EndpointMap {
/// The endpoint ID of the local endpoint.
local_endpoint_id: EndpointId,
metrics: Arc<MagicsockMetrics>,
/// Handle to an actor that can send over the transports.
transports_handle: TransportsSenderHandle,
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
disco: DiscoState,
sender: TransportsSender,
}

impl EndpointMap {
Expand All @@ -71,19 +69,20 @@ impl EndpointMap {
// TODO:
// #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection,
metrics: Arc<MagicsockMetrics>,
sender: TransportsSender,

local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
disco: DiscoState,
sender: TransportsSender,
) -> Self {
Self {
actor_handles: Mutex::new(FxHashMap::default()),
endpoint_mapped_addrs: Default::default(),
relay_mapped_addrs: Default::default(),
local_endpoint_id,
metrics,
transports_handle: TransportsSenderActor::new(sender).start(),
local_addrs,
disco,
sender,
}
}

Expand Down Expand Up @@ -135,18 +134,17 @@ impl EndpointMap {
Some(handle) => handle.sender.clone(),
None => {
// Create a new EndpointStateActor and insert it into the endpoint map.
let sender = self.transports_handle.inbox.clone();
let local_addrs = self.local_addrs.clone();
let disco = self.disco.clone();
let metrics = self.metrics.clone();
let actor = EndpointStateActor::new(
eid,
self.local_endpoint_id,
sender,
local_addrs,
disco,
self.relay_mapped_addrs.clone(),
metrics,
self.sender.clone(),
);
let handle = actor.start();
let sender = handle.sender.clone();
Expand Down Expand Up @@ -282,79 +280,6 @@ impl IpPort {
}
}

/// An actor that can send datagrams onto iroh transports.
///
/// The [`EndpointStateActor`]s want to be able to send datagrams. Because we can not create
/// [`TransportsSender`]s on demand we must share one for the entire [`EndpointMap`], which
/// lives in this actor.
///
/// [`EndpointStateActor`]: endpoint_state::EndpointStateActor
#[derive(Debug)]
struct TransportsSenderActor {
sender: TransportsSender,
}

impl TransportsSenderActor {
fn new(sender: TransportsSender) -> Self {
Self { sender }
}

fn start(self) -> TransportsSenderHandle {
// This actor gets an inbox size of exactly 1. This is the same as if they had the
// underlying sender directly: either you can send or not, or you await until you
// can. No need to introduce extra buffering.
let (tx, rx) = mpsc::channel(1);

let task = task::spawn(
async move {
self.run(rx).await;
}
.instrument(info_span!("TransportsSenderActor")),
);
TransportsSenderHandle {
inbox: tx,
_task: AbortOnDropHandle::new(task),
}
}

async fn run(self, mut inbox: mpsc::Receiver<TransportsSenderMessage>) {
use TransportsSenderMessage::SendDatagram;

while let Some(SendDatagram(dst, owned_transmit)) = inbox.recv().await {
let transmit = transports::Transmit {
ecn: owned_transmit.ecn,
contents: owned_transmit.contents.as_ref(),
segment_size: owned_transmit.segment_size,
};
let len = transmit.contents.len();
match self.sender.send(&dst, None, &transmit).await {
Ok(()) => {}
Err(err) => {
trace!(?dst, %len, "transmit failed to send: {err:#}");
}
};
}
trace!("actor terminating");
}
}

#[derive(Debug)]
struct TransportsSenderHandle {
inbox: mpsc::Sender<TransportsSenderMessage>,
_task: AbortOnDropHandle<()>,
}

#[derive(Debug)]
enum TransportsSenderMessage {
SendDatagram(transports::Addr, OwnedTransmit),
}

impl From<(transports::Addr, OwnedTransmit)> for TransportsSenderMessage {
fn from(source: (transports::Addr, OwnedTransmit)) -> Self {
Self::SendDatagram(source.0, source.1)
}
}

#[cfg(test)]
mod tests {

Expand Down
Loading
Loading