@@ -8,7 +8,7 @@ use std::collections::HashMap;
88use std:: fs;
99use std:: io:: { Read , Write } ;
1010use std:: path:: { Path , PathBuf } ;
11- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
11+ use std:: sync:: atomic:: { AtomicU64 , AtomicUsize , Ordering } ;
1212use std:: sync:: { Arc , Mutex , RwLock } ;
1313
1414#[ cfg( feature = "tokio" ) ]
@@ -41,45 +41,55 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
4141// a consistent view and error out.
4242const LIST_DIR_CONSISTENCY_RETRIES : usize = 10 ;
4343
44- #[ derive( Default ) ]
45- struct AsyncState {
46- // Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
47- // operations aren't sensitive to the order of execution.
48- latest_version : u64 ,
49-
50- // The last version that was written to disk.
51- latest_written_version : u64 ,
52- }
53-
5444struct FilesystemStoreInner {
5545 data_dir : PathBuf ,
5646 tmp_file_counter : AtomicUsize ,
5747
5848 // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
5949 // latest written version per key.
60- locks : Mutex < HashMap < PathBuf , Arc < RwLock < AsyncState > > > > ,
50+ locks : Mutex < HashMap < PathBuf , Arc < RwLock < u64 > > > > ,
6151}
6252
6353/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
6454///
6555/// [`KVStore`]: lightning::util::persist::KVStore
6656pub struct FilesystemStore {
6757 inner : Arc < FilesystemStoreInner > ,
58+
59+ // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
60+ // operations aren't sensitive to the order of execution.
61+ next_version : AtomicU64 ,
6862}
6963
7064impl FilesystemStore {
7165 /// Constructs a new [`FilesystemStore`].
7266 pub fn new ( data_dir : PathBuf ) -> Self {
7367 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
7468 let tmp_file_counter = AtomicUsize :: new ( 0 ) ;
75- Self { inner : Arc :: new ( FilesystemStoreInner { data_dir, tmp_file_counter, locks } ) }
69+ Self {
70+ inner : Arc :: new ( FilesystemStoreInner { data_dir, tmp_file_counter, locks } ) ,
71+ next_version : AtomicU64 :: new ( 1 ) ,
72+ }
7673 }
7774
7875 /// Returns the data directory.
7976 pub fn get_data_dir ( & self ) -> PathBuf {
8077 self . inner . data_dir . clone ( )
8178 }
8279
80+ fn get_new_version_and_lock_ref ( & self , dest_file_path : PathBuf ) -> ( Arc < RwLock < u64 > > , u64 ) {
81+ let version = self . next_version . fetch_add ( 1 , Ordering :: Relaxed ) ;
82+ if version == u64:: MAX {
83+ panic ! ( "FilesystemStore version counter overflowed" ) ;
84+ }
85+
86+ // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
87+ // cleaning up unused locks.
88+ let inner_lock_ref = self . inner . get_inner_lock_ref ( dest_file_path) ;
89+
90+ ( inner_lock_ref, version)
91+ }
92+
8393 #[ cfg( any( all( feature = "tokio" , test) , fuzzing) ) ]
8494 /// Returns the size of the async state.
8595 pub fn state_size ( & self ) -> usize {
@@ -110,8 +120,8 @@ impl KVStoreSync for FilesystemStore {
110120 Some ( key) ,
111121 "write" ,
112122 ) ?;
113- let inner_lock_ref = self . inner . get_inner_lock_ref ( path. clone ( ) ) ;
114- self . inner . write_version ( inner_lock_ref, path, buf, None )
123+ let ( inner_lock_ref, version ) = self . get_new_version_and_lock_ref ( path. clone ( ) ) ;
124+ self . inner . write_version ( inner_lock_ref, path, buf, version )
115125 }
116126
117127 fn remove (
@@ -123,8 +133,8 @@ impl KVStoreSync for FilesystemStore {
123133 Some ( key) ,
124134 "remove" ,
125135 ) ?;
126- let inner_lock_ref = self . inner . get_inner_lock_ref ( path. clone ( ) ) ;
127- self . inner . remove_version ( inner_lock_ref, path, lazy, None )
136+ let ( inner_lock_ref, version ) = self . get_new_version_and_lock_ref ( path. clone ( ) ) ;
137+ self . inner . remove_version ( inner_lock_ref, path, lazy, version )
128138 }
129139
130140 fn list (
@@ -141,7 +151,7 @@ impl KVStoreSync for FilesystemStore {
141151}
142152
143153impl FilesystemStoreInner {
144- fn get_inner_lock_ref ( & self , path : PathBuf ) -> Arc < RwLock < AsyncState > > {
154+ fn get_inner_lock_ref ( & self , path : PathBuf ) -> Arc < RwLock < u64 > > {
145155 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
146156 Arc :: clone ( & outer_lock. entry ( path) . or_default ( ) )
147157 }
@@ -184,34 +194,10 @@ impl FilesystemStoreInner {
184194 Ok ( dest_file_path)
185195 }
186196
187- #[ cfg( feature = "tokio" ) ]
188- fn get_new_version_and_state ( & self , dest_file_path : PathBuf ) -> ( u64 , Arc < RwLock < AsyncState > > ) {
189- let inner_lock_ref: Arc < RwLock < AsyncState > > = self . get_inner_lock_ref ( dest_file_path) ;
190-
191- let new_version = {
192- let mut async_state = inner_lock_ref. write ( ) . unwrap ( ) ;
193- Self :: get_new_version ( & mut async_state)
194- } ;
195-
196- return ( new_version, inner_lock_ref) ;
197- }
198-
199- fn get_new_version ( async_state : & mut AsyncState ) -> u64 {
200- async_state. latest_version += 1 ;
201- if async_state. latest_version == 0 {
202- panic ! ( "FilesystemStore version counter overflowed" ) ;
203- }
204-
205- debug_assert ! ( async_state. latest_version > async_state. latest_written_version) ;
206-
207- async_state. latest_version
208- }
209-
210197 fn read ( & self , dest_file_path : PathBuf ) -> lightning:: io:: Result < Vec < u8 > > {
211198 let mut buf = Vec :: new ( ) ;
212199
213- let inner_lock_ref = self . get_inner_lock_ref ( dest_file_path. clone ( ) ) ;
214- self . execute_locked_read ( inner_lock_ref, dest_file_path. clone ( ) , || {
200+ self . execute_locked_read ( dest_file_path. clone ( ) , || {
215201 let mut f = fs:: File :: open ( dest_file_path. clone ( ) ) ?;
216202 f. read_to_end ( & mut buf) ?;
217203 Ok ( ( ) )
@@ -221,24 +207,20 @@ impl FilesystemStoreInner {
221207 }
222208
223209 fn execute_locked_write < F : FnOnce ( ) -> Result < ( ) , lightning:: io:: Error > > (
224- & self , inner_lock_ref : Arc < RwLock < AsyncState > > , dest_file_path : PathBuf ,
225- version : Option < u64 > , callback : F ,
210+ & self , inner_lock_ref : Arc < RwLock < u64 > > , dest_file_path : PathBuf , version : u64 , callback : F ,
226211 ) -> Result < ( ) , lightning:: io:: Error > {
227- let mut async_state = inner_lock_ref. write ( ) . unwrap ( ) ;
228-
229- // Sync calls haven't assigned a version yet because it would require another lock acquisition.
230- let version = version. unwrap_or_else ( || Self :: get_new_version ( & mut async_state) ) ;
212+ let mut last_written_version = inner_lock_ref. write ( ) . unwrap ( ) ;
231213
232214 // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
233215 // consistency.
234- let is_stale_version = version <= async_state . latest_written_version ;
216+ let is_stale_version = version <= * last_written_version ;
235217
236218 // If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
237219 let res = if is_stale_version {
238220 Ok ( ( ) )
239221 } else {
240222 callback ( ) . map ( |_| {
241- async_state . latest_written_version = version;
223+ * last_written_version = version;
242224 } )
243225 } ;
244226
@@ -248,15 +230,16 @@ impl FilesystemStoreInner {
248230 }
249231
250232 fn execute_locked_read < F : FnOnce ( ) -> Result < ( ) , lightning:: io:: Error > > (
251- & self , inner_lock_ref : Arc < RwLock < AsyncState > > , dest_file_path : PathBuf , callback : F ,
233+ & self , dest_file_path : PathBuf , callback : F ,
252234 ) -> Result < ( ) , lightning:: io:: Error > {
253- let _async_state = inner_lock_ref. read ( ) . unwrap ( ) ;
235+ let inner_lock_ref = self . get_inner_lock_ref ( dest_file_path. clone ( ) ) ;
236+ let _guard = inner_lock_ref. read ( ) . unwrap ( ) ;
254237 let res = callback ( ) ;
255238 self . clean_locks ( & inner_lock_ref, dest_file_path) ;
256239 res
257240 }
258241
259- fn clean_locks ( & self , inner_lock_ref : & Arc < RwLock < AsyncState > > , dest_file_path : PathBuf ) {
242+ fn clean_locks ( & self , inner_lock_ref : & Arc < RwLock < u64 > > , dest_file_path : PathBuf ) {
260243 // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
261244 // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
262245 // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
@@ -270,8 +253,8 @@ impl FilesystemStoreInner {
270253 /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
271254 /// returns early without writing.
272255 fn write_version (
273- & self , inner_lock_ref : Arc < RwLock < AsyncState > > , dest_file_path : PathBuf , buf : Vec < u8 > ,
274- version : Option < u64 > ,
256+ & self , inner_lock_ref : Arc < RwLock < u64 > > , dest_file_path : PathBuf , buf : Vec < u8 > ,
257+ version : u64 ,
275258 ) -> lightning:: io:: Result < ( ) > {
276259 let parent_directory = dest_file_path. parent ( ) . ok_or_else ( || {
277260 let msg =
@@ -343,8 +326,7 @@ impl FilesystemStoreInner {
343326 }
344327
345328 fn remove_version (
346- & self , inner_lock_ref : Arc < RwLock < AsyncState > > , dest_file_path : PathBuf , lazy : bool ,
347- version : Option < u64 > ,
329+ & self , inner_lock_ref : Arc < RwLock < u64 > > , dest_file_path : PathBuf , lazy : bool , version : u64 ,
348330 ) -> lightning:: io:: Result < ( ) > {
349331 self . execute_locked_write ( inner_lock_ref, dest_file_path. clone ( ) , version, || {
350332 if !dest_file_path. is_file ( ) {
@@ -507,11 +489,10 @@ impl KVStore for FilesystemStore {
507489 Err ( e) => return Box :: pin ( async move { Err ( e) } ) ,
508490 } ;
509491
510- // Obtain a version number to retain the call sequence.
511- let ( version, inner_lock_ref) = self . inner . get_new_version_and_state ( path. clone ( ) ) ;
492+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( path. clone ( ) ) ;
512493 Box :: pin ( async move {
513494 tokio:: task:: spawn_blocking ( move || {
514- this. write_version ( inner_lock_ref, path, buf, Some ( version) )
495+ this. write_version ( inner_lock_ref, path, buf, version)
515496 } )
516497 . await
517498 . unwrap_or_else ( |e| Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) ) )
@@ -532,11 +513,10 @@ impl KVStore for FilesystemStore {
532513 Err ( e) => return Box :: pin ( async move { Err ( e) } ) ,
533514 } ;
534515
535- // Obtain a version number to retain the call sequence.
536- let ( version, inner_lock_ref) = self . inner . get_new_version_and_state ( path. clone ( ) ) ;
516+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( path. clone ( ) ) ;
537517 Box :: pin ( async move {
538518 tokio:: task:: spawn_blocking ( move || {
539- this. remove_version ( inner_lock_ref, path, lazy, Some ( version) )
519+ this. remove_version ( inner_lock_ref, path, lazy, version)
540520 } )
541521 . await
542522 . unwrap_or_else ( |e| Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) ) )
0 commit comments