@@ -13,8 +13,8 @@ use futures::stream::{Stream, StreamExt};
1313use log:: * ;
1414use tokio:: time:: Delay ;
1515
16- use pin_project:: pin_project;
1716use futures:: stream:: FuturesUnordered ;
17+ use pin_project:: pin_project;
1818
1919use crate :: config:: Config ;
2020use crate :: errors:: Error ;
@@ -23,11 +23,9 @@ use crate::packet::Packet;
2323use crate :: pcap_util;
2424use crate :: stream:: StreamItem ;
2525
26-
27-
28-
2926#[ pin_project]
30- struct CallbackFuture < E , T > where
27+ struct CallbackFuture < E , T >
28+ where
3129 E : Fail + Sync + Send ,
3230 T : Stream < Item = StreamItem < E > > + Sized + Unpin ,
3331{
@@ -36,7 +34,8 @@ struct CallbackFuture<E, T> where
3634}
3735
3836impl < E : Fail + Sync + Send , T : Stream < Item = StreamItem < E > > + Sized + Unpin > Future
39- for CallbackFuture < E , T > {
37+ for CallbackFuture < E , T >
38+ {
4039 type Output = ( usize , Option < ( T , StreamItem < E > ) > ) ;
4140
4241 fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
@@ -49,7 +48,7 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Fut
4948 Poll :: Pending => {
5049 std:: mem:: replace ( this. stream , Some ( stream) ) ;
5150 return Poll :: Pending ;
52- } ,
51+ }
5352 Poll :: Ready ( Some ( t) ) => {
5453 return Poll :: Ready ( ( idx, Some ( ( stream, t) ) ) ) ;
5554 }
@@ -89,11 +88,11 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>
8988 ( Some ( min) , Some ( max) ) => {
9089 let since = max. timestamp ( ) . duration_since ( * min. timestamp ( ) ) ;
9190 if let Ok ( since) = since {
92- return since
91+ return since;
9392 } else {
9493 Duration :: from_millis ( 0 )
9594 }
96- } ,
95+ }
9796 _ => Duration :: from_millis ( 0 ) ,
9897 }
9998 }
@@ -116,7 +115,11 @@ where
116115}
117116
118117impl < E : Fail + Sync + Send , T : Stream < Item = StreamItem < E > > + Sized + Unpin > BridgeStream < E , T > {
119- pub fn new ( streams : Vec < T > , max_buffer_time : Duration , min_states_needed : usize ) -> Result < BridgeStream < E , T > , Error > {
118+ pub fn new (
119+ streams : Vec < T > ,
120+ max_buffer_time : Duration ,
121+ min_states_needed : usize ,
122+ ) -> Result < BridgeStream < E , T > , Error > {
120123 let mut poll_queue = FuturesUnordered :: new ( ) ;
121124 let mut stream_states = VecDeque :: with_capacity ( streams. len ( ) ) ;
122125 for ( idx, stream) in streams. into_iter ( ) . enumerate ( ) {
@@ -125,7 +128,7 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Bri
125128 current : vec ! [ ] ,
126129 complete : false ,
127130 } ;
128- let fut = CallbackFuture {
131+ let fut = CallbackFuture {
129132 idx,
130133 stream : Some ( stream) ,
131134 } ;
@@ -137,7 +140,7 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Bri
137140 stream_states : stream_states,
138141 max_buffer_time,
139142 min_states_needed : min_states_needed,
140- poll_queue
143+ poll_queue,
141144 } )
142145 }
143146}
@@ -191,7 +194,11 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Str
191194
192195 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
193196 let this = self . project ( ) ;
194- trace ! ( "Interfaces: {:?} poll_queue {}" , this. stream_states. len( ) , this. poll_queue. len( ) ) ;
197+ trace ! (
198+ "Interfaces: {:?} poll_queue {}" ,
199+ this. stream_states. len( ) ,
200+ this. poll_queue. len( )
201+ ) ;
195202 let states: & mut VecDeque < BridgeStreamState < E , T > > = this. stream_states ;
196203 let min_states_needed: usize = * this. min_states_needed ;
197204 let max_buffer_time = this. max_buffer_time ;
@@ -204,7 +211,7 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Str
204211 Poll :: Ready ( Some ( ( _, Some ( ( _, Err ( err) ) ) ) ) ) => {
205212 trace ! ( "got a error, passing upstream" ) ;
206213 return Poll :: Ready ( Some ( Err ( err) ) ) ;
207- } ,
214+ }
208215 Poll :: Ready ( Some ( ( idx, Some ( ( stream, Ok ( item) ) ) ) ) ) => {
209216 //When the future gives us a result we are given a index, that we use to locate an existing State, and re-add the stream.
210217 //For that reason the order must never change!
@@ -216,18 +223,18 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Str
216223 state. stream = Some ( stream) ;
217224 state. current . push ( item) ;
218225 }
219- } ,
226+ }
220227 Poll :: Ready ( Some ( ( idx, None ) ) ) => {
221228 if let Some ( state) = states. get_mut ( idx) {
222229 trace ! ( "Interface {} has completed" , idx) ;
223230 state. complete = true ;
224231 continue ;
225232 }
226- } ,
233+ }
227234 Poll :: Pending => {
228235 trace ! ( "Got Pending" ) ;
229236 break ;
230- } ,
237+ }
231238 Poll :: Ready ( None ) => {
232239 trace ! ( "Reached the end." ) ;
233240 break ;
@@ -264,10 +271,7 @@ impl<E: Fail + Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Str
264271 vec ! [ ]
265272 } ;
266273
267- let completed_count = states
268- . iter ( )
269- . filter ( |s| s. complete )
270- . count ( ) ;
274+ let completed_count = states. iter ( ) . filter ( |s| s. complete ) . count ( ) ;
271275
272276 if res. is_empty ( ) && completed_count == states. len ( ) {
273277 trace ! ( "All ifaces are complete." ) ;
@@ -289,8 +293,8 @@ mod tests {
289293 use std:: path:: PathBuf ;
290294
291295 use byteorder:: { ByteOrder , ReadBytesExt } ;
292- use futures:: { Future , Stream } ;
293296 use futures:: stream;
297+ use futures:: { Future , Stream } ;
294298 use rand;
295299
296300 use crate :: PacketStream ;
@@ -406,7 +410,7 @@ mod tests {
406410 let packet_stream =
407411 PacketStream :: new ( Config :: default ( ) , Arc :: clone ( & handle) ) . expect ( "Failed to build" ) ;
408412
409- let stream = BridgeStream :: new ( vec ! [ packet_stream] , Duration :: from_millis ( 100 ) , 2 ) ;
413+ let stream = BridgeStream :: new ( vec ! [ packet_stream] , Duration :: from_millis ( 100 ) , 2 ) ;
410414
411415 assert ! (
412416 stream. is_ok( ) ,
0 commit comments