@@ -32,7 +32,7 @@ use iroh_io::AsyncSliceReader;
3232use super :: mutable_mem_storage:: { MutableMemStorage , SizeInfo } ;
3333use crate :: {
3434 store:: BaoBatchWriter ,
35- util:: { get_limited_slice, MemOrFile , SparseMemFile } ,
35+ util:: { callback_lock :: CallbackLock , get_limited_slice, FileAndSize , MemOrFile , SparseMemFile } ,
3636 Hash , IROH_BLOCK_SIZE ,
3737} ;
3838
@@ -81,46 +81,63 @@ struct DataPaths {
8181///
8282/// For the memory variant, it does reading in a zero copy way, since storage
8383/// is already a `Bytes`.
84- #[ derive( Default , derive_more:: Debug ) ]
85- pub struct CompleteStorage {
84+ #[ derive( derive_more:: Debug ) ]
85+ #[ debug( bound( T : Debug ) ) ]
86+ pub struct CompleteStorage < T > {
8687 /// data part, which can be in memory or on disk.
8788 #[ debug( "{:?}" , data. as_ref( ) . map_mem( |x| x. len( ) ) ) ]
88- pub data : MemOrFile < Bytes , ( File , u64 ) > ,
89+ pub data : MemOrFile < Bytes , FileAndSize < T > > ,
8990 /// outboard part, which can be in memory or on disk.
9091 #[ debug( "{:?}" , outboard. as_ref( ) . map_mem( |x| x. len( ) ) ) ]
91- pub outboard : MemOrFile < Bytes , ( File , u64 ) > ,
92+ pub outboard : MemOrFile < Bytes , FileAndSize < T > > ,
9293}
9394
94- impl CompleteStorage {
95+ impl < T > Default for CompleteStorage < T > {
96+ fn default ( ) -> Self {
97+ Self {
98+ data : Default :: default ( ) ,
99+ outboard : Default :: default ( ) ,
100+ }
101+ }
102+ }
103+
104+ impl < T > CompleteStorage < T >
105+ where
106+ T : bao_tree:: io:: sync:: ReadAt ,
107+ {
95108 /// Read from the data file at the given offset, until end of file or max bytes.
96109 pub fn read_data_at ( & self , offset : u64 , len : usize ) -> Bytes {
97110 match & self . data {
98111 MemOrFile :: Mem ( mem) => get_limited_slice ( mem, offset, len) ,
99- MemOrFile :: File ( ( file, _size) ) => read_to_end ( file, offset, len) . unwrap ( ) ,
112+ MemOrFile :: File ( FileAndSize { file, size : _ } ) => {
113+ read_to_end ( file, offset, len) . unwrap ( )
114+ }
100115 }
101116 }
102117
103118 /// Read from the outboard file at the given offset, until end of file or max bytes.
104119 pub fn read_outboard_at ( & self , offset : u64 , len : usize ) -> Bytes {
105120 match & self . outboard {
106121 MemOrFile :: Mem ( mem) => get_limited_slice ( mem, offset, len) ,
107- MemOrFile :: File ( ( file, _size) ) => read_to_end ( file, offset, len) . unwrap ( ) ,
122+ MemOrFile :: File ( FileAndSize { file, size : _ } ) => {
123+ read_to_end ( file, offset, len) . unwrap ( )
124+ }
108125 }
109126 }
110127
111128 /// The size of the data file.
112129 pub fn data_size ( & self ) -> u64 {
113130 match & self . data {
114131 MemOrFile :: Mem ( mem) => mem. len ( ) as u64 ,
115- MemOrFile :: File ( ( _file , size) ) => * size,
132+ MemOrFile :: File ( FileAndSize { file : _ , size } ) => * size,
116133 }
117134 }
118135
119136 /// The size of the outboard file.
120137 pub fn outboard_size ( & self ) -> u64 {
121138 match & self . outboard {
122139 MemOrFile :: Mem ( mem) => mem. len ( ) as u64 ,
123- MemOrFile :: File ( ( _file , size) ) => * size,
140+ MemOrFile :: File ( FileAndSize { file : _ , size } ) => * size,
124141 }
125142 }
126143}
@@ -244,7 +261,7 @@ impl FileStorage {
244261
245262/// The storage for a bao file. This can be either in memory or on disk.
246263#[ derive( Debug ) ]
247- pub ( crate ) enum BaoFileStorage {
264+ pub ( crate ) enum BaoFileStorage < T > {
248265 /// The entry is incomplete and in memory.
249266 ///
250267 /// Since it is incomplete, it must be writeable.
@@ -261,16 +278,16 @@ pub(crate) enum BaoFileStorage {
261278 /// (memory or file).
262279 ///
263280 /// Writing to this is a no-op, since it is already complete.
264- Complete ( CompleteStorage ) ,
281+ Complete ( CompleteStorage < T > ) ,
265282}
266283
267- impl Default for BaoFileStorage {
284+ impl < T > Default for BaoFileStorage < T > {
268285 fn default ( ) -> Self {
269286 BaoFileStorage :: Complete ( Default :: default ( ) )
270287 }
271288}
272289
273- impl BaoFileStorage {
290+ impl < T > BaoFileStorage < T > {
274291 /// Take the storage out, leaving an empty storage in its place.
275292 ///
276293 /// Be careful to put something back in its place, or you will lose data.
@@ -310,11 +327,11 @@ impl BaoFileStorage {
310327
311328/// A weak reference to a bao file handle.
312329#[ derive( Debug , Clone ) ]
313- pub struct BaoFileHandleWeak ( Weak < BaoFileHandleInner > ) ;
330+ pub struct BaoFileHandleWeak < T > ( Weak < BaoFileHandleInner < T > > ) ;
314331
315- impl BaoFileHandleWeak {
332+ impl < T > BaoFileHandleWeak < T > {
316333 /// Upgrade to a strong reference if possible.
317- pub fn upgrade ( & self ) -> Option < BaoFileHandle > {
334+ pub fn upgrade ( & self ) -> Option < BaoFileHandle < T > > {
318335 self . 0 . upgrade ( ) . map ( BaoFileHandle )
319336 }
320337
@@ -326,15 +343,29 @@ impl BaoFileHandleWeak {
326343
327344/// The inner part of a bao file handle.
328345#[ derive( Debug ) ]
329- pub struct BaoFileHandleInner {
330- pub ( crate ) storage : RwLock < BaoFileStorage > ,
346+ pub struct BaoFileHandleInner < T > {
347+ pub ( crate ) storage : RwLock < BaoFileStorage < T > > ,
331348 config : Arc < BaoFileConfig > ,
332349 hash : Hash ,
333350}
334351
335352/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
336- #[ derive( Debug , Clone , derive_more:: Deref ) ]
337- pub struct BaoFileHandle ( Arc < BaoFileHandleInner > ) ;
353+ #[ derive( Debug ) ]
354+ pub struct BaoFileHandle < T > ( Arc < BaoFileHandleInner < T > > ) ;
355+
356+ impl < T > Deref for BaoFileHandle < T > {
357+ type Target = Arc < BaoFileHandleInner < T > > ;
358+
359+ fn deref ( & self ) -> & Self :: Target {
360+ & self . 0
361+ }
362+ }
363+
364+ impl < T > Clone for BaoFileHandle < T > {
365+ fn clone ( & self ) -> Self {
366+ Self ( self . 0 . clone ( ) )
367+ }
368+ }
338369
339370pub ( crate ) type CreateCb = Arc < dyn Fn ( & Hash ) -> io:: Result < ( ) > + Send + Sync > ;
340371
@@ -375,13 +406,18 @@ impl BaoFileConfig {
375406
376407/// A reader for a bao file, reading just the data.
377408#[ derive( Debug ) ]
378- pub struct DataReader ( Option < BaoFileHandle > ) ;
409+ pub struct DataReader < T > ( Option < BaoFileHandle < T > > ) ;
379410
380- async fn with_storage < T , P , F > ( opt : & mut Option < BaoFileHandle > , no_io : P , f : F ) -> io:: Result < T >
411+ async fn with_storage < T , P , F , H > (
412+ opt : & mut Option < BaoFileHandle < H > > ,
413+ no_io : P ,
414+ f : F ,
415+ ) -> io:: Result < T >
381416where
382- P : Fn ( & BaoFileStorage ) -> bool + Send + ' static ,
383- F : FnOnce ( & BaoFileStorage ) -> io:: Result < T > + Send + ' static ,
417+ P : Fn ( & BaoFileStorage < H > ) -> bool + Send + ' static ,
418+ F : FnOnce ( & BaoFileStorage < H > ) -> io:: Result < T > + Send + ' static ,
384419 T : Send + ' static ,
420+ H : Send + Sync + ' static ,
385421{
386422 let handle = opt
387423 . take ( )
@@ -410,7 +446,10 @@ where
410446 res
411447}
412448
413- impl AsyncSliceReader for DataReader {
449+ impl < T > AsyncSliceReader for DataReader < T >
450+ where
451+ T : Send + Sync + bao_tree:: io:: sync:: ReadAt + ' static ,
452+ {
414453 async fn read_at ( & mut self , offset : u64 , len : usize ) -> io:: Result < Bytes > {
415454 with_storage (
416455 & mut self . 0 ,
@@ -440,9 +479,12 @@ impl AsyncSliceReader for DataReader {
440479
441480/// A reader for the outboard part of a bao file.
442481#[ derive( Debug ) ]
443- pub struct OutboardReader ( Option < BaoFileHandle > ) ;
482+ pub struct OutboardReader < T > ( Option < BaoFileHandle < T > > ) ;
444483
445- impl AsyncSliceReader for OutboardReader {
484+ impl < T > AsyncSliceReader for OutboardReader < T >
485+ where
486+ T : Send + Sync + bao_tree:: io:: sync:: ReadAt + ' static ,
487+ {
446488 async fn read_at ( & mut self , offset : u64 , len : usize ) -> io:: Result < Bytes > {
447489 with_storage (
448490 & mut self . 0 ,
@@ -476,7 +518,10 @@ enum HandleChange {
476518 // later: size verified
477519}
478520
479- impl BaoFileHandle {
521+ impl < T > BaoFileHandle < T >
522+ where
523+ T : bao_tree:: io:: sync:: ReadAt ,
524+ {
480525 /// Create a new bao file handle.
481526 ///
482527 /// This will create a new file handle with an empty memory storage.
@@ -509,8 +554,8 @@ impl BaoFileHandle {
509554 pub fn new_complete (
510555 config : Arc < BaoFileConfig > ,
511556 hash : Hash ,
512- data : MemOrFile < Bytes , ( File , u64 ) > ,
513- outboard : MemOrFile < Bytes , ( File , u64 ) > ,
557+ data : MemOrFile < Bytes , FileAndSize < T > > ,
558+ outboard : MemOrFile < Bytes , FileAndSize < T > > ,
514559 ) -> Self {
515560 let storage = BaoFileStorage :: Complete ( CompleteStorage { data, outboard } ) ;
516561 Self ( Arc :: new ( BaoFileHandleInner {
@@ -525,7 +570,7 @@ impl BaoFileHandle {
525570 #[ cfg( feature = "fs-store" ) ]
526571 pub ( crate ) fn transform (
527572 & self ,
528- f : impl FnOnce ( BaoFileStorage ) -> io:: Result < BaoFileStorage > ,
573+ f : impl FnOnce ( BaoFileStorage < T > ) -> io:: Result < BaoFileStorage < T > > ,
529574 ) -> io:: Result < ( ) > {
530575 let mut lock = self . storage . write ( ) . unwrap ( ) ;
531576 let storage = lock. take ( ) ;
@@ -545,15 +590,15 @@ impl BaoFileHandle {
545590 ///
546591 /// Caution: this is a reader for the unvalidated data file. Reading this
547592 /// can produce data that does not match the hash.
548- pub fn data_reader ( & self ) -> DataReader {
593+ pub fn data_reader ( & self ) -> DataReader < T > {
549594 DataReader ( Some ( self . clone ( ) ) )
550595 }
551596
552597 /// An AsyncSliceReader for the outboard file.
553598 ///
554599 /// The outboard file is used to validate the data file. It is not guaranteed
555600 /// to be complete.
556- pub fn outboard_reader ( & self ) -> OutboardReader {
601+ pub fn outboard_reader ( & self ) -> OutboardReader < T > {
557602 OutboardReader ( Some ( self . clone ( ) ) )
558603 }
559604
@@ -567,7 +612,7 @@ impl BaoFileHandle {
567612 }
568613
569614 /// The outboard for the file.
570- pub fn outboard ( & self ) -> io:: Result < PreOrderOutboard < OutboardReader > > {
615+ pub fn outboard ( & self ) -> io:: Result < PreOrderOutboard < OutboardReader < T > > > {
571616 let root = self . hash . into ( ) ;
572617 let tree = BaoTree :: new ( self . current_size ( ) ?, IROH_BLOCK_SIZE ) ;
573618 let outboard = self . outboard_reader ( ) ;
@@ -584,7 +629,7 @@ impl BaoFileHandle {
584629 }
585630
586631 /// Create a new writer from the handle.
587- pub fn writer ( & self ) -> BaoFileWriter {
632+ pub fn writer ( & self ) -> BaoFileWriter < T > {
588633 BaoFileWriter ( Some ( self . clone ( ) ) )
589634 }
590635
@@ -625,7 +670,7 @@ impl BaoFileHandle {
625670 }
626671
627672 /// Downgrade to a weak reference.
628- pub fn downgrade ( & self ) -> BaoFileHandleWeak {
673+ pub fn downgrade ( & self ) -> BaoFileHandleWeak < T > {
629674 BaoFileHandleWeak ( Arc :: downgrade ( & self . 0 ) )
630675 }
631676}
@@ -676,9 +721,12 @@ impl MutableMemStorage {
676721/// It is a BaoFileHandle wrapped in an Option, so that we can take it out
677722/// in the future.
678723#[ derive( Debug ) ]
679- pub struct BaoFileWriter ( Option < BaoFileHandle > ) ;
724+ pub struct BaoFileWriter < T > ( Option < BaoFileHandle < T > > ) ;
680725
681- impl BaoBatchWriter for BaoFileWriter {
726+ impl < T > BaoBatchWriter for BaoFileWriter < T >
727+ where
728+ T : Send + Sync + bao_tree:: io:: sync:: ReadAt + ' static ,
729+ {
682730 async fn write_batch ( & mut self , size : u64 , batch : Vec < BaoContentItem > ) -> std:: io:: Result < ( ) > {
683731 let Some ( handle) = self . 0 . take ( ) else {
684732 return Err ( io:: Error :: new ( io:: ErrorKind :: Other , "deferred batch busy" ) ) ;
@@ -828,7 +876,11 @@ pub mod test_support {
828876 ( outboard. root . into ( ) , chunk_ranges, encoded)
829877 }
830878
831- pub async fn validate ( handle : & BaoFileHandle , original : & [ u8 ] , ranges : & [ Range < u64 > ] ) {
879+ pub async fn validate (
880+ handle : & BaoFileHandle < std:: fs:: File > ,
881+ original : & [ u8 ] ,
882+ ranges : & [ Range < u64 > ] ,
883+ ) {
832884 let mut r = handle. data_reader ( ) ;
833885 for range in ranges {
834886 let start = range. start ;
0 commit comments