@@ -450,6 +450,11 @@ pub trait Persistence: Clone {
450450 /// return the size of the file in bytes if it can be found/read
451451 /// otherwise return a [Self::Err]
452452 fn size ( & self , path : & Path ) -> impl Future < Output = Result < u64 , Self :: Err > > ;
453+
454+ /// read the contents of the file at the path
455+ /// returning the bytes of the file in the success case
456+ /// and [Self::Err] in the error case
457+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > ;
453458}
454459
455460/// A persistence layer that writes to the local file system
@@ -463,6 +468,11 @@ impl Persistence for FileSystemPersistence {
463468 let res = std:: fs:: metadata ( path) . map ( |m| m. len ( ) ) ;
464469 async move { res }
465470 }
471+
472+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > {
473+ let res = std:: fs:: read ( path) ;
474+ async move { res }
475+ }
466476}
467477
468478impl ImportSource {
@@ -1117,7 +1127,7 @@ where
11171127 // we don't know if the data will be inlined since we don't
11181128 // have the inline options here. But still for such a small file
11191129 // it does not seem worth it do to the temp file ceremony.
1120- let data = std :: fs :: read ( & path) ?;
1130+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
11211131 ImportSource :: Memory ( data. into ( ) )
11221132 } else {
11231133 let temp_path = self . temp_file_name ( ) ;
@@ -1216,23 +1226,24 @@ impl<T> Drop for StoreInner<T> {
12161226 }
12171227}
12181228
1219- struct ActorState {
1229+ struct ActorState < T > {
12201230 handles : BTreeMap < Hash , BaoFileHandleWeak > ,
12211231 protected : BTreeSet < Hash > ,
12221232 temp : Arc < RwLock < TempCounterMap > > ,
12231233 msgs_rx : async_channel:: Receiver < ActorMessage > ,
12241234 create_options : Arc < BaoFileConfig > ,
12251235 options : Options ,
12261236 rt : tokio:: runtime:: Handle ,
1237+ fs : T ,
12271238}
12281239
12291240/// The actor for the redb store.
12301241///
12311242/// It is split into the database and the rest of the state to allow for split
12321243/// borrows in the message handlers.
1233- struct Actor {
1244+ struct Actor < T = FileSystemPersistence > {
12341245 db : redb:: Database ,
1235- state : ActorState ,
1246+ state : ActorState < T > ,
12361247}
12371248
12381249/// Error type for message handler functions of the redb actor.
@@ -1586,12 +1597,13 @@ pub(super) async fn gc_sweep_task(
15861597 Ok ( ( ) )
15871598}
15881599
1589- impl Actor {
1590- fn new (
1600+ impl < T > Actor < T > {
1601+ fn new_with_backend (
15911602 path : & Path ,
15921603 options : Options ,
15931604 temp : Arc < RwLock < TempCounterMap > > ,
15941605 rt : tokio:: runtime:: Handle ,
1606+ fs : T ,
15951607 ) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
15961608 let db = match redb:: Database :: create ( path) {
15971609 Ok ( db) => db,
@@ -1635,11 +1647,23 @@ impl Actor {
16351647 options,
16361648 create_options : Arc :: new ( create_options) ,
16371649 rt,
1650+ fs,
16381651 } ,
16391652 } ,
16401653 tx,
16411654 ) )
16421655 }
1656+ }
1657+
1658+ impl Actor {
1659+ fn new (
1660+ path : & Path ,
1661+ options : Options ,
1662+ temp : Arc < RwLock < TempCounterMap > > ,
1663+ rt : tokio:: runtime:: Handle ,
1664+ ) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
1665+ Self :: new_with_backend ( path, options, temp, rt, FileSystemPersistence )
1666+ }
16431667
16441668 async fn run_batched ( mut self ) -> ActorResult < ( ) > {
16451669 let mut msgs = PeekableFlumeReceiver :: new ( self . state . msgs_rx . clone ( ) ) ;
@@ -1723,7 +1747,11 @@ impl Actor {
17231747 }
17241748}
17251749
1726- impl ActorState {
1750+ impl < T > ActorState < T >
1751+ where
1752+ T : Persistence ,
1753+ ActorError : From < T :: Err > ,
1754+ {
17271755 fn entry_status (
17281756 & mut self ,
17291757 tables : & impl ReadableTables ,
@@ -1914,7 +1942,8 @@ impl ActorState {
19141942 "reading external data to inline it: {}" ,
19151943 external_path. display( )
19161944 ) ;
1917- let data = Bytes :: from ( std:: fs:: read ( & external_path) ?) ;
1945+ let data =
1946+ Bytes :: from ( Handle :: current ( ) . block_on ( self . fs . read ( & external_path) ) ?) ;
19181947 DataLocation :: Inline ( data)
19191948 } else {
19201949 DataLocation :: External ( vec ! [ external_path] , data_size)
@@ -2167,7 +2196,7 @@ impl ActorState {
21672196 // inline
21682197 if size <= self . options . inline . max_data_inlined {
21692198 let path = self . options . path . owned_data_path ( & hash) ;
2170- let data = std :: fs :: read ( & path) ?;
2199+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
21712200 tables. delete_after_commit . insert ( hash, [ BaoFilePart :: Data ] ) ;
21722201 tables. inline_data . insert ( hash, data. as_slice ( ) ) ?;
21732202 ( DataLocation :: Inline ( ( ) ) , size, true )
@@ -2202,7 +2231,7 @@ impl ActorState {
22022231 if outboard_size <= self . options . inline . max_outboard_inlined =>
22032232 {
22042233 let path = self . options . path . owned_outboard_path ( & hash) ;
2205- let outboard = std :: fs :: read ( & path) ?;
2234+ let outboard = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
22062235 tables
22072236 . delete_after_commit
22082237 . insert ( hash, [ BaoFilePart :: Outboard ] ) ;
0 commit comments