Skip to content

Commit a80178b

Browse files
committed
refine code
1 parent 7721e0f commit a80178b

File tree

5 files changed

+648
-633
lines changed

5 files changed

+648
-633
lines changed

src/client.rs

Lines changed: 112 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,34 @@
1818
* along with rperf. If not, see <https://www.gnu.org/licenses/>.
1919
*/
2020

21-
use std::net::{IpAddr, Shutdown, ToSocketAddrs};
22-
use std::sync::atomic::{AtomicBool, Ordering};
23-
use std::sync::mpsc::channel;
24-
use std::sync::{Arc, Mutex};
25-
use std::thread;
26-
use std::time::{Duration, SystemTime, UNIX_EPOCH};
27-
28-
use mio::net::TcpStream;
29-
30-
use crate::args;
31-
use crate::protocol::communication::{receive, send, KEEPALIVE_DURATION};
32-
33-
use crate::protocol::messaging::{
34-
prepare_begin, prepare_download_configuration, prepare_end, prepare_upload_configuration,
21+
use crate::{
22+
args,
23+
protocol::{
24+
communication::{receive, send, KEEPALIVE_DURATION},
25+
messaging::{
26+
prepare_begin, prepare_download_configuration, prepare_end,
27+
prepare_upload_configuration,
28+
},
29+
results::ClientDoneResult,
30+
results::{
31+
IntervalResultBox, IntervalResultKind, TcpTestResults, TestResults, UdpTestResults,
32+
},
33+
},
34+
stream::{tcp, udp, TestStream},
3535
};
36-
37-
use crate::protocol::results::{
38-
IntervalResultBox, IntervalResultKind, TcpTestResults, TestResults, UdpTestResults,
36+
use mio::net::TcpStream;
37+
use std::{
38+
error::Error,
39+
net::{IpAddr, Shutdown, ToSocketAddrs},
40+
sync::{
41+
atomic::{AtomicBool, Ordering},
42+
mpsc::channel,
43+
Arc, Mutex,
44+
},
45+
thread,
46+
time::{Duration, SystemTime, UNIX_EPOCH},
3947
};
4048

41-
use crate::stream::tcp;
42-
use crate::stream::udp;
43-
use crate::stream::TestStream;
44-
45-
use std::error::Error;
4649
type BoxResult<T> = Result<T, Box<dyn Error>>;
4750

4851
/// when false, the system is shutting down
@@ -381,40 +384,40 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
381384
loop {
382385
let mut test = c_ps.lock().unwrap();
383386
log::debug!("beginning test-interval for stream {}", test.get_idx());
384-
match test.run_interval() {
385-
Some(interval_result) => match interval_result {
386-
Ok(ir) => match c_results_tx.send(ir) {
387+
388+
let interval_result = match test.run_interval() {
389+
Some(interval_result) => interval_result,
390+
None => {
391+
match c_results_tx.send(Box::new(ClientDoneResult {
392+
stream_idx: test.get_idx(),
393+
})) {
387394
Ok(_) => (),
388395
Err(e) => {
389-
log::error!("unable to report interval-result: {}", e);
390-
break;
396+
log::error!("unable to report interval-done-result: {}", e)
391397
}
392-
},
398+
}
399+
break;
400+
}
401+
};
402+
403+
match interval_result {
404+
Ok(ir) => match c_results_tx.send(ir) {
405+
Ok(_) => (),
393406
Err(e) => {
394-
log::error!("unable to process stream: {}", e);
395-
match c_results_tx.send(Box::new(
396-
crate::protocol::results::ClientFailedResult {
397-
stream_idx: test.get_idx(),
398-
},
399-
)) {
400-
Ok(_) => (),
401-
Err(e) => log::error!(
402-
"unable to report interval-failed-result: {}",
403-
e
404-
),
405-
}
407+
log::error!("unable to report interval-result: {}", e);
406408
break;
407409
}
408410
},
409-
None => {
411+
Err(e) => {
412+
log::error!("unable to process stream: {}", e);
410413
match c_results_tx.send(Box::new(
411-
crate::protocol::results::ClientDoneResult {
414+
crate::protocol::results::ClientFailedResult {
412415
stream_idx: test.get_idx(),
413416
},
414417
)) {
415418
Ok(_) => (),
416419
Err(e) => {
417-
log::error!("unable to report interval-done-result: {}", e)
420+
log::error!("unable to report interval-failed-result: {}", e)
418421
}
419422
}
420423
break;
@@ -427,63 +430,77 @@ pub fn execute(args: &args::Args) -> BoxResult<()> {
427430

428431
//watch for events from the server
429432
while is_alive() {
430-
match receive(&mut stream, is_alive, &mut results_handler) {
431-
Ok(payload) => {
432-
match payload.get("kind") {
433-
Some(kind) => {
434-
match kind.as_str().unwrap_or_default() {
435-
"receive" | "send" => { //receive/send-results from the server
436-
if !display_json {
437-
let result = crate::protocol::results::interval_result_from_json(payload.clone())?;
438-
println!("{}", result.to_string(display_bit));
439-
}
440-
let mut tr = test_results.lock().unwrap();
441-
tr.update_from_json(payload)?;
442-
},
443-
"done" | "failed" => match payload.get("stream_idx") { //completion-result from the server
444-
Some(stream_idx) => match stream_idx.as_i64() {
445-
Some(idx64) => {
446-
let mut tr = test_results.lock().unwrap();
447-
match kind.as_str().unwrap() {
448-
"done" => {
449-
log::info!("server reported completion of stream {}", idx64);
450-
},
451-
"failed" => {
452-
log::warn!("server reported failure with stream {}", idx64);
453-
tr.mark_stream_done(&(idx64 as u8), false);
454-
},
455-
_ => (), //not possible
456-
}
457-
tr.mark_stream_done_server(&(idx64 as u8));
458-
if tr.count_in_progress_streams() == 0 && tr.count_in_progress_streams_server() == 0 { //all data gathered from both sides
459-
kill();
460-
}
461-
},
462-
None => log::error!("completion from server did not include a valid stream_idx"),
463-
},
464-
None => log::error!("completion from server did not include stream_idx"),
465-
},
466-
_ => {
467-
log::error!("invalid data from {}: {}", stream.peer_addr()?, serde_json::to_string(&connection_payload)?);
468-
break;
469-
},
433+
let payload = match receive(&mut stream, is_alive, &mut results_handler) {
434+
Ok(payload) => payload,
435+
Err(e) => {
436+
if !complete {
437+
// when complete, this also occurs
438+
return Err(e);
439+
}
440+
break;
441+
}
442+
};
443+
444+
let kind = match payload.get("kind") {
445+
Some(kind) => kind,
446+
None => {
447+
log::error!(
448+
"invalid data from {}: {}",
449+
stream.peer_addr()?,
450+
serde_json::to_string(&connection_payload)?
451+
);
452+
break;
453+
}
454+
};
455+
456+
match kind.as_str().unwrap_or_default() {
457+
"receive" | "send" => {
458+
//receive/send-results from the server
459+
if !display_json {
460+
let result =
461+
crate::protocol::results::interval_result_from_json(payload.clone())?;
462+
println!("{}", result.to_string(display_bit));
463+
}
464+
let mut tr = test_results.lock().unwrap();
465+
tr.update_from_json(payload)?;
466+
}
467+
"done" | "failed" => match payload.get("stream_idx") {
468+
//completion-result from the server
469+
Some(stream_idx) => match stream_idx.as_i64() {
470+
Some(idx64) => {
471+
let mut tr = test_results.lock().unwrap();
472+
match kind.as_str().unwrap() {
473+
"done" => {
474+
log::info!("server reported completion of stream {}", idx64);
475+
}
476+
"failed" => {
477+
log::warn!("server reported failure with stream {}", idx64);
478+
tr.mark_stream_done(&(idx64 as u8), false);
479+
}
480+
_ => (), //not possible
481+
}
482+
tr.mark_stream_done_server(&(idx64 as u8));
483+
if tr.count_in_progress_streams() == 0
484+
&& tr.count_in_progress_streams_server() == 0
485+
{
486+
//all data gathered from both sides
487+
kill();
470488
}
471489
}
472490
None => {
473-
log::error!(
474-
"invalid data from {}: {}",
475-
stream.peer_addr()?,
476-
serde_json::to_string(&connection_payload)?
477-
);
478-
break;
491+
log::error!("completion from server did not include a valid stream_idx")
479492
}
493+
},
494+
None => {
495+
log::error!("completion from server did not include stream_idx")
480496
}
481-
}
482-
Err(e) => {
483-
if !complete {
484-
//when complete, this also occurs
485-
return Err(e);
486-
}
497+
},
498+
_ => {
499+
log::error!(
500+
"invalid data from {}: {}",
501+
stream.peer_addr()?,
502+
serde_json::to_string(&connection_payload)?
503+
);
487504
break;
488505
}
489506
}

src/protocol/communication.rs

Lines changed: 55 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -77,44 +77,39 @@ fn receive_length(
7777
for event in events.iter() {
7878
event.token();
7979
loop {
80-
match cloned_stream.read(&mut length_spec[length_bytes_read..]) {
81-
Ok(size) => {
82-
if size == 0 {
83-
if alive_check() {
84-
return Err(Box::new(simple_error::simple_error!(
85-
"connection lost"
86-
)));
87-
} else {
88-
//shutting down; a disconnect is expected
89-
return Err(Box::new(simple_error::simple_error!(
90-
"local shutdown requested"
91-
)));
92-
}
93-
}
94-
95-
length_bytes_read += size;
96-
if length_bytes_read == 2 {
97-
let length = u16::from_be_bytes(length_spec);
98-
log::debug!(
99-
"received length-spec of {} from {}",
100-
length,
101-
stream.peer_addr()?
102-
);
103-
return Ok(length);
104-
} else {
105-
log::debug!(
106-
"received partial length-spec from {}",
107-
stream.peer_addr()?
108-
);
109-
}
110-
}
80+
let size = match cloned_stream.read(&mut length_spec[length_bytes_read..]) {
81+
Ok(size) => size,
11182
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
11283
//nothing left to process
11384
break;
11485
}
11586
Err(e) => {
11687
return Err(Box::new(e));
11788
}
89+
};
90+
91+
if size == 0 {
92+
if alive_check() {
93+
return Err(Box::new(simple_error::simple_error!("connection lost")));
94+
} else {
95+
//shutting down; a disconnect is expected
96+
return Err(Box::new(simple_error::simple_error!(
97+
"local shutdown requested"
98+
)));
99+
}
100+
}
101+
102+
length_bytes_read += size;
103+
if length_bytes_read == 2 {
104+
let length = u16::from_be_bytes(length_spec);
105+
log::debug!(
106+
"received length-spec of {} from {}",
107+
length,
108+
stream.peer_addr()?
109+
);
110+
return Ok(length);
111+
} else {
112+
log::debug!("received partial length-spec from {}", stream.peer_addr()?);
118113
}
119114
}
120115
}
@@ -151,43 +146,41 @@ fn receive_payload(
151146
for event in events.iter() {
152147
event.token();
153148
loop {
154-
match cloned_stream.read(&mut buffer[bytes_read..]) {
155-
Ok(size) => {
156-
if size == 0 {
157-
if alive_check() {
158-
return Err(Box::new(simple_error::simple_error!(
159-
"connection lost"
160-
)));
161-
} else {
162-
//shutting down; a disconnect is expected
163-
return Err(Box::new(simple_error::simple_error!(
164-
"local shutdown requested"
165-
)));
166-
}
167-
}
168-
169-
bytes_read += size;
170-
if bytes_read == length as usize {
171-
match serde_json::from_slice(&buffer) {
172-
Ok(v) => {
173-
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
174-
return Ok(v);
175-
}
176-
Err(e) => {
177-
return Err(Box::new(e));
178-
}
179-
}
180-
} else {
181-
log::debug!("received partial payload from {}", stream.peer_addr()?);
182-
}
183-
}
149+
let size = match cloned_stream.read(&mut buffer[bytes_read..]) {
150+
Ok(size) => size,
184151
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
185-
//nothing left to process
152+
// nothing left to process
186153
break;
187154
}
188155
Err(e) => {
189156
return Err(Box::new(e));
190157
}
158+
};
159+
160+
if size == 0 {
161+
if alive_check() {
162+
return Err(Box::new(simple_error::simple_error!("connection lost")));
163+
} else {
164+
// shutting down; a disconnect is expected
165+
return Err(Box::new(simple_error::simple_error!(
166+
"local shutdown requested"
167+
)));
168+
}
169+
}
170+
171+
bytes_read += size;
172+
if bytes_read == length as usize {
173+
match serde_json::from_slice(&buffer) {
174+
Ok(v) => {
175+
log::debug!("received {:?} from {}", v, stream.peer_addr()?);
176+
return Ok(v);
177+
}
178+
Err(e) => {
179+
return Err(Box::new(e));
180+
}
181+
}
182+
} else {
183+
log::debug!("received partial payload from {}", stream.peer_addr()?);
191184
}
192185
}
193186
}

0 commit comments

Comments
 (0)