Skip to content

Commit 5365d90

Browse files
committed
fix mio poll issues
1 parent 59471a5 commit 5365d90

File tree

7 files changed

+97
-78
lines changed

7 files changed

+97
-78
lines changed

src/args.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,49 +33,57 @@ pub struct Args {
3333

3434
/// enable IPv6 on the server (on most hosts, this will allow both IPv4 and IPv6,
3535
/// but it might limit to just IPv6 on some)
36-
#[arg(short = '6', long)]
36+
#[arg(short = '6', long, conflicts_with = "client")]
3737
pub version6: bool,
3838

3939
/// limit the number of concurrent clients that can be processed by a server;
4040
/// any over this count will be immediately disconnected
41-
#[arg(long, value_name = "number", default_value = "0")]
41+
#[arg(long, value_name = "number", default_value = "0", conflicts_with = "client")]
4242
pub client_limit: usize,
4343

4444
/// run in client mode; value is the server's address
4545
#[arg(short, long, value_name = "host", conflicts_with = "server")]
4646
pub client: Option<std::net::IpAddr>,
4747

4848
/// run in reverse-mode (server sends, client receives)
49-
#[arg(short = 'R', long)]
49+
#[arg(short = 'R', long, conflicts_with = "server")]
5050
pub reverse: bool,
5151

5252
/// the format in which to deplay information (json, megabit/sec, megabyte/sec)
53-
#[arg(short, long, value_enum, value_name = "format", default_value = "megabit")]
53+
#[arg(
54+
short,
55+
long,
56+
value_enum,
57+
value_name = "format",
58+
default_value = "megabit",
59+
conflicts_with = "server"
60+
)]
5461
pub format: Format,
5562

5663
/// use UDP rather than TCP
57-
#[arg(short, long)]
64+
#[arg(short, long, conflicts_with = "server")]
5865
pub udp: bool,
5966

6067
/// target bandwidth in bytes/sec; this value is applied to each stream,
6168
/// with a default target of 1 megabit/second for all protocols (note: megabit, not mebibit);
6269
/// the suffixes kKmMgG can also be used for xbit and xbyte, respectively
63-
#[arg(short, long, default_value = "125000", value_name = "bytes/sec")]
70+
#[arg(short, long, default_value = "125000", value_name = "bytes/sec", conflicts_with = "server")]
6471
pub bandwidth: String,
6572

6673
/// the time in seconds for which to transmit
67-
#[arg(short, long, default_value = "10.0", value_name = "seconds")]
74+
#[arg(short, long, default_value = "10.0", value_name = "seconds", conflicts_with = "server")]
6875
pub time: f64,
6976

7077
/// the interval at which to send batches of data, in seconds, between [0.0 and 1.0);
7178
/// this is used to evenly spread packets out over time
72-
#[arg(long, default_value = "0.05", value_name = "seconds")]
79+
#[arg(long, default_value = "0.05", value_name = "seconds", conflicts_with = "server")]
7380
pub send_interval: f64,
7481

7582
/// length of the buffer to exchange; for TCP, this defaults to 32 kibibytes; for UDP, it's 1024 bytes
7683
#[arg(
7784
short,
7885
long,
86+
conflicts_with = "server",
7987
default_value = "32768",
8088
default_value_if("udp", "true", Some("1024")),
8189
value_name = "bytes"
@@ -85,27 +93,27 @@ pub struct Args {
8593
/// send buffer, in bytes (only supported on some platforms;
8694
/// if set too small, a 'resource unavailable' error may occur;
8795
/// affects TCP window-size)
88-
#[arg(long, default_value = "0", value_name = "bytes")]
96+
#[arg(long, default_value = "0", value_name = "bytes", conflicts_with = "server")]
8997
pub send_buffer: usize,
9098

9199
/// receive buffer, in bytes (only supported on some platforms;
92100
/// if set too small, a 'resource unavailable' error may occur; affects TCP window-size)
93-
#[arg(long, default_value = "0", value_name = "bytes")]
101+
#[arg(long, default_value = "0", value_name = "bytes", conflicts_with = "server")]
94102
pub receive_buffer: usize,
95103

96104
/// the number of parallel data-streams to use
97-
#[arg(short = 'P', long, value_name = "number", default_value = "1")]
105+
#[arg(short = 'P', long, value_name = "number", default_value = "1", conflicts_with = "server")]
98106
pub parallel: usize,
99107

100108
/// omit a number of seconds from the start of calculations,
101109
/// primarily to avoid including TCP ramp-up in averages;
102110
/// using this option may result in disagreement between bytes sent and received,
103111
/// since data can be in-flight across time-boundaries
104-
#[arg(short, long, default_value = "0", value_name = "seconds")]
112+
#[arg(short, long, default_value = "0", value_name = "seconds", conflicts_with = "server")]
105113
pub omit: usize,
106114

107115
/// use no-delay mode for TCP tests, disabling Nagle's Algorithm
108-
#[arg(short = 'N', long)]
116+
#[arg(short = 'N', long, conflicts_with = "server")]
109117
pub no_delay: bool,
110118

111119
/// an optional pool of IPv4 TCP ports over which data will be accepted;

src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,10 @@ pub mod server;
55
pub(crate) mod stream;
66
pub(crate) mod utils;
77

8-
pub(crate) type BoxResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
8+
pub type BoxResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
9+
10+
/// a global token generator
11+
pub(crate) fn get_global_token() -> mio::Token {
12+
mio::Token(TOKEN_SEED.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1)
13+
}
14+
static TOKEN_SEED: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

src/main.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
* along with rperf. If not, see <https://www.gnu.org/licenses/>.
1919
*/
2020

21-
use rperf::{args, client, server};
21+
use rperf::{args, client, server, BoxResult};
2222

23-
fn main() {
23+
fn main() -> BoxResult<()> {
2424
use clap::Parser;
2525
let args = args::Args::parse();
2626

@@ -45,12 +45,8 @@ fn main() {
4545
.expect("unable to set SIGINT handler");
4646

4747
log::debug!("beginning normal operation...");
48-
let service = server::serve(&args);
48+
server::serve(&args)?;
4949
exiting.join().expect("unable to join SIGINT handler thread");
50-
if service.is_err() {
51-
log::error!("unable to run server: {}", service.unwrap_err());
52-
std::process::exit(4);
53-
}
5450
} else if args.client.is_some() {
5551
log::debug!("registering SIGINT handler...");
5652
ctrlc2::set_handler(move || {
@@ -65,15 +61,11 @@ fn main() {
6561
.expect("unable to set SIGINT handler");
6662

6763
log::debug!("connecting to server...");
68-
let execution = client::execute(&args);
69-
if execution.is_err() {
70-
log::error!("unable to run client: {}", execution.unwrap_err());
71-
std::process::exit(4);
72-
}
64+
client::execute(&args)?;
7365
} else {
7466
use clap::CommandFactory;
7567
let mut cmd = args::Args::command();
7668
cmd.print_help().unwrap();
77-
std::process::exit(2);
7869
}
70+
Ok(())
7971
}

src/protocol/communication.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::io::{self, Read, Write};
2222
use std::time::Duration;
2323

2424
use mio::net::TcpStream;
25-
use mio::{Events, Interest, Poll, Token};
25+
use mio::{Events, Interest, Poll};
2626

2727
use crate::BoxResult;
2828

@@ -51,14 +51,17 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
5151

5252
/// receives the length-count of a pending message over a client-server communications stream
5353
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
54-
let mio_token = Token(0);
54+
let mio_token = crate::get_global_token();
5555
let mut poll = Poll::new()?;
5656
poll.registry().register(stream, mio_token, Interest::READABLE)?;
5757
let mut events = Events::with_capacity(1); //only interacting with one stream
5858

5959
let mut length_bytes_read = 0;
6060
let mut length_spec: [u8; 2] = [0; 2];
61-
while alive_check() {
61+
let result: BoxResult<u16> = 'exiting: loop {
62+
if !alive_check() {
63+
break 'exiting Ok(0);
64+
}
6265
//waiting to find out how long the next message is
6366
results_handler()?; //send any outstanding results between cycles
6467
poll.poll(&mut events, Some(POLL_TIMEOUT))?;
@@ -72,31 +75,33 @@ fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_han
7275
break;
7376
}
7477
Err(e) => {
75-
return Err(Box::new(e));
78+
break 'exiting Err(Box::new(e));
7679
}
7780
};
7881

7982
if size == 0 {
8083
if alive_check() {
81-
return Err(Box::new(simple_error::simple_error!("connection lost")));
84+
break 'exiting Err(Box::new(simple_error::simple_error!("connection lost")));
8285
} else {
8386
//shutting down; a disconnect is expected
84-
return Err(Box::new(simple_error::simple_error!("local shutdown requested")));
87+
break 'exiting Err(Box::new(simple_error::simple_error!("local shutdown requested")));
8588
}
8689
}
8790

8891
length_bytes_read += size;
8992
if length_bytes_read == 2 {
9093
let length = u16::from_be_bytes(length_spec);
9194
log::debug!("received length-spec of {} from {}", length, stream.peer_addr()?);
92-
return Ok(length);
95+
break 'exiting Ok(length);
9396
} else {
9497
log::debug!("received partial length-spec from {}", stream.peer_addr()?);
9598
}
9699
}
97100
}
98-
}
99-
Err(Box::new(simple_error::simple_error!("system shutting down")))
101+
};
102+
poll.registry().deregister(stream)?;
103+
result
104+
// Err(Box::new(simple_error::simple_error!("system shutting down")))
100105
}
101106
/// receives the data-value of a pending message over a client-server communications stream
102107
fn receive_payload(
@@ -105,14 +110,17 @@ fn receive_payload(
105110
results_handler: &mut dyn FnMut() -> BoxResult<()>,
106111
length: u16,
107112
) -> BoxResult<serde_json::Value> {
108-
let mio_token = Token(0);
113+
let mio_token = crate::get_global_token();
109114
let mut poll = Poll::new()?;
110115
poll.registry().register(stream, mio_token, Interest::READABLE)?;
111116
let mut events = Events::with_capacity(1); //only interacting with one stream
112117

113118
let mut bytes_read = 0;
114119
let mut buffer = vec![0_u8; length.into()];
115-
while alive_check() {
120+
let result: BoxResult<serde_json::Value> = 'exiting: loop {
121+
if !alive_check() {
122+
break 'exiting Ok(serde_json::from_slice(&buffer[0..0])?);
123+
}
116124
//waiting to receive the payload
117125
results_handler()?; //send any outstanding results between cycles
118126
poll.poll(&mut events, Some(POLL_TIMEOUT))?;
@@ -126,16 +134,16 @@ fn receive_payload(
126134
break;
127135
}
128136
Err(e) => {
129-
return Err(Box::new(e));
137+
break 'exiting Err(Box::new(e));
130138
}
131139
};
132140

133141
if size == 0 {
134142
if alive_check() {
135-
return Err(Box::new(simple_error::simple_error!("connection lost")));
143+
break 'exiting Err(Box::new(simple_error::simple_error!("connection lost")));
136144
} else {
137145
// shutting down; a disconnect is expected
138-
return Err(Box::new(simple_error::simple_error!("local shutdown requested")));
146+
break 'exiting Err(Box::new(simple_error::simple_error!("local shutdown requested")));
139147
}
140148
}
141149

@@ -144,19 +152,21 @@ fn receive_payload(
144152
match serde_json::from_slice(&buffer) {
145153
Ok(v) => {
146154
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
147-
return Ok(v);
155+
break 'exiting Ok(v);
148156
}
149157
Err(e) => {
150-
return Err(Box::new(e));
158+
break 'exiting Err(Box::new(e));
151159
}
152160
}
153161
} else {
154162
log::debug!("received partial payload from {}", stream.peer_addr()?);
155163
}
156164
}
157165
}
158-
}
159-
Err(Box::new(simple_error::simple_error!("system shutting down")))
166+
};
167+
poll.registry().deregister(stream)?;
168+
result
169+
// Err(Box::new(simple_error::simple_error!("system shutting down")))
160170
}
161171
/// handles the full process of retrieving a message from a client-server communications stream
162172
pub fn receive(

src/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::thread;
2727
use std::time::Duration;
2828

2929
use mio::net::{TcpListener, TcpStream};
30-
use mio::{Events, Interest, Poll, Token};
30+
use mio::{Events, Interest, Poll};
3131

3232
use crate::args::Args;
3333
use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION};
@@ -328,15 +328,15 @@ pub fn serve(args: &Args) -> BoxResult<()> {
328328
TcpListener::bind(SocketAddr::new(args.bind, port)).unwrap_or_else(|_| panic!("failed to bind TCP socket, port {}", port));
329329
log::info!("server listening on {}", listener.local_addr()?);
330330

331-
let mio_token = Token(0);
331+
let mio_token = crate::get_global_token();
332332
let mut poll = Poll::new()?;
333333
poll.registry().register(&mut listener, mio_token, Interest::READABLE)?;
334334
let mut events = Events::with_capacity(32);
335335

336336
while is_alive() {
337337
if let Err(err) = poll.poll(&mut events, Some(POLL_TIMEOUT)) {
338338
if err.kind() == std::io::ErrorKind::Interrupted {
339-
log::debug!("Poll interrupted: \"{err}\", ignored, continue polling");
339+
log::debug!("Poll interrupted: \"{err}\"");
340340
continue;
341341
}
342342
log::error!("Poll error: {}", err);

0 commit comments

Comments
 (0)