Skip to content

Commit 95674f2

Browse files
committed
manager thread wakeup atomically with park()
1 parent 6cb5ab3 commit 95674f2

File tree

3 files changed

+21
-106
lines changed

3 files changed

+21
-106
lines changed

Cargo.lock

Lines changed: 4 additions & 71 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/shadowsocks-service/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ local-tunnel = ["local"]
5656
# Enable socks4 protocol for sslocal
5757
local-socks4 = ["local"]
5858
# Enable Tun interface protocol for sslocal
59-
local-tun = ["local", "etherparse", "tun", "rand", "smoltcp", "parking_lot"]
59+
local-tun = ["local", "etherparse", "tun", "rand", "smoltcp"]
6060

6161
# Enable Stream Cipher Protocol
6262
# WARN: Stream Cipher Protocol is proved to be insecure
@@ -95,7 +95,6 @@ bytes = "1.0"
9595
byte_string = "1.0"
9696
byteorder = "1.3"
9797
rand = { version = "0.8", optional = true }
98-
parking_lot = { version = "0.12", optional = true }
9998

10099
futures = "0.3"
101100
tokio = { version = "1.5", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] }

crates/shadowsocks-service/src/local/tun/tcp.rs

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@ use std::{
99
Arc,
1010
},
1111
task::{Context, Poll, Waker},
12-
thread::{self, JoinHandle},
12+
thread::{self, JoinHandle, Thread},
1313
time::Duration,
1414
};
1515

1616
use log::{error, trace};
17-
use parking_lot::{Condvar as ParkingCondvar, Mutex as ParkingMutex};
1817
use shadowsocks::{net::TcpSocketOpts, relay::socks5::Address};
1918
use smoltcp::{
2019
iface::{Interface, InterfaceBuilder, Routes, SocketHandle},
@@ -54,42 +53,21 @@ struct TcpSocketControl {
5453
}
5554

5655
struct ManagerNotify {
57-
cond: ParkingCondvar,
58-
mutex: ParkingMutex<()>,
59-
running: AtomicBool,
56+
thread: Thread,
6057
}
6158

6259
impl ManagerNotify {
63-
fn new() -> ManagerNotify {
64-
ManagerNotify {
65-
cond: ParkingCondvar::new(),
66-
mutex: ParkingMutex::new(()),
67-
running: AtomicBool::new(true),
68-
}
60+
fn new(thread: Thread) -> ManagerNotify {
61+
ManagerNotify { thread }
6962
}
7063

7164
fn notify(&self) {
72-
self.cond.notify_all();
73-
}
74-
75-
fn running(&self) -> bool {
76-
self.running.load(Ordering::Relaxed)
77-
}
78-
79-
fn abort(&self) {
80-
self.running.store(false, Ordering::Relaxed);
81-
self.notify();
82-
}
83-
84-
fn wait(&self, timeout: Duration) {
85-
let mut guard = self.mutex.lock();
86-
self.cond.wait_for(&mut guard, timeout);
65+
self.thread.unpark();
8766
}
8867
}
8968

9069
struct TcpSocketManager {
9170
iface: Interface<'static, VirtTunDevice>,
92-
manager_notify: Arc<ManagerNotify>,
9371
sockets: HashMap<SocketHandle, SharedTcpConnectionControl>,
9472
socket_creation_rx: mpsc::UnboundedReceiver<TcpSocketCreation>,
9573
}
@@ -226,14 +204,15 @@ pub struct TcpTun {
226204
manager_handle: Option<JoinHandle<()>>,
227205
manager_notify: Arc<ManagerNotify>,
228206
manager_socket_creation_tx: mpsc::UnboundedSender<TcpSocketCreation>,
207+
manager_running: Arc<AtomicBool>,
229208
balancer: PingBalancer,
230209
iface_rx: mpsc::UnboundedReceiver<Vec<u8>>,
231210
iface_tx: mpsc::Sender<Vec<u8>>,
232211
}
233212

234213
impl Drop for TcpTun {
235214
fn drop(&mut self) {
236-
self.manager_notify.abort();
215+
self.manager_running.store(false, Ordering::Relaxed);
237216
let _ = self.manager_handle.take().unwrap().join();
238217
}
239218
}
@@ -264,26 +243,27 @@ impl TcpTun {
264243
.routes(iface_routes)
265244
.finalize();
266245

267-
let manager_notify = Arc::new(ManagerNotify::new());
268246
let (manager_socket_creation_tx, manager_socket_creation_rx) = mpsc::unbounded_channel();
269247
let mut manager = TcpSocketManager {
270248
iface,
271-
manager_notify: manager_notify.clone(),
272249
sockets: HashMap::new(),
273250
socket_creation_rx: manager_socket_creation_rx,
274251
};
275252

253+
let manager_running = Arc::new(AtomicBool::new(true));
254+
276255
let manager_handle = {
256+
let manager_running = manager_running.clone();
257+
277258
thread::spawn(move || {
278259
let TcpSocketManager {
279260
ref mut iface,
280261
ref mut sockets,
281262
ref mut socket_creation_rx,
282-
ref manager_notify,
283263
..
284264
} = manager;
285265

286-
while manager_notify.running() {
266+
while manager_running.load(Ordering::Relaxed) {
287267
while let Ok(TcpSocketCreation { control, socket }) = socket_creation_rx.try_recv() {
288268
let handle = iface.add_socket(socket);
289269
sockets.insert(handle, control);
@@ -402,19 +382,22 @@ impl TcpTun {
402382
.unwrap_or(SmolDuration::from_millis(1));
403383

404384
if next_duration.total_millis() != 0 {
405-
manager_notify.wait(Duration::from(next_duration));
385+
thread::park_timeout(Duration::from(next_duration));
406386
}
407387
}
408388

409389
trace!("VirtDevice::poll thread exited");
410390
})
411391
};
412392

393+
let manager_notify = Arc::new(ManagerNotify::new(manager_handle.thread().clone()));
394+
413395
TcpTun {
414396
context,
415397
manager_handle: Some(manager_handle),
416398
manager_notify,
417399
manager_socket_creation_tx,
400+
manager_running,
418401
balancer,
419402
iface_rx,
420403
iface_tx,

0 commit comments

Comments
 (0)