@@ -84,7 +84,7 @@ use iroh_io::AsyncSliceReader;
8484use redb:: { AccessGuard , DatabaseError , ReadableTable , StorageError } ;
8585use serde:: { Deserialize , Serialize } ;
8686use smallvec:: SmallVec ;
87- use tokio:: io:: AsyncWriteExt ;
87+ use tokio:: { io:: AsyncWriteExt , runtime :: Handle } ;
8888use tracing:: trace_span;
8989mod tables;
9090#[ doc( hidden) ]
@@ -441,6 +441,30 @@ pub(crate) enum ImportSource {
441441 Memory ( #[ debug( skip) ] Bytes ) ,
442442}
443443
444+ /// trait which defines the backend persistence layer
445+ /// for this store. e.g. filesystem, s3 etc
446+ pub trait Persistence : Clone {
447+ /// the error type that is returned for the persistence layer
448+ type Err ;
449+
450+ /// return the size of the file in bytes if it can be found/read
451+ /// otherwise return a [Self::Err]
452+ fn size ( & self , path : & Path ) -> impl Future < Output = Result < u64 , Self :: Err > > ;
453+ }
454+
455+ /// A persistence layer that writes to the local file system
456+ #[ derive( Debug , Clone , Copy ) ]
457+ pub struct FileSystemPersistence ;
458+
459+ impl Persistence for FileSystemPersistence {
460+ type Err = io:: Error ;
461+
462+ fn size ( & self , path : & Path ) -> impl Future < Output = Result < u64 , Self :: Err > > {
463+ let res = std:: fs:: metadata ( path) . map ( |m| m. len ( ) ) ;
464+ async move { res }
465+ }
466+ }
467+
444468impl ImportSource {
445469 fn content ( & self ) -> MemOrFile < & [ u8 ] , & Path > {
446470 match self {
@@ -450,10 +474,10 @@ impl ImportSource {
450474 }
451475 }
452476
453- fn len ( & self ) -> io :: Result < u64 > {
477+ async fn len < T : Persistence > ( & self , fs : & T ) -> Result < u64 , T :: Err > {
454478 match self {
455- Self :: TempFile ( path) => std :: fs :: metadata ( path) . map ( |m| m . len ( ) ) ,
456- Self :: External ( path) => std :: fs :: metadata ( path) . map ( |m| m . len ( ) ) ,
479+ Self :: TempFile ( path) => fs . size ( path) . await ,
480+ Self :: External ( path) => fs . size ( path) . await ,
457481 Self :: Memory ( data) => Ok ( data. len ( ) as u64 ) ,
458482 }
459483 }
@@ -711,7 +735,7 @@ pub(crate) type FilterPredicate<K, V> =
711735/// Storage that is using a redb database for small files and files for
712736/// large files.
713737#[ derive( Debug , Clone ) ]
714- pub struct Store ( Arc < StoreInner > ) ;
738+ pub struct Store < T = FileSystemPersistence > ( Arc < StoreInner < T > > ) ;
715739
716740impl Store {
717741 /// Load or create a new store.
@@ -758,11 +782,12 @@ impl Store {
758782}
759783
760784#[ derive( Debug ) ]
761- struct StoreInner {
785+ struct StoreInner < T > {
762786 tx : async_channel:: Sender < ActorMessage > ,
763787 temp : Arc < RwLock < TempCounterMap > > ,
764788 handle : Option < std:: thread:: JoinHandle < ( ) > > ,
765789 path_options : Arc < PathOptions > ,
790+ fs : T ,
766791}
767792
768793impl TagDrop for RwLock < TempCounterMap > {
@@ -777,8 +802,23 @@ impl TagCounter for RwLock<TempCounterMap> {
777802 }
778803}
779804
780- impl StoreInner {
805+ impl StoreInner < FileSystemPersistence > {
781806 fn new_sync ( path : PathBuf , options : Options , rt : tokio:: runtime:: Handle ) -> io:: Result < Self > {
807+ Self :: new_sync_with_backend ( path, options, rt, FileSystemPersistence )
808+ }
809+ }
810+
811+ impl < T > StoreInner < T >
812+ where
813+ T : Persistence ,
814+ OuterError : From < T :: Err > ,
815+ {
816+ fn new_sync_with_backend (
817+ path : PathBuf ,
818+ options : Options ,
819+ rt : tokio:: runtime:: Handle ,
820+ fs : T ,
821+ ) -> io:: Result < Self > {
782822 tracing:: trace!(
783823 "creating data directory: {}" ,
784824 options. path. data_path. display( )
@@ -811,6 +851,7 @@ impl StoreInner {
811851 temp,
812852 handle : Some ( handle) ,
813853 path_options : Arc :: new ( options. path ) ,
854+ fs,
814855 } )
815856 }
816857
@@ -977,10 +1018,13 @@ impl StoreInner {
9771018 . into ( ) ) ;
9781019 }
9791020 let parent = target. parent ( ) . ok_or_else ( || {
980- OuterError :: from ( io:: Error :: new (
981- io:: ErrorKind :: InvalidInput ,
982- "target path has no parent directory" ,
983- ) )
1021+ OuterError :: Inner (
1022+ io:: Error :: new (
1023+ io:: ErrorKind :: InvalidInput ,
1024+ "target path has no parent directory" ,
1025+ )
1026+ . into ( ) ,
1027+ )
9841028 } ) ?;
9851029 std:: fs:: create_dir_all ( parent) ?;
9861030 let temp_tag = self . temp . temp_tag ( HashAndFormat :: raw ( hash) ) ;
@@ -1069,7 +1113,7 @@ impl StoreInner {
10691113 let file = match mode {
10701114 ImportMode :: TryReference => ImportSource :: External ( path) ,
10711115 ImportMode :: Copy => {
1072- if std :: fs :: metadata ( & path) ? . len ( ) < 16 * 1024 {
1116+ if Handle :: current ( ) . block_on ( self . fs . size ( & path) ) ? < 16 * 1024 {
10731117 // we don't know if the data will be inlined since we don't
10741118 // have the inline options here. But still for such a small file
10751119 // it does not seem worth it do to the temp file ceremony.
@@ -1108,7 +1152,7 @@ impl StoreInner {
11081152 id : u64 ,
11091153 progress : impl ProgressSender < Msg = ImportProgress > + IdGenerator ,
11101154 ) -> OuterResult < ( TempTag , u64 ) > {
1111- let data_size = file. len ( ) ?;
1155+ let data_size = Handle :: current ( ) . block_on ( file. len ( & self . fs ) ) ?;
11121156 tracing:: debug!( "finalize_import_sync {:?} {}" , file, data_size) ;
11131157 progress. blocking_send ( ImportProgress :: Size {
11141158 id,
@@ -1161,7 +1205,7 @@ impl StoreInner {
11611205 }
11621206}
11631207
1164- impl Drop for StoreInner {
1208+ impl < T > Drop for StoreInner < T > {
11651209 fn drop ( & mut self ) {
11661210 if let Some ( handle) = self . handle . take ( ) {
11671211 self . tx
@@ -1217,10 +1261,7 @@ pub(crate) enum ActorError {
12171261
12181262impl From < ActorError > for io:: Error {
12191263 fn from ( e : ActorError ) -> Self {
1220- match e {
1221- ActorError :: Io ( e) => e,
1222- e => io:: Error :: new ( io:: ErrorKind :: Other , e) ,
1223- }
1264+ io:: Error :: new ( io:: ErrorKind :: Other , e)
12241265 }
12251266}
12261267
0 commit comments