Skip to content

Commit 508d4a7

Browse files
committed
rustfmt max_width = 140
1 parent a80178b commit 508d4a7

File tree

11 files changed

+304
-775
lines changed

11 files changed

+304
-775
lines changed

rustfmt.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
max_width = 140

src/args.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,7 @@ pub struct Args {
5050
pub reverse: bool,
5151

5252
/// the format in which to deplay information (json, megabit/sec, megabyte/sec)
53-
#[arg(
54-
short,
55-
long,
56-
value_enum,
57-
value_name = "format",
58-
default_value = "megabit"
59-
)]
53+
#[arg(short, long, value_enum, value_name = "format", default_value = "megabit")]
6054
pub format: Format,
6155

6256
/// use UDP rather than TCP

src/client.rs

Lines changed: 27 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,9 @@ use crate::{
2222
args,
2323
protocol::{
2424
communication::{receive, send, KEEPALIVE_DURATION},
25-
messaging::{
26-
prepare_begin, prepare_download_configuration, prepare_end,
27-
prepare_upload_configuration,
28-
},
29-
results::ClientDoneResult,
30-
results::{
31-
IntervalResultBox, IntervalResultKind, TcpTestResults, TestResults, UdpTestResults,
32-
},
25+
messaging::{prepare_begin, prepare_download_configuration, prepare_end, prepare_upload_configuration},
26+
results::{ClientDoneResult, ClientFailedResult},
27+
results::{IntervalResultBox, IntervalResultKind, TcpTestResults, TestResults, UdpTestResults},
3328
},
3429
stream::{tcp, udp, TestStream},
3530
};
@@ -63,21 +58,12 @@ fn connect_to_server(address: &str, port: &u16) -> BoxResult<TcpStream> {
6358

6459
let server_addr = destination.to_socket_addrs()?.next();
6560
if server_addr.is_none() {
66-
return Err(Box::new(simple_error::simple_error!(
67-
"unable to resolve {}",
68-
address
69-
)));
61+
return Err(Box::new(simple_error::simple_error!("unable to resolve {}", address)));
7062
}
71-
let raw_stream =
72-
match std::net::TcpStream::connect_timeout(&server_addr.unwrap(), CONNECT_TIMEOUT) {
73-
Ok(s) => s,
74-
Err(e) => {
75-
return Err(Box::new(simple_error::simple_error!(
76-
"unable to connect: {}",
77-
e
78-
)))
79-
}
80-
};
63+
let raw_stream = match std::net::TcpStream::connect_timeout(&server_addr.unwrap(), CONNECT_TIMEOUT) {
64+
Ok(s) => s,
65+
Err(e) => return Err(Box::new(simple_error::simple_error!("unable to connect: {}", e))),
66+
};
8167
let stream = match TcpStream::from_stream(raw_stream) {
8268
Ok(s) => s,
8369
Err(e) => {
@@ -89,12 +75,8 @@ fn connect_to_server(address: &str, port: &u16) -> BoxResult<TcpStream> {
8975
};
9076
log::info!("connected to server");
9177

92-
stream
93-
.set_nodelay(true)
94-
.expect("cannot disable Nagle's algorithm");
95-
stream
96-
.set_keepalive(Some(KEEPALIVE_DURATION))
97-
.expect("unable to set TCP keepalive");
78+
stream.set_nodelay(true).expect("cannot disable Nagle's algorithm");
79+
stream.set_keepalive(Some(KEEPALIVE_DURATION)).expect("unable to set TCP keepalive");
9880

9981
Ok(stream)
10082
}
@@ -121,18 +103,10 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
121103
let mut complete = false;
122104

123105
//config-parsing and pre-connection setup
124-
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(
125-
args.tcp_port_pool.to_string(),
126-
args.tcp6_port_pool.to_string(),
127-
);
128-
let mut udp_port_pool = udp::receiver::UdpPortPool::new(
129-
args.udp_port_pool.to_string(),
130-
args.udp6_port_pool.to_string(),
131-
);
132-
133-
let cpu_affinity_manager = Arc::new(Mutex::new(
134-
crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?,
135-
));
106+
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(args.tcp_port_pool.to_string(), args.tcp6_port_pool.to_string());
107+
let mut udp_port_pool = udp::receiver::UdpPortPool::new(args.udp_port_pool.to_string(), args.udp6_port_pool.to_string());
108+
109+
let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(&args.affinity)?));
136110

137111
let display_json: bool;
138112
let display_bit: bool;
@@ -163,16 +137,14 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
163137

164138
//scaffolding to track and relay the streams and stream-results associated with this test
165139
let stream_count = download_config.get("streams").unwrap().as_i64().unwrap() as usize;
166-
let mut parallel_streams: Vec<Arc<Mutex<(dyn TestStream + Sync + Send)>>> =
167-
Vec::with_capacity(stream_count);
140+
let mut parallel_streams: Vec<Arc<Mutex<(dyn TestStream + Sync + Send)>>> = Vec::with_capacity(stream_count);
168141
let mut parallel_streams_joinhandles = Vec::with_capacity(stream_count);
169142
let (results_tx, results_rx): (
170143
std::sync::mpsc::Sender<IntervalResultBox>,
171144
std::sync::mpsc::Receiver<IntervalResultBox>,
172145
) = channel();
173146

174-
let test_results: Mutex<Box<dyn TestResults>> =
175-
prepare_test_results(is_udp, stream_count as u8);
147+
let test_results: Mutex<Box<dyn TestResults>> = prepare_test_results(is_udp, stream_count as u8);
176148

177149
//a closure used to pass results from stream-handlers to the test-result structure
178150
let mut results_handler = || -> BoxResult<()> {
@@ -193,10 +165,7 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
193165
} else {
194166
log::warn!("stream {} failed", result.get_stream_idx());
195167
}
196-
tr.mark_stream_done(
197-
&result.get_stream_idx(),
198-
result.kind() == IntervalResultKind::ClientDone,
199-
);
168+
tr.mark_stream_done(&result.get_stream_idx(), result.kind() == IntervalResultKind::ClientDone);
200169
if tr.count_in_progress_streams() == 0 {
201170
complete = true;
202171

@@ -226,10 +195,7 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
226195

227196
if is_udp {
228197
//UDP
229-
log::info!(
230-
"preparing for reverse-UDP test with {} streams...",
231-
stream_count
232-
);
198+
log::info!("preparing for reverse-UDP test with {} streams...", stream_count);
233199

234200
let test_definition = udp::UdpTestDefinition::new(&download_config)?;
235201
for stream_idx in 0..stream_count {
@@ -246,10 +212,7 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
246212
}
247213
} else {
248214
//TCP
249-
log::info!(
250-
"preparing for reverse-TCP test with {} streams...",
251-
stream_count
252-
);
215+
log::info!("preparing for reverse-TCP test with {} streams...", stream_count);
253216

254217
let test_definition = tcp::TcpTestDefinition::new(&download_config)?;
255218
for stream_idx in 0..stream_count {
@@ -410,11 +373,9 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
410373
},
411374
Err(e) => {
412375
log::error!("unable to process stream: {}", e);
413-
match c_results_tx.send(Box::new(
414-
crate::protocol::results::ClientFailedResult {
415-
stream_idx: test.get_idx(),
416-
},
417-
)) {
376+
match c_results_tx.send(Box::new(ClientFailedResult {
377+
stream_idx: test.get_idx(),
378+
})) {
418379
Ok(_) => (),
419380
Err(e) => {
420381
log::error!("unable to report interval-failed-result: {}", e)
@@ -457,8 +418,7 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
457418
"receive" | "send" => {
458419
//receive/send-results from the server
459420
if !display_json {
460-
let result =
461-
crate::protocol::results::interval_result_from_json(payload.clone())?;
421+
let result = crate::protocol::results::interval_result_from_json(payload.clone())?;
462422
println!("{}", result.to_string(display_bit));
463423
}
464424
let mut tr = test_results.lock().unwrap();
@@ -480,9 +440,7 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
480440
_ => (), //not possible
481441
}
482442
tr.mark_stream_done_server(&(idx64 as u8));
483-
if tr.count_in_progress_streams() == 0
484-
&& tr.count_in_progress_streams_server() == 0
485-
{
443+
if tr.count_in_progress_streams() == 0 && tr.count_in_progress_streams_server() == 0 {
486444
//all data gathered from both sides
487445
kill();
488446
}
@@ -517,9 +475,7 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
517475
let mut stream = match (*ps).lock() {
518476
Ok(guard) => guard,
519477
Err(poisoned) => {
520-
log::error!(
521-
"a stream-handler was poisoned; this indicates some sort of logic error"
522-
);
478+
log::error!("a stream-handler was poisoned; this indicates some sort of logic error");
523479
poisoned.into_inner()
524480
}
525481
};
@@ -601,23 +557,14 @@ pub fn kill() -> bool {
601557
}
602558
fn start_kill_timer() {
603559
unsafe {
604-
KILL_TIMER_RELATIVE_START_TIME = SystemTime::now()
605-
.duration_since(UNIX_EPOCH)
606-
.unwrap()
607-
.as_secs_f64();
560+
KILL_TIMER_RELATIVE_START_TIME = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs_f64();
608561
}
609562
}
610563
fn is_alive() -> bool {
611564
unsafe {
612565
if KILL_TIMER_RELATIVE_START_TIME != 0.0 {
613566
//initialised
614-
if SystemTime::now()
615-
.duration_since(UNIX_EPOCH)
616-
.unwrap()
617-
.as_secs_f64()
618-
- KILL_TIMER_RELATIVE_START_TIME
619-
>= KILL_TIMEOUT
620-
{
567+
if SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs_f64() - KILL_TIMER_RELATIVE_START_TIME >= KILL_TIMEOUT {
621568
return false;
622569
}
623570
}

src/main.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ fn main() {
3939
log::debug!("registering SIGINT handler...");
4040
ctrlc2::set_handler(move || {
4141
if server::kill() {
42-
log::warn!(
43-
"shutdown requested; please allow a moment for any in-progress tests to stop"
44-
);
42+
log::warn!("shutdown requested; please allow a moment for any in-progress tests to stop");
4543
} else {
4644
log::warn!("forcing shutdown immediately");
4745
std::process::exit(3);
@@ -60,9 +58,7 @@ fn main() {
6058
log::debug!("registering SIGINT handler...");
6159
ctrlc2::set_handler(move || {
6260
if client::kill() {
63-
log::warn!(
64-
"shutdown requested; please allow a moment for any in-progress tests to stop"
65-
);
61+
log::warn!("shutdown requested; please allow a moment for any in-progress tests to stop");
6662
} else {
6763
log::warn!("forcing shutdown immediately");
6864
std::process::exit(3);

src/protocol/communication.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -51,21 +51,12 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
5151
}
5252

5353
/// receives the length-count of a pending message over a client-server communications stream
54-
fn receive_length(
55-
stream: &mut TcpStream,
56-
alive_check: fn() -> bool,
57-
results_handler: &mut dyn FnMut() -> BoxResult<()>,
58-
) -> BoxResult<u16> {
54+
fn receive_length(stream: &mut TcpStream, alive_check: fn() -> bool, results_handler: &mut dyn FnMut() -> BoxResult<()>) -> BoxResult<u16> {
5955
let mut cloned_stream = stream.try_clone()?;
6056

6157
let mio_token = Token(0);
6258
let poll = Poll::new()?;
63-
poll.register(
64-
&cloned_stream,
65-
mio_token,
66-
Ready::readable(),
67-
PollOpt::edge(),
68-
)?;
59+
poll.register(&cloned_stream, mio_token, Ready::readable(), PollOpt::edge())?;
6960
let mut events = Events::with_capacity(1); //only interacting with one stream
7061

7162
let mut length_bytes_read = 0;
@@ -93,30 +84,22 @@ fn receive_length(
9384
return Err(Box::new(simple_error::simple_error!("connection lost")));
9485
} else {
9586
//shutting down; a disconnect is expected
96-
return Err(Box::new(simple_error::simple_error!(
97-
"local shutdown requested"
98-
)));
87+
return Err(Box::new(simple_error::simple_error!("local shutdown requested")));
9988
}
10089
}
10190

10291
length_bytes_read += size;
10392
if length_bytes_read == 2 {
10493
let length = u16::from_be_bytes(length_spec);
105-
log::debug!(
106-
"received length-spec of {} from {}",
107-
length,
108-
stream.peer_addr()?
109-
);
94+
log::debug!("received length-spec of {} from {}", length, stream.peer_addr()?);
11095
return Ok(length);
11196
} else {
11297
log::debug!("received partial length-spec from {}", stream.peer_addr()?);
11398
}
11499
}
115100
}
116101
}
117-
Err(Box::new(simple_error::simple_error!(
118-
"system shutting down"
119-
)))
102+
Err(Box::new(simple_error::simple_error!("system shutting down")))
120103
}
121104
/// receives the data-value of a pending message over a client-server communications stream
122105
fn receive_payload(
@@ -129,12 +112,7 @@ fn receive_payload(
129112

130113
let mio_token = Token(0);
131114
let poll = Poll::new()?;
132-
poll.register(
133-
&cloned_stream,
134-
mio_token,
135-
Ready::readable(),
136-
PollOpt::edge(),
137-
)?;
115+
poll.register(&cloned_stream, mio_token, Ready::readable(), PollOpt::edge())?;
138116
let mut events = Events::with_capacity(1); //only interacting with one stream
139117

140118
let mut bytes_read = 0;
@@ -162,9 +140,7 @@ fn receive_payload(
162140
return Err(Box::new(simple_error::simple_error!("connection lost")));
163141
} else {
164142
// shutting down; a disconnect is expected
165-
return Err(Box::new(simple_error::simple_error!(
166-
"local shutdown requested"
167-
)));
143+
return Err(Box::new(simple_error::simple_error!("local shutdown requested")));
168144
}
169145
}
170146

@@ -185,9 +161,7 @@ fn receive_payload(
185161
}
186162
}
187163
}
188-
Err(Box::new(simple_error::simple_error!(
189-
"system shutting down"
190-
)))
164+
Err(Box::new(simple_error::simple_error!("system shutting down")))
191165
}
192166
/// handles the full process of retrieving a message from a client-server communications stream
193167
pub fn receive(

0 commit comments

Comments
 (0)