@@ -450,6 +450,11 @@ pub trait Persistence: Clone {
450450 /// return the size of the file in bytes if it can be found/read
451451 /// otherwise return a [Self::Err]
452452 fn size ( & self , path : & Path ) -> impl Future < Output = Result < u64 , Self :: Err > > ;
453+
454+ /// read the contents of the file at the path
455+ /// returning the bytes of the file in the success case
456+ /// and [Self::Err] in the error case
457+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > ;
453458}
454459
455460/// A persistence layer that writes to the local file system
@@ -463,6 +468,11 @@ impl Persistence for FileSystemPersistence {
463468 let res = std:: fs:: metadata ( path) . map ( |m| m. len ( ) ) ;
464469 async move { res }
465470 }
471+
472+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > {
473+ let res = std:: fs:: read ( path) ;
474+ async move { res }
475+ }
466476}
467477
468478impl ImportSource {
@@ -1117,7 +1127,7 @@ where
11171127 // we don't know if the data will be inlined since we don't
11181128 // have the inline options here. But still for such a small file
11191129 // it does not seem worth it do to the temp file ceremony.
1120- let data = std :: fs :: read ( & path) ?;
1130+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
11211131 ImportSource :: Memory ( data. into ( ) )
11221132 } else {
11231133 let temp_path = self . temp_file_name ( ) ;
@@ -1216,23 +1226,24 @@ impl<T> Drop for StoreInner<T> {
12161226 }
12171227}
12181228
1219- struct ActorState {
1229+ struct ActorState < T > {
12201230 handles : BTreeMap < Hash , BaoFileHandleWeak > ,
12211231 protected : BTreeSet < Hash > ,
12221232 temp : Arc < RwLock < TempCounterMap > > ,
12231233 msgs_rx : async_channel:: Receiver < ActorMessage > ,
12241234 create_options : Arc < BaoFileConfig > ,
12251235 options : Options ,
12261236 rt : tokio:: runtime:: Handle ,
1237+ fs : T ,
12271238}
12281239
12291240/// The actor for the redb store.
12301241///
12311242/// It is split into the database and the rest of the state to allow for split
12321243/// borrows in the message handlers.
1233- struct Actor {
1244+ struct Actor < T = FileSystemPersistence > {
12341245 db : redb:: Database ,
1235- state : ActorState ,
1246+ state : ActorState < T > ,
12361247}
12371248
12381249/// Error type for message handler functions of the redb actor.
@@ -1583,12 +1594,13 @@ pub(super) async fn gc_sweep_task(
15831594 Ok ( ( ) )
15841595}
15851596
1586- impl Actor {
1587- fn new (
1597+ impl < T > Actor < T > {
1598+ fn new_with_backend (
15881599 path : & Path ,
15891600 options : Options ,
15901601 temp : Arc < RwLock < TempCounterMap > > ,
15911602 rt : tokio:: runtime:: Handle ,
1603+ fs : T ,
15921604 ) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
15931605 let db = match redb:: Database :: create ( path) {
15941606 Ok ( db) => db,
@@ -1632,11 +1644,23 @@ impl Actor {
16321644 options,
16331645 create_options : Arc :: new ( create_options) ,
16341646 rt,
1647+ fs,
16351648 } ,
16361649 } ,
16371650 tx,
16381651 ) )
16391652 }
1653+ }
1654+
1655+ impl Actor {
1656+ fn new (
1657+ path : & Path ,
1658+ options : Options ,
1659+ temp : Arc < RwLock < TempCounterMap > > ,
1660+ rt : tokio:: runtime:: Handle ,
1661+ ) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
1662+ Self :: new_with_backend ( path, options, temp, rt, FileSystemPersistence )
1663+ }
16401664
16411665 async fn run_batched ( mut self ) -> ActorResult < ( ) > {
16421666 let mut msgs = PeekableFlumeReceiver :: new ( self . state . msgs_rx . clone ( ) ) ;
@@ -1720,7 +1744,11 @@ impl Actor {
17201744 }
17211745}
17221746
1723- impl ActorState {
1747+ impl < T > ActorState < T >
1748+ where
1749+ T : Persistence ,
1750+ ActorError : From < T :: Err > ,
1751+ {
17241752 fn entry_status (
17251753 & mut self ,
17261754 tables : & impl ReadableTables ,
@@ -1911,7 +1939,8 @@ impl ActorState {
19111939 "reading external data to inline it: {}" ,
19121940 external_path. display( )
19131941 ) ;
1914- let data = Bytes :: from ( std:: fs:: read ( & external_path) ?) ;
1942+ let data =
1943+ Bytes :: from ( Handle :: current ( ) . block_on ( self . fs . read ( & external_path) ) ?) ;
19151944 DataLocation :: Inline ( data)
19161945 } else {
19171946 DataLocation :: External ( vec ! [ external_path] , data_size)
@@ -2164,7 +2193,7 @@ impl ActorState {
21642193 // inline
21652194 if size <= self . options . inline . max_data_inlined {
21662195 let path = self . options . path . owned_data_path ( & hash) ;
2167- let data = std :: fs :: read ( & path) ?;
2196+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
21682197 tables. delete_after_commit . insert ( hash, [ BaoFilePart :: Data ] ) ;
21692198 tables. inline_data . insert ( hash, data. as_slice ( ) ) ?;
21702199 ( DataLocation :: Inline ( ( ) ) , size, true )
@@ -2199,7 +2228,7 @@ impl ActorState {
21992228 if outboard_size <= self . options . inline . max_outboard_inlined =>
22002229 {
22012230 let path = self . options . path . owned_outboard_path ( & hash) ;
2202- let outboard = std :: fs :: read ( & path) ?;
2231+ let outboard = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
22032232 tables
22042233 . delete_after_commit
22052234 . insert ( hash, [ BaoFilePart :: Outboard ] ) ;
0 commit comments