Skip to content

Commit dc0de88

Browse files
committed
VirtDevice::poll manager without lock
1 parent 2e9fc33 commit dc0de88

File tree

1 file changed

+59
-34
lines changed
  • crates/shadowsocks-service/src/local/tun

1 file changed

+59
-34
lines changed

crates/shadowsocks-service/src/local/tun/tcp.rs

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,15 @@ struct TcpSocketControl {
5454

5555
struct ManagerNotify {
5656
cond: ParkingCondvar,
57+
mutex: ParkingMutex<()>,
5758
running: AtomicBool,
5859
}
5960

6061
impl ManagerNotify {
6162
fn new() -> ManagerNotify {
6263
ManagerNotify {
6364
cond: ParkingCondvar::new(),
65+
mutex: ParkingMutex::new(()),
6466
running: AtomicBool::new(true),
6567
}
6668
}
@@ -77,18 +79,29 @@ impl ManagerNotify {
7779
self.running.store(false, Ordering::Relaxed);
7880
self.notify();
7981
}
82+
83+
fn wait(&self, timeout: Duration) {
84+
let mut guard = self.mutex.lock();
85+
self.cond.wait_for(&mut guard, timeout);
86+
}
8087
}
8188

8289
struct TcpSocketManager {
8390
iface: Interface<'static, VirtTunDevice>,
8491
manager_notify: Arc<ManagerNotify>,
85-
sockets: HashMap<SocketHandle, Arc<ParkingMutex<TcpSocketControl>>>,
92+
sockets: HashMap<SocketHandle, SharedTcpConnectionControl>,
93+
socket_creation_rx: mpsc::UnboundedReceiver<TcpSocketCreation>,
8694
}
8795

88-
type SharedTcpSocketManager = Arc<ParkingMutex<TcpSocketManager>>;
96+
type SharedTcpConnectionControl = Arc<ParkingMutex<TcpSocketControl>>;
97+
98+
struct TcpSocketCreation {
99+
control: SharedTcpConnectionControl,
100+
socket: TcpSocket<'static>,
101+
}
89102

90103
struct TcpConnection {
91-
control: Arc<ParkingMutex<TcpSocketControl>>,
104+
control: SharedTcpConnectionControl,
92105
manager_notify: Arc<ManagerNotify>,
93106
}
94107

@@ -100,25 +113,27 @@ impl Drop for TcpConnection {
100113
}
101114

102115
impl TcpConnection {
103-
fn new(socket: TcpSocket<'static>, manager: SharedTcpSocketManager, tcp_opts: &TcpSocketOpts) -> TcpConnection {
116+
fn new(
117+
socket: TcpSocket<'static>,
118+
socket_creation_tx: &mpsc::UnboundedSender<TcpSocketCreation>,
119+
manager_notify: Arc<ManagerNotify>,
120+
tcp_opts: &TcpSocketOpts,
121+
) -> TcpConnection {
104122
let send_buffer_size = tcp_opts.send_buffer_size.unwrap_or(DEFAULT_TCP_SEND_BUFFER_SIZE);
105123
let recv_buffer_size = tcp_opts.recv_buffer_size.unwrap_or(DEFAULT_TCP_RECV_BUFFER_SIZE);
106124

107-
let (control, manager_notify) = {
108-
let mut manager = manager.lock();
109-
let socket_handle = manager.iface.add_socket(socket);
110-
111-
let control = Arc::new(ParkingMutex::new(TcpSocketControl {
112-
send_buffer: RingBuffer::new(vec![0u8; send_buffer_size as usize]),
113-
send_waker: None,
114-
recv_buffer: RingBuffer::new(vec![0u8; recv_buffer_size as usize]),
115-
recv_waker: None,
116-
is_closed: false,
117-
}));
125+
let control = Arc::new(ParkingMutex::new(TcpSocketControl {
126+
send_buffer: RingBuffer::new(vec![0u8; send_buffer_size as usize]),
127+
send_waker: None,
128+
recv_buffer: RingBuffer::new(vec![0u8; recv_buffer_size as usize]),
129+
recv_waker: None,
130+
is_closed: false,
131+
}));
118132

119-
manager.sockets.insert(socket_handle.clone(), control.clone());
120-
(control, manager.manager_notify.clone())
121-
};
133+
let _ = socket_creation_tx.send(TcpSocketCreation {
134+
control: control.clone(),
135+
socket,
136+
});
122137

123138
TcpConnection {
124139
control,
@@ -212,9 +227,9 @@ impl AsyncWrite for TcpConnection {
212227

213228
pub struct TcpTun {
214229
context: Arc<ServiceContext>,
215-
manager: SharedTcpSocketManager,
216230
manager_handle: Option<JoinHandle<()>>,
217231
manager_notify: Arc<ManagerNotify>,
232+
manager_socket_creation_tx: mpsc::UnboundedSender<TcpSocketCreation>,
218233
balancer: PingBalancer,
219234
iface_rx: mpsc::UnboundedReceiver<Vec<u8>>,
220235
iface_tx: mpsc::Sender<Vec<u8>>,
@@ -254,24 +269,29 @@ impl TcpTun {
254269
.finalize();
255270

256271
let manager_notify = Arc::new(ManagerNotify::new());
257-
let manager = Arc::new(ParkingMutex::new(TcpSocketManager {
272+
let (manager_socket_creation_tx, manager_socket_creation_rx) = mpsc::unbounded_channel();
273+
let mut manager = TcpSocketManager {
258274
iface,
259275
manager_notify: manager_notify.clone(),
260276
sockets: HashMap::new(),
261-
}));
277+
socket_creation_rx: manager_socket_creation_rx,
278+
};
262279

263280
let manager_handle = {
264-
let manager = manager.clone();
265-
let manager_notify = manager_notify.clone();
266281
thread::spawn(move || {
267-
let mut manager_guard = manager.lock();
282+
let TcpSocketManager {
283+
ref mut iface,
284+
ref mut sockets,
285+
ref mut socket_creation_rx,
286+
ref manager_notify,
287+
..
288+
} = manager;
268289

269290
while manager_notify.running() {
270-
let TcpSocketManager {
271-
ref mut iface,
272-
ref mut sockets,
273-
..
274-
} = *manager_guard;
291+
while let Ok(TcpSocketCreation { control, socket }) = socket_creation_rx.try_recv() {
292+
let handle = iface.add_socket(socket);
293+
sockets.insert(handle, control);
294+
}
275295

276296
let before_poll = SmolInstant::now();
277297
let updated_sockets = match iface.poll(before_poll) {
@@ -385,9 +405,9 @@ impl TcpTun {
385405
.poll_delay(SmolInstant::now())
386406
.unwrap_or(SmolDuration::from_millis(50));
387407

388-
manager_notify
389-
.cond
390-
.wait_for(&mut manager_guard, Duration::from(next_duration));
408+
if next_duration.total_millis() != 0 {
409+
manager_notify.wait(Duration::from(next_duration));
410+
}
391411
}
392412

393413
trace!("VirtDevice::poll thread exited");
@@ -396,9 +416,9 @@ impl TcpTun {
396416

397417
TcpTun {
398418
context,
399-
manager,
400419
manager_handle: Some(manager_handle),
401420
manager_notify,
421+
manager_socket_creation_tx,
402422
balancer,
403423
iface_rx,
404424
iface_tx,
@@ -434,7 +454,12 @@ impl TcpTun {
434454

435455
trace!("created TCP connection for {} <-> {}", src_addr, dst_addr);
436456

437-
let connection = TcpConnection::new(socket, self.manager.clone(), &accept_opts.tcp);
457+
let connection = TcpConnection::new(
458+
socket,
459+
&self.manager_socket_creation_tx,
460+
self.manager_notify.clone(),
461+
&accept_opts.tcp,
462+
);
438463

439464
// establish a tunnel
440465
let context = self.context.clone();

0 commit comments

Comments
 (0)