Skip to content

Commit b4f3dc3

Browse files
refactor: remove the TransportsSenderActor
1 parent 9d795d3 commit b4f3dc3

File tree

5 files changed

+48
-106
lines changed

5 files changed

+48
-106
lines changed

iroh/src/magicsock.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,15 +1011,14 @@ impl Handle {
10111011
let (disco, disco_receiver) = DiscoState::new(&secret_key);
10121012

10131013
let endpoint_map = {
1014-
let sender = transports.create_sender();
10151014
EndpointMap::new(
10161015
secret_key.public(),
10171016
// #[cfg(any(test, feature = "test-utils"))]
10181017
// path_selection,
10191018
metrics.magicsock.clone(),
1020-
sender,
10211019
direct_addrs.addrs.watch(),
10221020
disco.clone(),
1021+
transports.create_sender(),
10231022
)
10241023
};
10251024

iroh/src/magicsock/endpoint_map.rs

Lines changed: 7 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,15 @@ use std::{
66
};
77

88
use iroh_base::{EndpointAddr, EndpointId, RelayUrl};
9-
use n0_future::task::{self, AbortOnDropHandle};
109
use rustc_hash::FxHashMap;
1110
use serde::{Deserialize, Serialize};
1211
use tokio::sync::mpsc;
13-
use tracing::{Instrument, info_span, trace, warn};
12+
use tracing::warn;
1413

1514
use super::{
1615
DirectAddr, DiscoState, MagicsockMetrics,
1716
mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr},
18-
transports::{self, OwnedTransmit, TransportsSender},
17+
transports::{self, TransportsSender},
1918
};
2019
use crate::disco::{self};
2120
// #[cfg(any(test, feature = "test-utils"))]
@@ -58,10 +57,9 @@ pub(crate) struct EndpointMap {
5857
/// The endpoint ID of the local endpoint.
5958
local_endpoint_id: EndpointId,
6059
metrics: Arc<MagicsockMetrics>,
61-
/// Handle to an actor that can send over the transports.
62-
transports_handle: TransportsSenderHandle,
6360
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
6461
disco: DiscoState,
62+
sender: TransportsSender,
6563
}
6664

6765
impl EndpointMap {
@@ -71,19 +69,20 @@ impl EndpointMap {
7169
// TODO:
7270
// #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection,
7371
metrics: Arc<MagicsockMetrics>,
74-
sender: TransportsSender,
72+
7573
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
7674
disco: DiscoState,
75+
sender: TransportsSender,
7776
) -> Self {
7877
Self {
7978
actor_handles: Mutex::new(FxHashMap::default()),
8079
endpoint_mapped_addrs: Default::default(),
8180
relay_mapped_addrs: Default::default(),
8281
local_endpoint_id,
8382
metrics,
84-
transports_handle: TransportsSenderActor::new(sender).start(),
8583
local_addrs,
8684
disco,
85+
sender,
8786
}
8887
}
8988

@@ -135,18 +134,17 @@ impl EndpointMap {
135134
Some(handle) => handle.sender.clone(),
136135
None => {
137136
// Create a new EndpointStateActor and insert it into the endpoint map.
138-
let sender = self.transports_handle.inbox.clone();
139137
let local_addrs = self.local_addrs.clone();
140138
let disco = self.disco.clone();
141139
let metrics = self.metrics.clone();
142140
let actor = EndpointStateActor::new(
143141
eid,
144142
self.local_endpoint_id,
145-
sender,
146143
local_addrs,
147144
disco,
148145
self.relay_mapped_addrs.clone(),
149146
metrics,
147+
self.sender.clone(),
150148
);
151149
let handle = actor.start();
152150
let sender = handle.sender.clone();
@@ -282,79 +280,6 @@ impl IpPort {
282280
}
283281
}
284282

285-
/// An actor that can send datagrams onto iroh transports.
286-
///
287-
/// The [`EndpointStateActor`]s want to be able to send datagrams. Because we can not create
288-
/// [`TransportsSender`]s on demand we must share one for the entire [`EndpointMap`], which
289-
/// lives in this actor.
290-
///
291-
/// [`EndpointStateActor`]: endpoint_state::EndpointStateActor
292-
#[derive(Debug)]
293-
struct TransportsSenderActor {
294-
sender: TransportsSender,
295-
}
296-
297-
impl TransportsSenderActor {
298-
fn new(sender: TransportsSender) -> Self {
299-
Self { sender }
300-
}
301-
302-
fn start(self) -> TransportsSenderHandle {
303-
// This actor gets an inbox size of exactly 1. This is the same as if they had the
304-
// underlying sender directly: either you can send or not, or you await until you
305-
// can. No need to introduce extra buffering.
306-
let (tx, rx) = mpsc::channel(1);
307-
308-
let task = task::spawn(
309-
async move {
310-
self.run(rx).await;
311-
}
312-
.instrument(info_span!("TransportsSenderActor")),
313-
);
314-
TransportsSenderHandle {
315-
inbox: tx,
316-
_task: AbortOnDropHandle::new(task),
317-
}
318-
}
319-
320-
async fn run(self, mut inbox: mpsc::Receiver<TransportsSenderMessage>) {
321-
use TransportsSenderMessage::SendDatagram;
322-
323-
while let Some(SendDatagram(dst, owned_transmit)) = inbox.recv().await {
324-
let transmit = transports::Transmit {
325-
ecn: owned_transmit.ecn,
326-
contents: owned_transmit.contents.as_ref(),
327-
segment_size: owned_transmit.segment_size,
328-
};
329-
let len = transmit.contents.len();
330-
match self.sender.send(&dst, None, &transmit).await {
331-
Ok(()) => {}
332-
Err(err) => {
333-
trace!(?dst, %len, "transmit failed to send: {err:#}");
334-
}
335-
};
336-
}
337-
trace!("actor terminating");
338-
}
339-
}
340-
341-
#[derive(Debug)]
342-
struct TransportsSenderHandle {
343-
inbox: mpsc::Sender<TransportsSenderMessage>,
344-
_task: AbortOnDropHandle<()>,
345-
}
346-
347-
#[derive(Debug)]
348-
enum TransportsSenderMessage {
349-
SendDatagram(transports::Addr, OwnedTransmit),
350-
}
351-
352-
impl From<(transports::Addr, OwnedTransmit)> for TransportsSenderMessage {
353-
fn from(source: (transports::Addr, OwnedTransmit)) -> Self {
354-
Self::SendDatagram(source.0, source.1)
355-
}
356-
}
357-
358283
#[cfg(test)]
359284
mod tests {
360285

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::{
77
};
88

99
use iroh_base::{EndpointAddr, EndpointId, RelayUrl, TransportAddr};
10-
use n0_error::StdResultExt;
1110
use n0_future::{
1211
FuturesUnordered, MergeUnbounded, Stream, StreamExt,
1312
task::{self, AbortOnDropHandle},
@@ -23,7 +22,7 @@ use tokio::sync::{mpsc, oneshot};
2322
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
2423
use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn};
2524

26-
use super::{Source, TransportsSenderMessage, path_state::PathState};
25+
use super::{Source, path_state::PathState};
2726
// TODO: Use this
2827
// #[cfg(any(test, feature = "test-utils"))]
2928
// use crate::endpoint::PathSelection;
@@ -33,7 +32,7 @@ use crate::{
3332
magicsock::{
3433
DiscoState, HEARTBEAT_INTERVAL, MagicsockMetrics, PATH_MAX_IDLE_TIMEOUT,
3534
mapped_addrs::{AddrMap, MappedAddr, RelayMappedAddr},
36-
transports::{self, OwnedTransmit},
35+
transports::{self, OwnedTransmit, TransportsSender},
3736
},
3837
util::MaybeFuture,
3938
};
@@ -95,10 +94,7 @@ pub(super) struct EndpointStateActor {
9594
//
9695
/// Metrics.
9796
metrics: Arc<MagicsockMetrics>,
98-
/// Allowing us to directly send datagrams.
99-
///
100-
/// Used for handling [`EndpointStateMessage::SendDatagram`] messages.
101-
transports_sender: mpsc::Sender<TransportsSenderMessage>,
97+
sender: TransportsSender,
10298
/// Our local addresses.
10399
///
104100
/// These are our local addresses and any reflexive transport addresses.
@@ -149,17 +145,16 @@ impl EndpointStateActor {
149145
pub(super) fn new(
150146
endpoint_id: EndpointId,
151147
local_endpoint_id: EndpointId,
152-
transports_sender: mpsc::Sender<TransportsSenderMessage>,
153148
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
154149
disco: DiscoState,
155150
relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
156151
metrics: Arc<MagicsockMetrics>,
152+
sender: TransportsSender,
157153
) -> Self {
158154
Self {
159155
endpoint_id,
160156
local_endpoint_id,
161157
metrics,
162-
transports_sender,
163158
local_addrs,
164159
relay_mapped_addrs,
165160
disco,
@@ -172,6 +167,7 @@ impl EndpointStateActor {
172167
scheduled_holepunch: None,
173168
scheduled_open_path: None,
174169
pending_open_paths: VecDeque::new(),
170+
sender,
175171
}
176172
}
177173

@@ -297,26 +293,34 @@ impl EndpointStateActor {
297293
Ok(())
298294
}
299295

296+
async fn send_datagram(
297+
&self,
298+
dst: transports::Addr,
299+
owned_transmit: OwnedTransmit,
300+
) -> n0_error::Result<()> {
301+
let transmit = transports::Transmit {
302+
ecn: owned_transmit.ecn,
303+
contents: owned_transmit.contents.as_ref(),
304+
segment_size: owned_transmit.segment_size,
305+
};
306+
self.sender.send(&dst, None, &transmit).await?;
307+
Ok(())
308+
}
309+
300310
/// Handles [`EndpointStateMessage::SendDatagram`].
301311
///
302312
/// Error returns are fatal and kill the actor.
303313
async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) -> n0_error::Result<()> {
304314
if let Some(addr) = self.selected_path.get() {
305315
trace!(?addr, "sending datagram to selected path");
306-
self.transports_sender
307-
.send((addr, transmit).into())
308-
.await
309-
.std_context("TransportSenderActor stopped")?;
316+
self.send_datagram(addr, transmit).await?;
310317
} else {
311318
trace!(
312319
paths = ?self.paths.keys().collect::<Vec<_>>(),
313320
"sending datagram to all known paths",
314321
);
315322
for addr in self.paths.keys() {
316-
self.transports_sender
317-
.send((addr.clone(), transmit.clone()).into())
318-
.await
319-
.std_context("TransportSenerActor stopped")?;
323+
self.send_datagram(addr.clone(), transmit.clone()).await?;
320324
}
321325
// This message is received *before* a connection is added. So we do
322326
// not yet have a connection to holepunch. Instead we trigger
@@ -711,7 +715,7 @@ impl EndpointStateActor {
711715
transports::Addr::Ip(_) => &self.metrics.send_disco_udp,
712716
transports::Addr::Relay(_, _) => &self.metrics.send_disco_relay,
713717
};
714-
match self.transports_sender.send((dst, transmit).into()).await {
718+
match self.send_datagram(dst, transmit).await {
715719
Ok(()) => {
716720
trace!("sent");
717721
counter.inc();

iroh/src/magicsock/transports.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ impl Addr {
412412
}
413413

414414
/// A sender that sends to all our transports.
415-
#[derive(Debug)]
415+
#[derive(Debug, Clone)]
416416
pub(crate) struct TransportsSender {
417417
#[cfg(not(wasm_browser))]
418418
ip: Vec<IpSender>,
@@ -602,8 +602,8 @@ impl quinn::AsyncUdpSocket for MagicTransport {
602602
/// This is special in that it handles [`MultipathMappedAddr::Mixed`] by delegating to the
603603
/// [`MagicSock`] which expands it back to one or more [`Addr`]s and sends it
604604
/// using the underlying [`Transports`].
605-
// TODO: Can I just send the TransportsSender along in the NodeStateMessage::SendDatagram
606-
// message?? That way you don't have to hook up the sender into the NodeMap!
605+
// TODO: Can I just send the TransportsSender along in the EndpointStateMessage::SendDatagram
606+
// message?? That way you don't have to hook up the sender into the EndpointMap!
607607
#[derive(Debug)]
608608
#[pin_project::pin_project]
609609
pub(crate) struct MagicSender {

iroh/src/magicsock/transports/ip.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,12 @@ impl IpTransport {
101101
}
102102

103103
pub(super) fn create_sender(&self) -> IpSender {
104-
let sender = self.socket.clone().create_sender();
104+
let socket = self.socket.clone();
105+
let sender = socket.clone().create_sender();
105106
IpSender {
106107
bind_addr: self.bind_addr,
107108
sender,
109+
socket,
108110
metrics: self.metrics.clone(),
109111
}
110112
}
@@ -136,11 +138,23 @@ impl IpNetworkChangeSender {
136138
#[pin_project]
137139
pub(super) struct IpSender {
138140
bind_addr: SocketAddr,
141+
socket: Arc<UdpSocket>,
139142
#[pin]
140143
sender: UdpSender,
141144
metrics: Arc<MagicsockMetrics>,
142145
}
143146

147+
impl Clone for IpSender {
148+
fn clone(&self) -> Self {
149+
Self {
150+
bind_addr: self.bind_addr,
151+
socket: self.socket.clone(),
152+
sender: self.socket.clone().create_sender(),
153+
metrics: self.metrics.clone(),
154+
}
155+
}
156+
}
157+
144158
impl IpSender {
145159
pub(super) fn is_valid_send_addr(&self, dst: &SocketAddr) -> bool {
146160
// Our net-tools crate binds sockets to their specific family. This means an IPv6

0 commit comments

Comments
 (0)