Skip to content

Commit 99fe94f

Browse files
committed
f: new approach
1 parent 5ea1a8e commit 99fe94f

File tree

1 file changed

+133
-96
lines changed

1 file changed

+133
-96
lines changed

lightning-persister/src/fs_store.rs

Lines changed: 133 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ use std::collections::HashMap;
88
use std::fs;
99
use std::io::{Read, Write};
1010
use std::path::{Path, PathBuf};
11-
#[cfg(feature = "tokio")]
12-
use std::sync::atomic::AtomicU64;
1311
use std::sync::atomic::{AtomicUsize, Ordering};
1412
use std::sync::{Arc, Mutex, RwLock};
1513

@@ -43,43 +41,50 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
4341
// a consistent view and error out.
4442
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
4543

44+
#[derive(Default)]
45+
struct AsyncState {
46+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
47+
// operations aren't sensitive to the order of execution.
48+
latest_version: u64,
49+
50+
// The last version that was written to disk.
51+
last_written_version: u64,
52+
}
53+
4654
struct FilesystemStoreInner {
4755
data_dir: PathBuf,
4856
tmp_file_counter: AtomicUsize,
4957

5058
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
5159
// latest written version per key.
52-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
60+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<AsyncState>>>>,
5361
}
5462

5563
/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
5664
///
5765
/// [`KVStore`]: lightning::util::persist::KVStore
5866
pub struct FilesystemStore {
5967
inner: Arc<FilesystemStoreInner>,
60-
61-
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
62-
// operations aren't sensitive to the order of execution.
63-
#[cfg(feature = "tokio")]
64-
version_counter: AtomicU64,
6568
}
6669

6770
impl FilesystemStore {
6871
/// Constructs a new [`FilesystemStore`].
6972
pub fn new(data_dir: PathBuf) -> Self {
7073
let locks = Mutex::new(HashMap::new());
7174
let tmp_file_counter = AtomicUsize::new(0);
72-
Self {
73-
inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }),
74-
#[cfg(feature = "tokio")]
75-
version_counter: AtomicU64::new(0),
76-
}
75+
Self { inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }) }
7776
}
7877

7978
/// Returns the data directory.
8079
pub fn get_data_dir(&self) -> PathBuf {
8180
self.inner.data_dir.clone()
8281
}
82+
83+
#[cfg(all(feature = "tokio", test))]
84+
pub(crate) fn state_size(&self) -> usize {
85+
let outer_lock = self.inner.locks.lock().unwrap();
86+
outer_lock.len()
87+
}
8388
}
8489

8590
impl KVStoreSync for FilesystemStore {
@@ -104,7 +109,8 @@ impl KVStoreSync for FilesystemStore {
104109
Some(key),
105110
"write",
106111
)?;
107-
self.inner.write_version(path, buf, None)
112+
let inner_lock_ref = self.inner.get_inner_lock_ref(path.clone());
113+
self.inner.write_version(inner_lock_ref, path, buf, None)
108114
}
109115

110116
fn remove(
@@ -116,7 +122,8 @@ impl KVStoreSync for FilesystemStore {
116122
Some(key),
117123
"remove",
118124
)?;
119-
self.inner.remove_version(path, lazy, None)
125+
let inner_lock_ref = self.inner.get_inner_lock_ref(path.clone());
126+
self.inner.remove_version(inner_lock_ref, path, lazy, None)
120127
}
121128

122129
fn list(
@@ -133,6 +140,11 @@ impl KVStoreSync for FilesystemStore {
133140
}
134141

135142
impl FilesystemStoreInner {
143+
fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<AsyncState>> {
144+
let mut outer_lock = self.locks.lock().unwrap();
145+
Arc::clone(&outer_lock.entry(path).or_default())
146+
}
147+
136148
fn get_dest_dir_path(
137149
&self, primary_namespace: &str, secondary_namespace: &str,
138150
) -> std::io::Result<PathBuf> {
@@ -171,13 +183,31 @@ impl FilesystemStoreInner {
171183
Ok(dest_file_path)
172184
}
173185

186+
#[cfg(feature = "tokio")]
187+
fn get_new_version_and_state(&self, dest_file_path: PathBuf) -> (u64, Arc<RwLock<AsyncState>>) {
188+
let inner_lock_ref: Arc<RwLock<AsyncState>> = self.get_inner_lock_ref(dest_file_path);
189+
190+
let new_version = {
191+
let mut async_state = inner_lock_ref.write().unwrap();
192+
Self::get_new_version(&mut async_state)
193+
};
194+
195+
return (new_version, inner_lock_ref);
196+
}
197+
198+
fn get_new_version(async_state: &mut AsyncState) -> u64 {
199+
async_state.latest_version += 1;
200+
if async_state.latest_version == 0 {
201+
panic!("FilesystemStore version counter overflowed");
202+
}
203+
204+
async_state.latest_version
205+
}
206+
174207
fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result<Vec<u8>> {
175208
let mut buf = Vec::new();
176209
{
177-
let inner_lock_ref = {
178-
let mut outer_lock = self.locks.lock().unwrap();
179-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
180-
};
210+
let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone());
181211
let _guard = inner_lock_ref.read().unwrap();
182212

183213
let mut f = fs::File::open(dest_file_path)?;
@@ -187,10 +217,44 @@ impl FilesystemStoreInner {
187217
Ok(buf)
188218
}
189219

220+
fn execute_locked<F: FnOnce() -> Result<(), lightning::io::Error>>(
221+
&self, inner_lock_ref: Arc<RwLock<AsyncState>>, dest_file_path: PathBuf,
222+
version: Option<u64>, callback: F,
223+
) -> Result<(), lightning::io::Error> {
224+
let mut async_state = inner_lock_ref.write().unwrap();
225+
226+
// Sync calls haven't assigned a version yet because it would require another lock acquisition.
227+
let version = version.unwrap_or_else(|| Self::get_new_version(&mut async_state));
228+
229+
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
230+
// consistency.
231+
let stale = version <= async_state.last_written_version;
232+
233+
// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
234+
let res = if stale {
235+
Ok(())
236+
} else {
237+
callback().map(|_| {
238+
async_state.last_written_version = version;
239+
})
240+
};
241+
242+
let more_writes_pending = async_state.last_written_version < async_state.latest_version;
243+
244+
// If there are no more writes pending and no arcs in use elsewhere, we can remove the map entry to prevent
245+
// leaking memory. The two arcs are the one in the map and the one held here in inner_lock_ref.
246+
if !more_writes_pending && Arc::strong_count(&inner_lock_ref) == 2 {
247+
self.locks.lock().unwrap().remove(&dest_file_path);
248+
}
249+
250+
res
251+
}
252+
190253
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
191254
/// returns early without writing.
192255
fn write_version(
193-
&self, dest_file_path: PathBuf, buf: Vec<u8>, version: Option<u64>,
256+
&self, inner_lock_ref: Arc<RwLock<AsyncState>>, dest_file_path: PathBuf, buf: Vec<u8>,
257+
version: Option<u64>,
194258
) -> lightning::io::Result<()> {
195259
let parent_directory = dest_file_path.parent().ok_or_else(|| {
196260
let msg =
@@ -214,24 +278,7 @@ impl FilesystemStoreInner {
214278
tmp_file.sync_all()?;
215279
}
216280

217-
let res = {
218-
let inner_lock_ref = {
219-
let mut outer_lock = self.locks.lock().unwrap();
220-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
221-
};
222-
let mut last_written_version = inner_lock_ref.write().unwrap();
223-
224-
// If a version is provided, we check if we already have a newer version written/removed. This is used in
225-
// async contexts to realize eventual consistency.
226-
if let Some(version) = version {
227-
if version <= *last_written_version {
228-
// If the version is not greater, we don't touch the file.
229-
return Ok(());
230-
}
231-
232-
*last_written_version = version;
233-
}
234-
281+
self.execute_locked(inner_lock_ref, dest_file_path.clone(), version, || {
235282
#[cfg(not(target_os = "windows"))]
236283
{
237284
fs::rename(&tmp_file_path, &dest_file_path)?;
@@ -275,34 +322,16 @@ impl FilesystemStoreInner {
275322
Err(e) => Err(e.into()),
276323
}
277324
}
278-
};
279-
280-
res
325+
})
281326
}
282327

283328
fn remove_version(
284-
&self, dest_file_path: PathBuf, lazy: bool, version: Option<u64>,
329+
&self, inner_lock_ref: Arc<RwLock<AsyncState>>, dest_file_path: PathBuf, lazy: bool,
330+
version: Option<u64>,
285331
) -> lightning::io::Result<()> {
286-
if !dest_file_path.is_file() {
287-
return Ok(());
288-
}
289-
290-
{
291-
let inner_lock_ref = {
292-
let mut outer_lock = self.locks.lock().unwrap();
293-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
294-
};
295-
let mut last_written_version = inner_lock_ref.write().unwrap();
296-
297-
// If a version is provided, we check if we already have a newer version written/removed. This is used in
298-
// async contexts to realize eventual consistency.
299-
if let Some(version) = version {
300-
if version <= *last_written_version {
301-
// If the version is not greater, we don't touch the file.
302-
return Ok(());
303-
}
304-
305-
*last_written_version = version;
332+
self.execute_locked(inner_lock_ref, dest_file_path.clone(), version, || {
333+
if !dest_file_path.is_file() {
334+
return Ok(());
306335
}
307336

308337
if lazy {
@@ -352,11 +381,11 @@ impl FilesystemStoreInner {
352381

353382
call!(unsafe {
354383
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
355-
path_to_windows_str(&dest_file_path).as_ptr(),
356-
path_to_windows_str(&trash_file_path).as_ptr(),
357-
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
384+
path_to_windows_str(&dest_file_path).as_ptr(),
385+
path_to_windows_str(&trash_file_path).as_ptr(),
386+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
358387
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
359-
)
388+
)
360389
})?;
361390

362391
{
@@ -374,9 +403,9 @@ impl FilesystemStoreInner {
374403
fs::remove_file(trash_file_path).ok();
375404
}
376405
}
377-
}
378406

379-
Ok(())
407+
Ok(())
408+
})
380409
}
381410

382411
fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result<Vec<String>> {
@@ -462,17 +491,13 @@ impl KVStore for FilesystemStore {
462491
};
463492

464493
// Obtain a version number to retain the call sequence.
465-
let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
466-
if version == u64::MAX {
467-
panic!("FilesystemStore version counter overflowed");
468-
}
469-
494+
let (version, inner_lock_ref) = self.inner.get_new_version_and_state(path.clone());
470495
Box::pin(async move {
471-
tokio::task::spawn_blocking(move || this.write_version(path, buf, Some(version)))
472-
.await
473-
.unwrap_or_else(|e| {
474-
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
475-
})
496+
tokio::task::spawn_blocking(move || {
497+
this.write_version(inner_lock_ref, path, buf, Some(version))
498+
})
499+
.await
500+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
476501
})
477502
}
478503

@@ -491,17 +516,13 @@ impl KVStore for FilesystemStore {
491516
};
492517

493518
// Obtain a version number to retain the call sequence.
494-
let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
495-
if version == u64::MAX {
496-
panic!("FilesystemStore version counter overflowed");
497-
}
498-
519+
let (version, inner_lock_ref) = self.inner.get_new_version_and_state(path.clone());
499520
Box::pin(async move {
500-
tokio::task::spawn_blocking(move || this.remove_version(path, lazy, Some(version)))
501-
.await
502-
.unwrap_or_else(|e| {
503-
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
504-
})
521+
tokio::task::spawn_blocking(move || {
522+
this.remove_version(inner_lock_ref, path, lazy, Some(version))
523+
})
524+
.await
525+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
505526
})
506527
}
507528

@@ -729,7 +750,10 @@ mod tests {
729750

730751
let mut temp_path = std::env::temp_dir();
731752
temp_path.push("test_read_write_remove_list_persist_async");
732-
let fs_store: Arc<dyn KVStore> = Arc::new(FilesystemStore::new(temp_path));
753+
let fs_store = Arc::new(FilesystemStore::new(temp_path));
754+
assert_eq!(fs_store.state_size(), 0);
755+
756+
let async_fs_store: Arc<dyn KVStore> = fs_store.clone();
733757

734758
let data1 = vec![42u8; 32];
735759
let data2 = vec![43u8; 32];
@@ -740,27 +764,40 @@ mod tests {
740764

741765
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
742766
// that eventual consistency works.
743-
let fut1 = fs_store.write(primary_namespace, secondary_namespace, key, data1);
744-
let fut2 = fs_store.remove(primary_namespace, secondary_namespace, key, false);
745-
let fut3 = fs_store.write(primary_namespace, secondary_namespace, key, data2.clone());
767+
let fut1 = async_fs_store.write(primary_namespace, secondary_namespace, key, data1);
768+
assert_eq!(fs_store.state_size(), 1);
769+
770+
let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key, false);
771+
assert_eq!(fs_store.state_size(), 1);
772+
773+
let fut3 = async_fs_store.write(primary_namespace, secondary_namespace, key, data2.clone());
774+
assert_eq!(fs_store.state_size(), 1);
746775

747776
fut3.await.unwrap();
777+
assert_eq!(fs_store.state_size(), 1);
778+
748779
fut2.await.unwrap();
780+
assert_eq!(fs_store.state_size(), 1);
781+
749782
fut1.await.unwrap();
783+
assert_eq!(fs_store.state_size(), 0);
750784

751785
// Test list.
752-
let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap();
786+
let listed_keys =
787+
async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap();
753788
assert_eq!(listed_keys.len(), 1);
754789
assert_eq!(listed_keys[0], key);
755790

756791
// Test read. We expect to read data2, as the write call was initiated later.
757-
let read_data = fs_store.read(primary_namespace, secondary_namespace, key).await.unwrap();
792+
let read_data =
793+
async_fs_store.read(primary_namespace, secondary_namespace, key).await.unwrap();
758794
assert_eq!(data2, &*read_data);
759795

760796
// Test remove.
761-
fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
797+
async_fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
762798

763-
let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap();
799+
let listed_keys =
800+
async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap();
764801
assert_eq!(listed_keys.len(), 0);
765802
}
766803

0 commit comments

Comments
 (0)