Skip to content

Commit f1642e1

Browse files
authored
Smol (#19)
Use the smol runtime and thiserror.
1 parent 78790e7 commit f1642e1

File tree

11 files changed

+247
-225
lines changed

11 files changed

+247
-225
lines changed

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,19 @@ exclude = ["resources/*.pcap"]
2020

2121
[dependencies]
2222
byteorder = "1.3"
23-
failure = "0.1"
2423
futures = "0.3"
2524
libc = "0.2"
2625
log = "0.4"
2726
mio = "0.6"
2827
pin-project = "0.4"
2928
pcap-sys = "0.1"
30-
tokio = { version = "0.2.21", features = ["blocking", "io-driver", "rt-threaded", "time"] }
29+
smol = "0.1"
30+
thiserror = "1.0"
3131

3232
[dev-dependencies]
3333
rand = "0.3"
3434
criterion = "0.2"
3535
env_logger = "0.6"
36-
tokio = { version = "0.2", features = ["macros", "rt-core"] }
3736

3837
[lib]
3938
path = "src/lib.rs"

benches/bench_capture.rs

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ fn bench_stream_from_large_file(b: &mut Bencher) {
1515
info!("Benchmarking against {:?}", pcap_path.clone());
1616

1717
b.iter(|| {
18-
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
19-
2018
let clone_path = pcap_path.clone();
2119

2220
let handle = Handle::file_capture(clone_path.to_str().expect("No path found"))
@@ -25,17 +23,22 @@ fn bench_stream_from_large_file(b: &mut Bencher) {
2523
let mut cfg = Config::default();
2624
cfg.with_max_packets_read(5000);
2725

28-
let packet_provider = PacketStream::new(Config::default(), std::sync::Arc::clone(&handle))
29-
.expect("Failed to build");
30-
let packets = rt.block_on(packet_provider.collect::<Vec<_>>());
31-
let packets: Result<Vec<_>, pcap_async::Error> = packets.into_iter().collect();
32-
let packets = packets
33-
.expect("Failed to get packets")
34-
.iter()
35-
.flatten()
36-
.count();
26+
let packets = smol::run(async move {
27+
let packet_provider =
28+
PacketStream::new(Config::default(), std::sync::Arc::clone(&handle))
29+
.expect("Failed to build");
30+
let packets = packet_provider.collect::<Vec<_>>().await;
31+
let packets: Result<Vec<_>, pcap_async::Error> = packets.into_iter().collect();
32+
let packets = packets
33+
.expect("Failed to get packets")
34+
.iter()
35+
.flatten()
36+
.count();
37+
38+
handle.interrupt();
3739

38-
handle.interrupt();
40+
packets
41+
});
3942

4043
assert_eq!(packets, 246137);
4144
});
@@ -63,8 +66,6 @@ fn bench_stream_next_from_large_file_bridge(b: &mut Bencher) {
6366
info!("Benchmarking against {:?}", pcap_path.clone());
6467

6568
b.iter(|| {
66-
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
67-
6869
let clone_path = pcap_path.clone();
6970

7071
let handle1 = Handle::file_capture(clone_path.to_str().expect("No path found"))
@@ -81,23 +82,28 @@ fn bench_stream_next_from_large_file_bridge(b: &mut Bencher) {
8182
.map(|h| PacketStream::new(Config::default(), h).unwrap())
8283
.collect();
8384

84-
let packet_provider = BridgeStream::new(Config::default().retry_after().clone(), streams)
85-
.expect("Failed to build");
86-
let fut_packets = async move {
87-
let mut packet_provider = packet_provider.boxed();
88-
let mut packets = vec![];
89-
while let Some(p) = packet_provider.next().await {
90-
let p = p.expect("Could not get packets");
91-
packets.extend(p);
85+
let packets = smol::run(async move {
86+
let packet_provider =
87+
BridgeStream::new(streams, Config::default().buffer_for().clone(), 0)
88+
.expect("Failed to build");
89+
let packets = async move {
90+
let mut packet_provider = packet_provider.boxed();
91+
let mut packets = vec![];
92+
while let Some(p) = packet_provider.next().await {
93+
let p = p.expect("Could not get packets");
94+
packets.extend(p);
95+
}
96+
packets
9297
}
93-
packets
94-
};
95-
let packets = rt.block_on(fut_packets).len();
98+
.await;
99+
100+
handle1.interrupt();
101+
handle2.interrupt();
96102

97-
handle1.interrupt();
98-
handle2.interrupt();
103+
packets
104+
});
99105

100-
assert_eq!(packets, 246137 * 2);
106+
assert_eq!(packets.len(), 246137 * 2);
101107
});
102108
}
103109

@@ -111,8 +117,6 @@ fn bench_stream_next_from_large_file(b: &mut Bencher) {
111117
info!("Benchmarking against {:?}", pcap_path.clone());
112118

113119
b.iter(|| {
114-
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
115-
116120
let clone_path = pcap_path.clone();
117121

118122
let handle = Handle::file_capture(clone_path.to_str().expect("No path found"))
@@ -121,22 +125,23 @@ fn bench_stream_next_from_large_file(b: &mut Bencher) {
121125
let mut cfg = Config::default();
122126
cfg.with_max_packets_read(5000);
123127

124-
let packet_provider = PacketStream::new(Config::default(), std::sync::Arc::clone(&handle))
125-
.expect("Failed to build");
126-
let fut_packets = async move {
128+
let packets = smol::run(async move {
129+
let packet_provider =
130+
PacketStream::new(Config::default(), std::sync::Arc::clone(&handle))
131+
.expect("Failed to build");
127132
let mut packet_provider = packet_provider.boxed();
128133
let mut packets = vec![];
129134
while let Some(p) = packet_provider.next().await {
130135
let p = p.expect("Could not get packets");
131136
packets.extend(p);
132137
}
133-
packets
134-
};
135-
let packets = rt.block_on(fut_packets).len();
136138

137-
handle.interrupt();
139+
handle.interrupt();
138140

139-
assert_eq!(packets, 246137);
141+
packets
142+
});
143+
144+
assert_eq!(packets.len(), 246137);
140145
});
141146
}
142147

src/bridge_stream.rs

Lines changed: 61 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ use std::task::{Context, Poll};
77
use std::thread::current;
88
use std::time::{Duration, SystemTime};
99

10-
use failure::Fail;
1110
use futures::future::Pending;
1211
use futures::stream::{Stream, StreamExt};
1312
use log::*;
14-
use tokio::time::Delay;
1513

1614
use futures::stream::FuturesUnordered;
1715
use pin_project::pin_project;
@@ -26,14 +24,14 @@ use crate::stream::StreamItem;
2624
#[pin_project]
2725
struct CallbackFuture<E, T>
2826
where
29-
E: Fail + Sync + Send,
27+
E: Sync + Send,
3028
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
3129
{
3230
idx: usize,
3331
stream: Option<T>,
3432
}
3533

36-
impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
34+
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
3735
for CallbackFuture<E, T>
3836
{
3937
type Output = (usize, Option<(T, StreamItem<E>)>);
@@ -64,17 +62,15 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Fut
6462

6563
struct BridgeStreamState<E, T>
6664
where
67-
E: Fail + Sync + Send,
65+
E: Sync + Send,
6866
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
6967
{
7068
stream: Option<T>,
7169
current: Vec<Vec<Packet>>,
7270
complete: bool,
7371
}
7472

75-
impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>
76-
BridgeStreamState<E, T>
77-
{
73+
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStreamState<E, T> {
7874
fn is_complete(&self) -> bool {
7975
self.complete && self.current.is_empty()
8076
}
@@ -104,7 +100,7 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>
104100
// `max_buffer_time` will check the spread of packets, and if it to large it will sort what it has and pass it on.
105101

106102
#[pin_project]
107-
pub struct BridgeStream<E: Fail + Sync + Send, T>
103+
pub struct BridgeStream<E: Sync + Send, T>
108104
where
109105
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
110106
{
@@ -114,7 +110,7 @@ where
114110
poll_queue: FuturesUnordered<CallbackFuture<E, T>>,
115111
}
116112

117-
impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStream<E, T> {
113+
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStream<E, T> {
118114
pub fn new(
119115
streams: Vec<T>,
120116
max_buffer_time: Duration,
@@ -145,7 +141,7 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Bri
145141
}
146142
}
147143

148-
fn gather_packets<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>(
144+
fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>(
149145
stream_states: &mut VecDeque<BridgeStreamState<E, T>>,
150146
) -> Vec<Packet> {
151147
let mut result = vec![];
@@ -187,7 +183,7 @@ fn gather_packets<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized
187183
result
188184
}
189185

190-
impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
186+
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
191187
for BridgeStream<E, T>
192188
{
193189
type Item = StreamItem<E>;
@@ -310,8 +306,8 @@ mod tests {
310306
}
311307
}
312308

313-
#[tokio::test]
314-
async fn packets_from_file() {
309+
#[test]
310+
fn packets_from_file() {
315311
let _ = env_logger::try_init();
316312

317313
let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
@@ -329,16 +325,20 @@ mod tests {
329325
let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
330326
.expect("Failed to build");
331327

332-
let fut_packets = packet_provider.collect::<Vec<_>>();
333-
let packets: Vec<_> = fut_packets
334-
.await
335-
.into_iter()
336-
.flatten()
337-
.flatten()
338-
.filter(|p| p.data().len() == p.actual_length() as usize)
339-
.collect();
328+
let packets = smol::run(async move {
329+
let fut_packets = packet_provider.collect::<Vec<_>>();
330+
let packets: Vec<_> = fut_packets
331+
.await
332+
.into_iter()
333+
.flatten()
334+
.flatten()
335+
.filter(|p| p.data().len() == p.actual_length() as usize)
336+
.collect();
337+
338+
handle.interrupt();
340339

341-
handle.interrupt();
340+
packets
341+
});
342342

343343
assert_eq!(packets.len(), 10);
344344

@@ -362,8 +362,9 @@ mod tests {
362362
);
363363
assert_eq!(actual_length, 54);
364364
}
365-
#[tokio::test]
366-
async fn packets_from_file_next_bridge() {
365+
366+
#[test]
367+
fn packets_from_file_next_bridge() {
367368
let _ = env_logger::try_init();
368369

369370
let pcap_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
@@ -381,23 +382,27 @@ mod tests {
381382
let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
382383
.expect("Failed to build");
383384

384-
let fut_packets = async move {
385-
let mut packet_provider = packet_provider.boxed();
386-
let mut packets = vec![];
387-
while let Some(p) = packet_provider.next().await {
388-
info!("packets returned {:?}", p);
389-
packets.extend(p);
390-
}
391-
packets
392-
};
393-
let packets = fut_packets
394-
.await
395-
.into_iter()
396-
.flatten()
397-
.filter(|p| p.data().len() == p.actual_length() as _)
398-
.count();
385+
let packets = smol::run(async move {
386+
let fut_packets = async move {
387+
let mut packet_provider = packet_provider.boxed();
388+
let mut packets = vec![];
389+
while let Some(p) = packet_provider.next().await {
390+
info!("packets returned {:?}", p);
391+
packets.extend(p);
392+
}
393+
packets
394+
};
395+
let packets = fut_packets
396+
.await
397+
.into_iter()
398+
.flatten()
399+
.filter(|p| p.data().len() == p.actual_length() as _)
400+
.count();
401+
402+
handle.interrupt();
399403

400-
handle.interrupt();
404+
packets
405+
});
401406

402407
assert_eq!(packets, 10);
403408
}
@@ -438,8 +443,9 @@ mod tests {
438443
format!("Could not build stream {}", stream.err().unwrap())
439444
);
440445
}
441-
#[tokio::test]
442-
async fn packets_come_out_time_ordered() {
446+
447+
#[test]
448+
fn packets_come_out_time_ordered() {
443449
let mut packets1 = vec![];
444450
let mut packets2 = vec![];
445451

@@ -463,17 +469,19 @@ mod tests {
463469
let stream1 = futures::stream::iter(vec![item1]);
464470
let stream2 = futures::stream::iter(vec![item2]);
465471

466-
let bridge = BridgeStream::new(vec![stream1, stream2], Duration::from_millis(100), 0);
472+
let result = smol::run(async move {
473+
let bridge = BridgeStream::new(vec![stream1, stream2], Duration::from_millis(100), 0);
467474

468-
let mut result = bridge
469-
.expect("Unable to create BridgeStream")
470-
.collect::<Vec<StreamItem<Error>>>()
471-
.await;
472-
let result = result
473-
.into_iter()
474-
.map(|r| r.unwrap())
475-
.flatten()
476-
.collect::<Vec<Packet>>();
475+
let result = bridge
476+
.expect("Unable to create BridgeStream")
477+
.collect::<Vec<StreamItem<Error>>>()
478+
.await;
479+
result
480+
.into_iter()
481+
.map(|r| r.unwrap())
482+
.flatten()
483+
.collect::<Vec<Packet>>()
484+
});
477485
info!("Result {:?}", result);
478486

479487
let mut expected = vec![packets1, packets2]

0 commit comments

Comments
 (0)