11use std:: {
22 io:: { self , ErrorKind , SeekFrom } ,
33 ops:: DerefMut ,
4+ pin:: Pin ,
45 sync:: { Arc , Mutex } ,
5- task:: Poll ,
6+ task:: { Context , Poll } ,
67} ;
78
8- use n0_future:: FutureExt ;
9+ use n0_future:: StreamExt ;
910
1011use crate :: api:: {
1112 blobs:: { Blobs , ReaderOptions } ,
12- RequestResult ,
13+ proto :: ExportRangesItem ,
1314} ;
1415
16+ #[ derive( Debug ) ]
1517pub struct Reader {
1618 blobs : Blobs ,
1719 options : ReaderOptions ,
1820 state : Arc < Mutex < ReaderState > > ,
1921}
2022
21- #[ derive( Default ) ]
23+ #[ derive( Default , derive_more :: Debug ) ]
2224enum ReaderState {
2325 Idle {
2426 position : u64 ,
@@ -28,7 +30,8 @@ enum ReaderState {
2830 } ,
2931 Reading {
3032 position : u64 ,
31- op : n0_future:: boxed:: BoxFuture < RequestResult < Vec < u8 > > > ,
33+ #[ debug( skip) ]
34+ op : n0_future:: boxed:: BoxStream < ExportRangesItem > ,
3235 } ,
3336 #[ default]
3437 Poisoned ,
@@ -46,79 +49,112 @@ impl Reader {
4649
4750impl tokio:: io:: AsyncRead for Reader {
4851 fn poll_read (
49- self : std :: pin :: Pin < & mut Self > ,
50- cx : & mut std :: task :: Context < ' _ > ,
52+ self : Pin < & mut Self > ,
53+ cx : & mut Context < ' _ > ,
5154 buf : & mut tokio:: io:: ReadBuf < ' _ > ,
52- ) -> std :: task :: Poll < std :: io:: Result < ( ) > > {
55+ ) -> Poll < io:: Result < ( ) > > {
5356 let this = self . get_mut ( ) ;
54- match std:: mem:: take ( this. state . lock ( ) . unwrap ( ) . deref_mut ( ) ) {
55- ReaderState :: Idle { position } => {
56- // todo: read until next page boundary instead of fixed size
57- let len = buf. remaining ( ) . min ( 1024 * 16 ) ;
58- let end = position. checked_add ( len as u64 ) . ok_or_else ( || {
59- io:: Error :: new ( ErrorKind :: InvalidInput , "Position overflow when reading" )
60- } ) ?;
61- let hash = this. options . hash ;
62- let blobs = this. blobs . clone ( ) ;
63- let ranges = position..end;
64- let op = async move { blobs. export_ranges ( hash, ranges) . concatenate ( ) . await } ;
65- * this. state . lock ( ) . unwrap ( ) = ReaderState :: Reading {
66- position,
67- op : Box :: pin ( op) ,
68- } ;
69- }
70- ReaderState :: Reading { position, mut op } => {
71- match op. poll ( cx) {
72- Poll :: Ready ( Ok ( data) ) => {
73- let len = data. len ( ) ;
74- if len > buf. remaining ( ) {
75- return Poll :: Ready ( Err ( io:: Error :: new (
76- ErrorKind :: UnexpectedEof ,
77- "Read more data than buffer can hold" ,
57+ let mut position1 = None ;
58+ loop {
59+ let mut guard = this. state . lock ( ) . unwrap ( ) ;
60+ match std:: mem:: take ( guard. deref_mut ( ) ) {
61+ ReaderState :: Idle { position } => {
62+ // todo: read until next page boundary instead of fixed size
63+ let len = buf. remaining ( ) as u64 ;
64+ let end = position. checked_add ( len) . ok_or_else ( || {
65+ io:: Error :: new ( ErrorKind :: InvalidInput , "Position overflow when reading" )
66+ } ) ?;
67+ // start the export op for the entire size of the buffer, and convert to a stream
68+ let stream = this
69+ . blobs
70+ . export_ranges ( this. options . hash , position..end)
71+ . stream ( ) ;
72+ position1 = Some ( position) ;
73+ * guard = ReaderState :: Reading {
74+ position,
75+ op : Box :: pin ( stream) ,
76+ } ;
77+ }
78+ ReaderState :: Reading { position, mut op } => {
79+ let position1 = position1. get_or_insert ( position) ;
80+ match op. poll_next ( cx) {
81+ Poll :: Ready ( Some ( ExportRangesItem :: Data ( data) ) ) => {
82+ if data. offset != * position1 {
83+ break Poll :: Ready ( Err ( io:: Error :: other (
84+ "Data offset does not match expected position" ,
85+ ) ) ) ;
86+ }
87+ buf. put_slice ( & data. data ) ;
88+ // update just local position1, not the position in the state.
89+ * position1 =
90+ position1
91+ . checked_add ( data. data . len ( ) as u64 )
92+ . ok_or_else ( || {
93+ io:: Error :: new ( ErrorKind :: InvalidInput , "Position overflow" )
94+ } ) ?;
95+ * guard = ReaderState :: Reading { position, op } ;
96+ }
97+ Poll :: Ready ( Some ( ExportRangesItem :: Error ( err) ) ) => {
98+ * guard = ReaderState :: Idle { position } ;
99+ break Poll :: Ready ( Err ( io:: Error :: other (
100+ format ! ( "Error reading data: {err}" ) ,
78101 ) ) ) ;
79102 }
80- buf. put_slice ( & data) ;
81- let position = position + len as u64 ;
82- * this. state . lock ( ) . unwrap ( ) = ReaderState :: Idle { position } ;
83- return Poll :: Ready ( Ok ( ( ) ) ) ;
84- }
85- Poll :: Ready ( Err ( e) ) => {
86- * this. state . lock ( ) . unwrap ( ) = ReaderState :: Idle { position } ;
87- let e = io:: Error :: new ( ErrorKind :: Other , e. to_string ( ) ) ;
88- return Poll :: Ready ( Err ( e) ) ;
89- }
90- Poll :: Pending => {
91- // Put back the state
92- * this. state . lock ( ) . unwrap ( ) = ReaderState :: Reading {
93- position,
94- op : Box :: pin ( op) ,
95- } ;
96- return Poll :: Pending ;
103+ Poll :: Ready ( Some ( ExportRangesItem :: Size ( _size) ) ) => {
104+ // put back the state and continue reading
105+ * guard = ReaderState :: Reading { position, op } ;
106+ }
107+ Poll :: Ready ( None ) => {
108+ // done with the stream, go back in idle.
109+ * guard = ReaderState :: Idle {
110+ position : * position1,
111+ } ;
112+ break Poll :: Ready ( Ok ( ( ) ) ) ;
113+ }
114+ Poll :: Pending => {
115+ break if position != * position1 {
116+ // we read some data so we need to abort the op.
117+ //
118+ // we can't be sure we won't be called with the same buf size next time.
119+ * guard = ReaderState :: Idle {
120+ position : * position1,
121+ } ;
122+ Poll :: Ready ( Ok ( ( ) ) )
123+ } else {
124+ // nothing was read yet, we remain in the reading state
125+ //
126+ // we make an assumption here that the next call will be with the same buf size.
127+ * guard = ReaderState :: Reading {
128+ position : * position1,
129+ op,
130+ } ;
131+ Poll :: Pending
132+ } ;
133+ }
97134 }
98135 }
99- }
100- state @ ReaderState :: Seeking { .. } => {
101- * this. state . lock ( ) . unwrap ( ) = state;
102- return Poll :: Ready ( Err ( io:: Error :: new (
103- ErrorKind :: Other ,
104- "Can't read while seeking" ,
105- ) ) ) ;
106- }
107- ReaderState :: Poisoned => {
108- return Poll :: Ready ( Err ( io:: Error :: other ( "Reader is poisoned" ) ) ) ;
109- }
110- } ;
111- todo ! ( )
136+ state @ ReaderState :: Seeking { .. } => {
137+ * this. state . lock ( ) . unwrap ( ) = state;
138+ break Poll :: Ready ( Err ( io:: Error :: other (
139+ "Can't read while seeking" ,
140+ ) ) ) ;
141+ }
142+ ReaderState :: Poisoned => {
143+ break Poll :: Ready ( Err ( io:: Error :: other ( "Reader is poisoned" ) ) ) ;
144+ }
145+ } ;
146+ }
112147 }
113148}
114149
115150impl tokio:: io:: AsyncSeek for Reader {
116151 fn start_seek (
117152 self : std:: pin:: Pin < & mut Self > ,
118153 seek_from : tokio:: io:: SeekFrom ,
119- ) -> std :: io:: Result < ( ) > {
154+ ) -> io:: Result < ( ) > {
120155 let this = self . get_mut ( ) ;
121- match std:: mem:: take ( this. state . lock ( ) . unwrap ( ) . deref_mut ( ) ) {
156+ let mut guard = this. state . lock ( ) . unwrap ( ) ;
157+ match std:: mem:: take ( guard. deref_mut ( ) ) {
122158 ReaderState :: Idle { position } => {
123159 let position1 = match seek_from {
124160 SeekFrom :: Start ( pos) => pos,
@@ -138,7 +174,7 @@ impl tokio::io::AsyncSeek for Reader {
138174 ) ) ?;
139175 }
140176 } ;
141- * this . state . lock ( ) . unwrap ( ) = ReaderState :: Seeking {
177+ * guard = ReaderState :: Seeking {
142178 position : position1,
143179 } ;
144180 Ok ( ( ) )
@@ -149,22 +185,144 @@ impl tokio::io::AsyncSeek for Reader {
149185 }
150186 }
151187
152- fn poll_complete (
153- self : std:: pin:: Pin < & mut Self > ,
154- _cx : & mut std:: task:: Context < ' _ > ,
155- ) -> std:: task:: Poll < std:: io:: Result < u64 > > {
188+ fn poll_complete ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < io:: Result < u64 > > {
156189 let this = self . get_mut ( ) ;
157- Poll :: Ready (
158- match std:: mem:: take ( this. state . lock ( ) . unwrap ( ) . deref_mut ( ) ) {
159- ReaderState :: Seeking { position } => {
160- // we only put the state back if we are in the right state
161- * this. state . lock ( ) . unwrap ( ) = ReaderState :: Idle { position } ;
162- Ok ( position)
163- }
164- ReaderState :: Idle { .. } => Err ( io:: Error :: other ( "No seek operation in progress" ) ) ,
165- ReaderState :: Reading { .. } => Err ( io:: Error :: other ( "Can't seek while reading" ) ) ,
166- ReaderState :: Poisoned => Err ( io:: Error :: other ( "Reader is poisoned" ) ) ,
190+ let mut guard = this. state . lock ( ) . unwrap ( ) ;
191+ Poll :: Ready ( match std:: mem:: take ( guard. deref_mut ( ) ) {
192+ ReaderState :: Seeking { position } => {
193+ * guard = ReaderState :: Idle { position } ;
194+ Ok ( position)
195+ }
196+ ReaderState :: Idle { position } => {
197+ // seek calls poll_complete just in case, to finish a pending seek operation
198+ // before the next seek operation. So it is poll_complete/start_seek/poll_complete
199+ * guard = ReaderState :: Idle { position } ;
200+ Ok ( position)
201+ }
202+ ReaderState :: Reading { .. } => Err ( io:: Error :: other ( "Can't seek while reading" ) ) ,
203+ ReaderState :: Poisoned => Err ( io:: Error :: other ( "Reader is poisoned" ) ) ,
204+ } )
205+ }
206+ }
207+
208+ #[ cfg( test) ]
209+ mod tests {
210+ use bao_tree:: ChunkRanges ;
211+ use testresult:: TestResult ;
212+ use tokio:: io:: { AsyncReadExt , AsyncSeekExt } ;
213+
214+ use super :: * ;
215+ use crate :: {
216+ store:: {
217+ fs:: {
218+ tests:: { create_n0_bao, test_data, INTERESTING_SIZES } ,
219+ FsStore ,
167220 } ,
168- )
221+ mem:: MemStore ,
222+ } ,
223+ util:: ChunkRangesExt ,
224+ } ;
225+
226+ async fn reader_smoke ( blobs : & Blobs ) -> TestResult < ( ) > {
227+ for size in INTERESTING_SIZES {
228+ let data = test_data ( size) ;
229+ let tag = blobs. add_bytes ( data. clone ( ) ) . await ?;
230+ // read all
231+ {
232+ let mut reader = blobs. reader ( tag. hash ) ;
233+ let mut buf = Vec :: new ( ) ;
234+ reader. read_to_end ( & mut buf) . await ?;
235+ assert_eq ! ( buf, data) ;
236+ let pos = reader. stream_position ( ) . await ?;
237+ assert_eq ! ( pos, data. len( ) as u64 ) ;
238+ }
239+ // seek to mid and read all
240+ {
241+ let mut reader = blobs. reader ( tag. hash ) ;
242+ let mid = size / 2 ;
243+ reader. seek ( SeekFrom :: Start ( mid as u64 ) ) . await ?;
244+ let mut buf = Vec :: new ( ) ;
245+ reader. read_to_end ( & mut buf) . await ?;
246+ assert_eq ! ( buf, data[ mid..] . to_vec( ) ) ;
247+ let pos = reader. stream_position ( ) . await ?;
248+ assert_eq ! ( pos, data. len( ) as u64 ) ;
249+ }
250+ }
251+ Ok ( ( ) )
252+ }
253+
254+ async fn reader_partial ( blobs : & Blobs ) -> TestResult < ( ) > {
255+ for size in INTERESTING_SIZES {
256+ let data = test_data ( size) ;
257+ let ranges = ChunkRanges :: chunk ( 0 ) ;
258+ let ( hash, bao) = create_n0_bao ( & data, & ranges) ?;
259+ println ! ( "importing {} bytes" , bao. len( ) ) ;
260+ blobs. import_bao_bytes ( hash, ranges. clone ( ) , bao) . await ?;
261+ // read the first chunk or the entire blob, whatever is smaller
262+ // this should work!
263+ {
264+ let mut reader = blobs. reader ( hash) ;
265+ let valid = size. min ( 1024 ) ;
266+ let mut buf = vec ! [ 0u8 ; valid] ;
267+ reader. read_exact ( & mut buf) . await ?;
268+ assert_eq ! ( buf, data[ ..valid] ) ;
269+ let pos = reader. stream_position ( ) . await ?;
270+ assert_eq ! ( pos, valid as u64 ) ;
271+ }
272+ if size > 1024 {
273+ // read the part we don't have - should immediately return an error
274+ {
275+ let mut reader = blobs. reader ( hash) ;
276+ let mut rest = vec ! [ 0u8 ; size - 1024 ] ;
277+ reader. seek ( SeekFrom :: Start ( 1024 ) ) . await ?;
278+ let res = reader. read_exact ( & mut rest) . await ;
279+ assert ! ( res. is_err( ) ) ;
280+ }
281+ // read crossing the end of the blob - should return an error despite
282+ // the first bytes being valid.
283+ // A read that fails should not update the stream position.
284+ {
285+ let mut reader = blobs. reader ( hash) ;
286+ let mut buf = vec ! [ 0u8 ; size] ;
287+ let res = reader. read ( & mut buf) . await ;
288+ assert ! ( res. is_err( ) ) ;
289+ let pos = reader. stream_position ( ) . await ?;
290+ assert_eq ! ( pos, 0 ) ;
291+ }
292+ }
293+ }
294+ Ok ( ( ) )
295+ }
296+
297+ #[ tokio:: test]
298+ async fn reader_partial_fs ( ) -> TestResult < ( ) > {
299+ let testdir = tempfile:: tempdir ( ) ?;
300+ let store = FsStore :: load ( testdir. path ( ) . to_owned ( ) ) . await ?;
301+ // reader_smoke_raw(store.blobs()).await?;
302+ reader_partial ( store. blobs ( ) ) . await ?;
303+ Ok ( ( ) )
304+ }
305+
306+ #[ tokio:: test]
307+ async fn reader_partial_memory ( ) -> TestResult < ( ) > {
308+ let store = MemStore :: new ( ) ;
309+ reader_partial ( store. blobs ( ) ) . await ?;
310+ Ok ( ( ) )
311+ }
312+
313+ #[ tokio:: test]
314+ async fn reader_smoke_fs ( ) -> TestResult < ( ) > {
315+ let testdir = tempfile:: tempdir ( ) ?;
316+ let store = FsStore :: load ( testdir. path ( ) . to_owned ( ) ) . await ?;
317+ // reader_smoke_raw(store.blobs()).await?;
318+ reader_smoke ( store. blobs ( ) ) . await ?;
319+ Ok ( ( ) )
320+ }
321+
322+ #[ tokio:: test]
323+ async fn reader_smoke_memory ( ) -> TestResult < ( ) > {
324+ let store = MemStore :: new ( ) ;
325+ reader_smoke ( store. blobs ( ) ) . await ?;
326+ Ok ( ( ) )
169327 }
170328}
0 commit comments