Skip to content

Commit 6380246

Browse files
refactor: remove the TransportsSenderActor
First step for #3641, the rest can be done once quic holepunching has landed Unfortunately `send_disco_message` also needs the sender, so it can't be fully removed from the `EndpointState`
1 parent 9d795d3 commit 6380246

File tree

7 files changed

+48
-121
lines changed

7 files changed

+48
-121
lines changed

Cargo.lock

Lines changed: 14 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ iroh-quinn = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh
4747
iroh-quinn-proto = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
4848
iroh-quinn-udp = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
4949

50+
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "main" }
51+
5052
# iroh-quinn = { path = "../iroh-quinn/quinn" }
5153
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
5254
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }

iroh/src/magicsock.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,15 +1011,14 @@ impl Handle {
10111011
let (disco, disco_receiver) = DiscoState::new(&secret_key);
10121012

10131013
let endpoint_map = {
1014-
let sender = transports.create_sender();
10151014
EndpointMap::new(
10161015
secret_key.public(),
10171016
// #[cfg(any(test, feature = "test-utils"))]
10181017
// path_selection,
10191018
metrics.magicsock.clone(),
1020-
sender,
10211019
direct_addrs.addrs.watch(),
10221020
disco.clone(),
1021+
transports.create_sender(),
10231022
)
10241023
};
10251024

iroh/src/magicsock/endpoint_map.rs

Lines changed: 7 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,15 @@ use std::{
66
};
77

88
use iroh_base::{EndpointAddr, EndpointId, RelayUrl};
9-
use n0_future::task::{self, AbortOnDropHandle};
109
use rustc_hash::FxHashMap;
1110
use serde::{Deserialize, Serialize};
1211
use tokio::sync::mpsc;
13-
use tracing::{Instrument, info_span, trace, warn};
12+
use tracing::warn;
1413

1514
use super::{
1615
DirectAddr, DiscoState, MagicsockMetrics,
1716
mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr},
18-
transports::{self, OwnedTransmit, TransportsSender},
17+
transports::{self, TransportsSender},
1918
};
2019
use crate::disco::{self};
2120
// #[cfg(any(test, feature = "test-utils"))]
@@ -58,10 +57,9 @@ pub(crate) struct EndpointMap {
5857
/// The endpoint ID of the local endpoint.
5958
local_endpoint_id: EndpointId,
6059
metrics: Arc<MagicsockMetrics>,
61-
/// Handle to an actor that can send over the transports.
62-
transports_handle: TransportsSenderHandle,
6360
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
6461
disco: DiscoState,
62+
sender: TransportsSender,
6563
}
6664

6765
impl EndpointMap {
@@ -71,19 +69,20 @@ impl EndpointMap {
7169
// TODO:
7270
// #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection,
7371
metrics: Arc<MagicsockMetrics>,
74-
sender: TransportsSender,
72+
7573
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
7674
disco: DiscoState,
75+
sender: TransportsSender,
7776
) -> Self {
7877
Self {
7978
actor_handles: Mutex::new(FxHashMap::default()),
8079
endpoint_mapped_addrs: Default::default(),
8180
relay_mapped_addrs: Default::default(),
8281
local_endpoint_id,
8382
metrics,
84-
transports_handle: TransportsSenderActor::new(sender).start(),
8583
local_addrs,
8684
disco,
85+
sender,
8786
}
8887
}
8988

@@ -135,18 +134,17 @@ impl EndpointMap {
135134
Some(handle) => handle.sender.clone(),
136135
None => {
137136
// Create a new EndpointStateActor and insert it into the endpoint map.
138-
let sender = self.transports_handle.inbox.clone();
139137
let local_addrs = self.local_addrs.clone();
140138
let disco = self.disco.clone();
141139
let metrics = self.metrics.clone();
142140
let actor = EndpointStateActor::new(
143141
eid,
144142
self.local_endpoint_id,
145-
sender,
146143
local_addrs,
147144
disco,
148145
self.relay_mapped_addrs.clone(),
149146
metrics,
147+
self.sender.clone(),
150148
);
151149
let handle = actor.start();
152150
let sender = handle.sender.clone();
@@ -282,79 +280,6 @@ impl IpPort {
282280
}
283281
}
284282

285-
/// An actor that can send datagrams onto iroh transports.
286-
///
287-
/// The [`EndpointStateActor`]s want to be able to send datagrams. Because we can not create
288-
/// [`TransportsSender`]s on demand we must share one for the entire [`EndpointMap`], which
289-
/// lives in this actor.
290-
///
291-
/// [`EndpointStateActor`]: endpoint_state::EndpointStateActor
292-
#[derive(Debug)]
293-
struct TransportsSenderActor {
294-
sender: TransportsSender,
295-
}
296-
297-
impl TransportsSenderActor {
298-
fn new(sender: TransportsSender) -> Self {
299-
Self { sender }
300-
}
301-
302-
fn start(self) -> TransportsSenderHandle {
303-
// This actor gets an inbox size of exactly 1. This is the same as if they had the
304-
// underlying sender directly: either you can send or not, or you await until you
305-
// can. No need to introduce extra buffering.
306-
let (tx, rx) = mpsc::channel(1);
307-
308-
let task = task::spawn(
309-
async move {
310-
self.run(rx).await;
311-
}
312-
.instrument(info_span!("TransportsSenderActor")),
313-
);
314-
TransportsSenderHandle {
315-
inbox: tx,
316-
_task: AbortOnDropHandle::new(task),
317-
}
318-
}
319-
320-
async fn run(self, mut inbox: mpsc::Receiver<TransportsSenderMessage>) {
321-
use TransportsSenderMessage::SendDatagram;
322-
323-
while let Some(SendDatagram(dst, owned_transmit)) = inbox.recv().await {
324-
let transmit = transports::Transmit {
325-
ecn: owned_transmit.ecn,
326-
contents: owned_transmit.contents.as_ref(),
327-
segment_size: owned_transmit.segment_size,
328-
};
329-
let len = transmit.contents.len();
330-
match self.sender.send(&dst, None, &transmit).await {
331-
Ok(()) => {}
332-
Err(err) => {
333-
trace!(?dst, %len, "transmit failed to send: {err:#}");
334-
}
335-
};
336-
}
337-
trace!("actor terminating");
338-
}
339-
}
340-
341-
#[derive(Debug)]
342-
struct TransportsSenderHandle {
343-
inbox: mpsc::Sender<TransportsSenderMessage>,
344-
_task: AbortOnDropHandle<()>,
345-
}
346-
347-
#[derive(Debug)]
348-
enum TransportsSenderMessage {
349-
SendDatagram(transports::Addr, OwnedTransmit),
350-
}
351-
352-
impl From<(transports::Addr, OwnedTransmit)> for TransportsSenderMessage {
353-
fn from(source: (transports::Addr, OwnedTransmit)) -> Self {
354-
Self::SendDatagram(source.0, source.1)
355-
}
356-
}
357-
358283
#[cfg(test)]
359284
mod tests {
360285

0 commit comments

Comments
 (0)