@@ -8,9 +8,18 @@ use std::collections::HashMap;
88use std:: fs;
99use std:: io:: { Read , Write } ;
1010use std:: path:: { Path , PathBuf } ;
11+ #[ cfg( feature = "tokio" ) ]
12+ use std:: sync:: atomic:: AtomicU64 ;
1113use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
1214use std:: sync:: { Arc , Mutex , RwLock } ;
1315
16+ #[ cfg( feature = "tokio" ) ]
17+ use core:: future:: Future ;
18+ #[ cfg( feature = "tokio" ) ]
19+ use core:: pin:: Pin ;
20+ #[ cfg( feature = "tokio" ) ]
21+ use lightning:: util:: persist:: KVStore ;
22+
1423#[ cfg( target_os = "windows" ) ]
1524use { std:: ffi:: OsStr , std:: os:: windows:: ffi:: OsStrExt } ;
1625
@@ -30,43 +39,70 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3039 path. as_ref ( ) . encode_wide ( ) . chain ( Some ( 0 ) ) . collect ( )
3140}
3241
33- // The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34- const GC_LOCK_INTERVAL : usize = 25 ;
35-
36- /// A [`KVStoreSync`] implementation that writes to and reads from the file system.
37- pub struct FilesystemStore {
42+ struct FilesystemStoreInner {
3843 data_dir : PathBuf ,
3944 tmp_file_counter : AtomicUsize ,
40- gc_counter : AtomicUsize ,
41- locks : Mutex < HashMap < PathBuf , Arc < RwLock < ( ) > > > > ,
45+
46+ // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
47+ // latest written version per key.
48+ locks : Mutex < HashMap < PathBuf , Arc < RwLock < u64 > > > > ,
49+ }
50+
51+ /// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
52+ pub struct FilesystemStore {
53+ inner : Arc < FilesystemStoreInner > ,
54+
55+ // Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
56+ // operations aren't sensitive to the order of execution.
57+ #[ cfg( feature = "tokio" ) ]
58+ version_counter : AtomicU64 ,
4259}
4360
4461impl FilesystemStore {
4562 /// Constructs a new [`FilesystemStore`].
4663 pub fn new ( data_dir : PathBuf ) -> Self {
4764 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
4865 let tmp_file_counter = AtomicUsize :: new ( 0 ) ;
49- let gc_counter = AtomicUsize :: new ( 1 ) ;
50- Self { data_dir, tmp_file_counter, gc_counter, locks }
66+ Self {
67+ inner : Arc :: new ( FilesystemStoreInner { data_dir, tmp_file_counter, locks } ) ,
68+ #[ cfg( feature = "tokio" ) ]
69+ version_counter : AtomicU64 :: new ( 0 ) ,
70+ }
5171 }
5272
5373 /// Returns the data directory.
5474 pub fn get_data_dir ( & self ) -> PathBuf {
55- self . data_dir . clone ( )
75+ self . inner . data_dir . clone ( )
5676 }
77+ }
5778
58- fn garbage_collect_locks ( & self ) {
59- let gc_counter = self . gc_counter . fetch_add ( 1 , Ordering :: AcqRel ) ;
79+ impl KVStoreSync for FilesystemStore {
80+ fn read (
81+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
82+ ) -> Result < Vec < u8 > , lightning:: io:: Error > {
83+ self . inner . read ( primary_namespace, secondary_namespace, key)
84+ }
6085
61- if gc_counter % GC_LOCK_INTERVAL == 0 {
62- // Take outer lock for the cleanup.
63- let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
86+ fn write (
87+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
88+ ) -> Result < ( ) , lightning:: io:: Error > {
89+ self . inner . write_version ( primary_namespace, secondary_namespace, key, buf, None )
90+ }
6491
65- // Garbage collect all lock entries that are not referenced anymore.
66- outer_lock. retain ( |_, v| Arc :: strong_count ( & v) > 1 ) ;
67- }
92+ fn remove (
93+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
94+ ) -> Result < ( ) , lightning:: io:: Error > {
95+ self . inner . remove ( primary_namespace, secondary_namespace, key, lazy)
96+ }
97+
98+ fn list (
99+ & self , primary_namespace : & str , secondary_namespace : & str ,
100+ ) -> Result < Vec < String > , lightning:: io:: Error > {
101+ self . inner . list ( primary_namespace, secondary_namespace)
68102 }
103+ }
69104
105+ impl FilesystemStoreInner {
70106 fn get_dest_dir_path (
71107 & self , primary_namespace : & str , secondary_namespace : & str ,
72108 ) -> std:: io:: Result < PathBuf > {
@@ -90,9 +126,7 @@ impl FilesystemStore {
90126
91127 Ok ( dest_dir_path)
92128 }
93- }
94129
95- impl KVStoreSync for FilesystemStore {
96130 fn read (
97131 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
98132 ) -> lightning:: io:: Result < Vec < u8 > > {
@@ -113,13 +147,14 @@ impl KVStoreSync for FilesystemStore {
113147 f. read_to_end ( & mut buf) ?;
114148 }
115149
116- self . garbage_collect_locks ( ) ;
117-
118150 Ok ( buf)
119151 }
120152
121- fn write (
153+ /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
154+ /// returns early without writing.
155+ fn write_version (
122156 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
157+ version : Option < u64 > ,
123158 ) -> lightning:: io:: Result < ( ) > {
124159 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
125160
@@ -153,7 +188,18 @@ impl KVStoreSync for FilesystemStore {
153188 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
154189 Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
155190 } ;
156- let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
191+ let mut last_written_version = inner_lock_ref. write ( ) . unwrap ( ) ;
192+
193+ // If a version is provided, we check if we already have a newer version written. This is used in async
194+ // contexts to realize eventual consistency.
195+ if let Some ( version) = version {
196+ if version <= * last_written_version {
197+ // If the version is not greater, we don't write the file.
198+ return Ok ( ( ) ) ;
199+ }
200+
201+ * last_written_version = version;
202+ }
157203
158204 #[ cfg( not( target_os = "windows" ) ) ]
159205 {
@@ -200,8 +246,6 @@ impl KVStoreSync for FilesystemStore {
200246 }
201247 } ;
202248
203- self . garbage_collect_locks ( ) ;
204-
205249 res
206250 }
207251
@@ -295,8 +339,6 @@ impl KVStoreSync for FilesystemStore {
295339 }
296340 }
297341
298- self . garbage_collect_locks ( ) ;
299-
300342 Ok ( ( ) )
301343 }
302344
@@ -325,12 +367,90 @@ impl KVStoreSync for FilesystemStore {
325367 keys. push ( key) ;
326368 }
327369
328- self . garbage_collect_locks ( ) ;
329-
330370 Ok ( keys)
331371 }
332372}
333373
374+ #[ cfg( feature = "tokio" ) ]
375+ impl KVStore for FilesystemStore {
376+ fn read (
377+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
378+ ) -> Pin < Box < dyn Future < Output = Result < Vec < u8 > , lightning:: io:: Error > > + ' static + Send > > {
379+ let primary_namespace = primary_namespace. to_string ( ) ;
380+ let secondary_namespace = secondary_namespace. to_string ( ) ;
381+ let key = key. to_string ( ) ;
382+ let this = Arc :: clone ( & self . inner ) ;
383+
384+ Box :: pin ( async move {
385+ tokio:: task:: spawn_blocking ( move || {
386+ this. read ( & primary_namespace, & secondary_namespace, & key)
387+ } )
388+ . await
389+ . unwrap_or_else ( |e| Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) ) )
390+ } )
391+ }
392+
393+ fn write (
394+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
395+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
396+ let primary_namespace = primary_namespace. to_string ( ) ;
397+ let secondary_namespace = secondary_namespace. to_string ( ) ;
398+ let key = key. to_string ( ) ;
399+ let buf = buf. to_vec ( ) ;
400+ let this = Arc :: clone ( & self . inner ) ;
401+
402+ // Obtain a version number to retain the call sequence.
403+ let version = self . version_counter . fetch_add ( 1 , Ordering :: SeqCst ) ;
404+
405+ Box :: pin ( async move {
406+ tokio:: task:: spawn_blocking ( move || {
407+ this. write_version (
408+ & primary_namespace,
409+ & secondary_namespace,
410+ & key,
411+ & buf,
412+ Some ( version) ,
413+ )
414+ } )
415+ . await
416+ . unwrap_or_else ( |e| Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) ) )
417+ } )
418+ }
419+
420+ fn remove (
421+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
422+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
423+ let primary_namespace = primary_namespace. to_string ( ) ;
424+ let secondary_namespace = secondary_namespace. to_string ( ) ;
425+ let key = key. to_string ( ) ;
426+ let this = Arc :: clone ( & self . inner ) ;
427+
428+ Box :: pin ( async move {
429+ tokio:: task:: spawn_blocking ( move || {
430+ this. remove ( & primary_namespace, & secondary_namespace, & key, lazy)
431+ } )
432+ . await
433+ . unwrap_or_else ( |e| Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) ) )
434+ } )
435+ }
436+
437+ fn list (
438+ & self , primary_namespace : & str , secondary_namespace : & str ,
439+ ) -> Pin < Box < dyn Future < Output = Result < Vec < String > , lightning:: io:: Error > > + ' static + Send > > {
440+ let primary_namespace = primary_namespace. to_string ( ) ;
441+ let secondary_namespace = secondary_namespace. to_string ( ) ;
442+ let this = Arc :: clone ( & self . inner ) ;
443+
444+ Box :: pin ( async move {
445+ tokio:: task:: spawn_blocking ( move || this. list ( & primary_namespace, & secondary_namespace) )
446+ . await
447+ . unwrap_or_else ( |e| {
448+ Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) )
449+ } )
450+ } )
451+ }
452+ }
453+
334454fn dir_entry_is_key ( p : & Path ) -> Result < bool , lightning:: io:: Error > {
335455 if let Some ( ext) = p. extension ( ) {
336456 #[ cfg( target_os = "windows" ) ]
@@ -427,7 +547,7 @@ fn get_key_from_dir_entry(p: &Path, base_path: &Path) -> Result<String, lightnin
427547
428548impl MigratableKVStore for FilesystemStore {
429549 fn list_all_keys ( & self ) -> Result < Vec < ( String , String , String ) > , lightning:: io:: Error > {
430- let prefixed_dest = & self . data_dir ;
550+ let prefixed_dest = & self . inner . data_dir ;
431551 if !prefixed_dest. exists ( ) {
432552 return Ok ( Vec :: new ( ) ) ;
433553 }
@@ -511,7 +631,7 @@ mod tests {
511631 fn drop ( & mut self ) {
512632 // We test for invalid directory names, so it's OK if directory removal
513633 // fails.
514- match fs:: remove_dir_all ( & self . data_dir ) {
634+ match fs:: remove_dir_all ( & self . inner . data_dir ) {
515635 Err ( e) => println ! ( "Failed to remove test persister directory: {}" , e) ,
516636 _ => { } ,
517637 }
@@ -526,6 +646,48 @@ mod tests {
526646 do_read_write_remove_list_persist ( & fs_store) ;
527647 }
528648
649+ #[ cfg( feature = "tokio" ) ]
650+ #[ tokio:: test]
651+ async fn read_write_remove_list_persist_async ( ) {
652+ use crate :: fs_store:: FilesystemStore ;
653+ use lightning:: util:: persist:: KVStore ;
654+ use std:: sync:: Arc ;
655+
656+ let mut temp_path = std:: env:: temp_dir ( ) ;
657+ temp_path. push ( "test_read_write_remove_list_persist_async" ) ;
658+ let fs_store: Arc < dyn KVStore > = Arc :: new ( FilesystemStore :: new ( temp_path) ) ;
659+
660+ let data1 = [ 42u8 ; 32 ] ;
661+ let data2 = [ 43u8 ; 32 ] ;
662+
663+ let primary_namespace = "testspace" ;
664+ let secondary_namespace = "testsubspace" ;
665+ let key = "testkey" ;
666+
667+ // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
668+ // that eventual consistency works.
669+ let fut1 = fs_store. write ( primary_namespace, secondary_namespace, key, & data1) ;
670+ let fut2 = fs_store. write ( primary_namespace, secondary_namespace, key, & data2) ;
671+
672+ fut2. await . unwrap ( ) ;
673+ fut1. await . unwrap ( ) ;
674+
675+ // Test list.
676+ let listed_keys = fs_store. list ( primary_namespace, secondary_namespace) . await . unwrap ( ) ;
677+ assert_eq ! ( listed_keys. len( ) , 1 ) ;
678+ assert_eq ! ( listed_keys[ 0 ] , key) ;
679+
680+ // Test read. We expect to read data2, as the write call was initiated later.
681+ let read_data = fs_store. read ( primary_namespace, secondary_namespace, key) . await . unwrap ( ) ;
682+ assert_eq ! ( data2, & * read_data) ;
683+
684+ // Test remove.
685+ fs_store. remove ( primary_namespace, secondary_namespace, key, false ) . await . unwrap ( ) ;
686+
687+ let listed_keys = fs_store. list ( primary_namespace, secondary_namespace) . await . unwrap ( ) ;
688+ assert_eq ! ( listed_keys. len( ) , 0 ) ;
689+ }
690+
529691 #[ test]
530692 fn test_data_migration ( ) {
531693 let mut source_temp_path = std:: env:: temp_dir ( ) ;
0 commit comments