1+ use std:: collections:: VecDeque ;
12use std:: path:: Path ;
23
34use async_trait:: async_trait;
@@ -20,7 +21,7 @@ impl DumbBlockScanner {
2021 }
2122 }
2223
23- /// Update blocks returned by `parse `
24+ /// Update blocks returned used the streamer constructed by `scan `
2425 pub async fn update_blocks ( & self , new_blocks : Vec < ScannedBlock > ) {
2526 let mut blocks = self . blocks . write ( ) . await ;
2627 * blocks = new_blocks;
@@ -35,8 +36,88 @@ impl BlockScanner for DumbBlockScanner {
3536 _from_immutable : Option < ImmutableFileNumber > ,
3637 _until_immutable : ImmutableFileNumber ,
3738 ) -> StdResult < Box < dyn BlockStreamer > > {
38- // let iter = self.blocks.read().await.clone().into_iter();
39- // Ok(Box::new(iter))
40- todo ! ( )
39+ let blocks = self . blocks . read ( ) . await . clone ( ) ;
40+ Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ blocks] ) ) )
41+ }
42+ }
43+
44+ /// Dumb block streamer
45+ pub struct DumbBlockStreamer {
46+ blocks : VecDeque < Vec < ScannedBlock > > ,
47+ }
48+
49+ impl DumbBlockStreamer {
50+ /// Factory - the resulting streamer can be polled one time for each list of blocks given
51+ pub fn new ( blocks : Vec < Vec < ScannedBlock > > ) -> Self {
52+ Self {
53+ blocks : VecDeque :: from ( blocks) ,
54+ }
55+ }
56+ }
57+
58+ #[ async_trait]
59+ impl BlockStreamer for DumbBlockStreamer {
60+ async fn poll_next ( & mut self ) -> StdResult < Option < Vec < ScannedBlock > > > {
61+ Ok ( self . blocks . pop_front ( ) )
62+ }
63+ }
64+
65+ #[ cfg( test) ]
66+ mod tests {
67+ use super :: * ;
68+
69+ #[ tokio:: test]
70+ async fn polling_without_set_of_block_return_none ( ) {
71+ let mut streamer = DumbBlockStreamer :: new ( vec ! [ ] ) ;
72+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
73+ assert_eq ! ( blocks, None ) ;
74+ }
75+
76+ #[ tokio:: test]
77+ async fn polling_with_one_set_of_block_returns_some_once ( ) {
78+ let expected_blocks = vec ! [ ScannedBlock :: new( "hash-1" , 1 , 10 , 20 , Vec :: <& str >:: new( ) ) ] ;
79+ let mut streamer = DumbBlockStreamer :: new ( vec ! [ expected_blocks. clone( ) ] ) ;
80+
81+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
82+ assert_eq ! ( blocks, Some ( expected_blocks) ) ;
83+
84+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
85+ assert_eq ! ( blocks, None ) ;
86+ }
87+
88+ #[ tokio:: test]
89+ async fn polling_with_multiple_sets_of_blocks_returns_some_once ( ) {
90+ let expected_blocks = vec ! [
91+ vec![ ScannedBlock :: new( "hash-1" , 1 , 10 , 20 , Vec :: <& str >:: new( ) ) ] ,
92+ vec![
93+ ScannedBlock :: new( "hash-2" , 2 , 11 , 21 , Vec :: <& str >:: new( ) ) ,
94+ ScannedBlock :: new( "hash-3" , 3 , 12 , 22 , Vec :: <& str >:: new( ) ) ,
95+ ] ,
96+ vec![ ScannedBlock :: new( "hash-4" , 4 , 13 , 23 , Vec :: <& str >:: new( ) ) ] ,
97+ ] ;
98+ let mut streamer = DumbBlockStreamer :: new ( expected_blocks. clone ( ) ) ;
99+
100+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
101+ assert_eq ! ( blocks, Some ( expected_blocks[ 0 ] . clone( ) ) ) ;
102+
103+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
104+ assert_eq ! ( blocks, Some ( expected_blocks[ 1 ] . clone( ) ) ) ;
105+
106+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
107+ assert_eq ! ( blocks, Some ( expected_blocks[ 2 ] . clone( ) ) ) ;
108+
109+ let blocks = streamer. poll_next ( ) . await . unwrap ( ) ;
110+ assert_eq ! ( blocks, None ) ;
111+ }
112+
113+ #[ tokio:: test]
114+ async fn dumb_scanned_construct_a_streamer_based_on_its_stored_blocks ( ) {
115+ let expected_blocks = vec ! [ ScannedBlock :: new( "hash-1" , 1 , 10 , 20 , Vec :: <& str >:: new( ) ) ] ;
116+
117+ let scanner = DumbBlockScanner :: new ( expected_blocks. clone ( ) ) ;
118+ let mut streamer = scanner. scan ( Path :: new ( "dummy" ) , None , 1 ) . await . unwrap ( ) ;
119+
120+ let blocks = streamer. poll_all ( ) . await . unwrap ( ) ;
121+ assert_eq ! ( blocks, expected_blocks) ;
41122 }
42123}
0 commit comments