Skip to content

Commit 1b87eb4

Browse files
committed
24: Builder Api
Introduce a builder api that better supports creating pending and activated handles, and creating streams from those handles. Config now supports defining which handle type (dead, live, file). Additional support for configuring handles with linktype and rfmon.
1 parent 0de85e2 commit 1b87eb4

File tree

6 files changed

+405
-236
lines changed

6 files changed

+405
-236
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,25 @@ First, add this to your `Cargo.toml`:
2424

2525
```toml
2626
[dependencies]
27-
pcap-async = "0.3"
27+
pcap-async = "0.5"
2828
```
2929

3030
Next, add this to your crate:
3131

3232
```rust
3333
use futures::StreamExt;
3434
use pcap_async::{Config, Handle, PacketStream};
35+
use std::convert::TryFrom;
3536

3637
fn main() {
3738
smol::run(async move {
38-
let handle = Handle::lookup().expect("No handle created");
39-
let mut provider = PacketStream::new(Config::default(), handle)
40-
.expect("Could not create provider")
41-
.fuse();
39+
let cfg = Config::default();
40+
let mut provider = PacketStream::try_from(cfg)
41+
.expect("Could not create provider");
4242
while let Some(packets) = provider.next().await {
4343

4444
}
45-
handle.interrupt();
45+
provider.interrupt();
4646
})
4747
}
4848
```

src/bridge_stream.rs

Lines changed: 83 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,19 @@ use crate::errors::Error;
1919
use crate::handle::Handle;
2020
use crate::packet::Packet;
2121
use crate::pcap_util;
22-
use crate::stream::StreamItem;
22+
use crate::stream::{Interruptable, StreamItem};
2323

2424
#[pin_project]
25-
struct CallbackFuture<E, T>
25+
struct CallbackFuture<T>
2626
where
27-
E: Sync + Send,
28-
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
27+
T: Stream<Item = StreamItem> + Sized + Unpin,
2928
{
3029
idx: usize,
3130
stream: Option<T>,
3231
}
3332

34-
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
35-
for CallbackFuture<E, T>
36-
{
37-
type Output = (usize, Option<(T, StreamItem<E>)>);
33+
impl<T: Stream<Item = StreamItem> + Sized + Unpin> Future for CallbackFuture<T> {
34+
type Output = (usize, Option<(T, StreamItem)>);
3835

3936
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4037
let this = self.project();
@@ -60,17 +57,22 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
6057
}
6158
}
6259

63-
struct BridgeStreamState<E, T>
60+
struct BridgeStreamState<T>
6461
where
65-
E: Sync + Send,
66-
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
62+
T: Interruptable + Sized + Unpin,
6763
{
6864
stream: Option<T>,
6965
current: Vec<Vec<Packet>>,
7066
complete: bool,
7167
}
7268

73-
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStreamState<E, T> {
69+
impl<T: Interruptable + Sized + Unpin> BridgeStreamState<T> {
70+
fn interrupt(&self) {
71+
if let Some(st) = &self.stream {
72+
st.interrupt();
73+
}
74+
}
75+
7476
fn is_complete(&self) -> bool {
7577
self.complete && self.current.is_empty()
7678
}
@@ -100,22 +102,22 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStre
100102
// `max_buffer_time` will check the spread of packets, and if it to large it will sort what it has and pass it on.
101103

102104
#[pin_project]
103-
pub struct BridgeStream<E: Sync + Send, T>
105+
pub struct BridgeStream<T>
104106
where
105-
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
107+
T: Interruptable + Sized + Unpin,
106108
{
107-
stream_states: VecDeque<BridgeStreamState<E, T>>,
109+
stream_states: VecDeque<BridgeStreamState<T>>,
108110
max_buffer_time: Duration,
109111
min_states_needed: usize,
110-
poll_queue: FuturesUnordered<CallbackFuture<E, T>>,
112+
poll_queue: FuturesUnordered<CallbackFuture<T>>,
111113
}
112114

113-
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStream<E, T> {
115+
impl<T: Interruptable + Sized + Unpin> BridgeStream<T> {
114116
pub fn new(
115117
streams: Vec<T>,
116118
max_buffer_time: Duration,
117119
min_states_needed: usize,
118-
) -> Result<BridgeStream<E, T>, Error> {
120+
) -> Result<BridgeStream<T>, Error> {
119121
let poll_queue = FuturesUnordered::new();
120122
let mut stream_states = VecDeque::with_capacity(streams.len());
121123
for (idx, stream) in streams.into_iter().enumerate() {
@@ -139,10 +141,16 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStre
139141
poll_queue,
140142
})
141143
}
144+
145+
pub fn interrupt(&self) {
146+
for st in &self.stream_states {
147+
st.interrupt();
148+
}
149+
}
142150
}
143151

144-
fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>(
145-
stream_states: &mut VecDeque<BridgeStreamState<E, T>>,
152+
fn gather_packets<T: Interruptable + Sized + Unpin>(
153+
stream_states: &mut VecDeque<BridgeStreamState<T>>,
146154
) -> Vec<Packet> {
147155
let mut result = vec![];
148156
let mut gather_to: Option<SystemTime> = None;
@@ -183,10 +191,11 @@ fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpi
183191
result
184192
}
185193

186-
impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
187-
for BridgeStream<E, T>
194+
impl<T> Stream for BridgeStream<T>
195+
where
196+
T: Interruptable + Sized + Unpin,
188197
{
189-
type Item = StreamItem<E>;
198+
type Item = StreamItem;
190199

191200
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192201
let this = self.project();
@@ -195,12 +204,12 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
195204
this.stream_states.len(),
196205
this.poll_queue.len()
197206
);
198-
let states: &mut VecDeque<BridgeStreamState<E, T>> = this.stream_states;
207+
let states: &mut VecDeque<BridgeStreamState<T>> = this.stream_states;
199208
let min_states_needed: usize = *this.min_states_needed;
200209
let max_buffer_time = this.max_buffer_time;
201210
let mut max_time_spread: Duration = Duration::from_millis(0);
202211
let mut not_pending: usize = 0;
203-
let mut poll_queue: &mut FuturesUnordered<CallbackFuture<E, T>> = this.poll_queue;
212+
let mut poll_queue: &mut FuturesUnordered<CallbackFuture<T>> = this.poll_queue;
204213

205214
loop {
206215
match Pin::new(&mut poll_queue).poll_next(cx) {
@@ -284,6 +293,7 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
284293

285294
#[cfg(test)]
286295
mod tests {
296+
use std::convert::TryFrom;
287297
use std::io::Cursor;
288298
use std::ops::Range;
289299
use std::path::PathBuf;
@@ -293,9 +303,10 @@ mod tests {
293303
use futures::{Future, Stream};
294304
use rand;
295305

296-
use crate::PacketStream;
306+
use crate::{Interface, PacketStream};
297307

298308
use super::*;
309+
use std::sync::atomic::{AtomicBool, Ordering};
299310

300311
fn make_packet(ts: usize) -> Packet {
301312
Packet {
@@ -316,11 +327,10 @@ mod tests {
316327

317328
info!("Testing against {:?}", pcap_path);
318329

319-
let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
320-
.expect("No handle created");
330+
let mut cfg = Config::default();
331+
cfg.with_interface(Interface::File(pcap_path));
321332

322-
let packet_stream =
323-
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
333+
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");
324334

325335
let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
326336
.expect("Failed to build");
@@ -335,8 +345,6 @@ mod tests {
335345
.filter(|p| p.data().len() == p.actual_length() as usize)
336346
.collect();
337347

338-
handle.interrupt();
339-
340348
packets
341349
});
342350

@@ -373,11 +381,10 @@ mod tests {
373381

374382
info!("Testing against {:?}", pcap_path);
375383

376-
let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
377-
.expect("No handle created");
384+
let mut cfg = Config::default();
385+
cfg.with_interface(Interface::File(pcap_path));
378386

379-
let packet_stream =
380-
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
387+
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");
381388

382389
let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
383390
.expect("Failed to build");
@@ -396,11 +403,9 @@ mod tests {
396403
.await
397404
.into_iter()
398405
.flatten()
399-
.filter(|p| p.data().len() == p.actual_length() as _)
406+
.filter(|p| p.data().len() == p.actual_length() as usize)
400407
.count();
401408

402-
handle.interrupt();
403-
404409
packets
405410
});
406411

@@ -411,9 +416,8 @@ mod tests {
411416
fn packets_from_lookup_bridge() {
412417
let _ = env_logger::try_init();
413418

414-
let handle = Handle::lookup().expect("No handle created");
415-
let packet_stream =
416-
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
419+
let cfg = Config::default();
420+
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");
417421

418422
let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);
419423

@@ -432,9 +436,7 @@ mod tests {
432436
"(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
433437
.to_owned(),
434438
);
435-
let handle = Handle::lookup().expect("No handle created");
436-
let packet_stream =
437-
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
439+
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");
438440

439441
let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);
440442

@@ -444,6 +446,33 @@ mod tests {
444446
);
445447
}
446448

449+
#[pin_project]
450+
struct IterStream {
451+
inner: Vec<Packet>,
452+
interrupted: AtomicBool,
453+
}
454+
455+
impl Interruptable for IterStream {
456+
fn interrupt(&self) {
457+
self.interrupted.store(true, Ordering::Relaxed);
458+
}
459+
}
460+
461+
impl Stream for IterStream {
462+
type Item = StreamItem;
463+
464+
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
465+
let mut this = self;
466+
if !this.interrupted.load(Ordering::Relaxed) {
467+
let d = std::mem::replace(&mut this.inner, vec![]);
468+
this.interrupted.store(true, Ordering::Relaxed);
469+
return Poll::Ready(Some(Ok(d)));
470+
} else {
471+
return Poll::Ready(None);
472+
}
473+
}
474+
}
475+
447476
#[test]
448477
fn packets_come_out_time_ordered() {
449478
let mut packets1 = vec![];
@@ -463,18 +492,21 @@ mod tests {
463492
packets2.push(p)
464493
}
465494

466-
let item1: StreamItem<Error> = Ok(packets1.clone());
467-
let item2: StreamItem<Error> = Ok(packets2.clone());
468-
469-
let stream1 = futures::stream::iter(vec![item1]);
470-
let stream2 = futures::stream::iter(vec![item2]);
495+
let stream1 = IterStream {
496+
interrupted: AtomicBool::default(),
497+
inner: packets1.clone(),
498+
};
499+
let stream2 = IterStream {
500+
interrupted: AtomicBool::default(),
501+
inner: packets2.clone(),
502+
};
471503

472504
let result = smol::block_on(async move {
473505
let bridge = BridgeStream::new(vec![stream1, stream2], Duration::from_millis(100), 0);
474506

475507
let result = bridge
476508
.expect("Unable to create BridgeStream")
477-
.collect::<Vec<StreamItem<Error>>>()
509+
.collect::<Vec<StreamItem>>()
478510
.await;
479511
result
480512
.into_iter()

0 commit comments

Comments
 (0)