diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 6dbbfe13612..908dd559ff8 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -22,8 +22,10 @@ lightning = { path = "../lightning", features = ["regex", "_test_utils"] } lightning-invoice = { path = "../lightning-invoice" } lightning-liquidity = { path = "../lightning-liquidity" } lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync" } +lightning-persister = { path = "../lightning-persister", features = ["tokio"]} bech32 = "0.11.0" bitcoin = { version = "0.32.2", features = ["secp-lowmemory"] } +tokio = { version = "~1.35", default-features = false, features = ["rt-multi-thread"] } afl = { version = "0.12", optional = true } honggfuzz = { version = "0.5", optional = true, default-features = false } diff --git a/fuzz/ci-fuzz.sh b/fuzz/ci-fuzz.sh index 5f5cb0b5ad2..d1274d751a8 100755 --- a/fuzz/ci-fuzz.sh +++ b/fuzz/ci-fuzz.sh @@ -40,6 +40,8 @@ for TARGET in src/bin/*.rs; do HFUZZ_RUN_ARGS="$HFUZZ_RUN_ARGS -N10000" elif [ "$FILE" = "indexedmap_target" ]; then HFUZZ_RUN_ARGS="$HFUZZ_RUN_ARGS -N100000" + elif [ "$FILE" = "fs_store_target" ]; then + HFUZZ_RUN_ARGS="$HFUZZ_RUN_ARGS -F 64 -N10000" else HFUZZ_RUN_ARGS="$HFUZZ_RUN_ARGS -N1000000" fi diff --git a/fuzz/src/bin/fs_store_target.rs b/fuzz/src/bin/fs_store_target.rs new file mode 100644 index 00000000000..804b09a84cf --- /dev/null +++ b/fuzz/src/bin/fs_store_target.rs @@ -0,0 +1,120 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +// This file is auto-generated by gen_target.sh based on target_template.txt +// To modify it, modify target_template.txt and run gen_target.sh instead. 
+ +#![cfg_attr(feature = "libfuzzer_fuzz", no_main)] +#![cfg_attr(rustfmt, rustfmt_skip)] + +#[cfg(not(fuzzing))] +compile_error!("Fuzz targets need cfg=fuzzing"); + +#[cfg(not(hashes_fuzz))] +compile_error!("Fuzz targets need cfg=hashes_fuzz"); + +#[cfg(not(secp256k1_fuzz))] +compile_error!("Fuzz targets need cfg=secp256k1_fuzz"); + +extern crate lightning_fuzz; +use lightning_fuzz::fs_store::*; + +#[cfg(feature = "afl")] +#[macro_use] extern crate afl; +#[cfg(feature = "afl")] +fn main() { + fuzz!(|data| { + fs_store_run(data.as_ptr(), data.len()); + }); +} + +#[cfg(feature = "honggfuzz")] +#[macro_use] extern crate honggfuzz; +#[cfg(feature = "honggfuzz")] +fn main() { + loop { + fuzz!(|data| { + fs_store_run(data.as_ptr(), data.len()); + }); + } +} + +#[cfg(feature = "libfuzzer_fuzz")] +#[macro_use] extern crate libfuzzer_sys; +#[cfg(feature = "libfuzzer_fuzz")] +fuzz_target!(|data: &[u8]| { + fs_store_run(data.as_ptr(), data.len()); +}); + +#[cfg(feature = "stdin_fuzz")] +fn main() { + use std::io::Read; + + let mut data = Vec::with_capacity(8192); + std::io::stdin().read_to_end(&mut data).unwrap(); + fs_store_run(data.as_ptr(), data.len()); +} + +#[test] +fn run_test_cases() { + use std::fs; + use std::io::Read; + use lightning_fuzz::utils::test_logger::StringBuffer; + + use std::sync::{atomic, Arc}; + { + let data: Vec = vec![0]; + fs_store_run(data.as_ptr(), data.len()); + } + let mut threads = Vec::new(); + let threads_running = Arc::new(atomic::AtomicUsize::new(0)); + if let Ok(tests) = fs::read_dir("test_cases/fs_store") { + for test in tests { + let mut data: Vec = Vec::new(); + let path = test.unwrap().path(); + fs::File::open(&path).unwrap().read_to_end(&mut data).unwrap(); + threads_running.fetch_add(1, atomic::Ordering::AcqRel); + + let thread_count_ref = Arc::clone(&threads_running); + let main_thread_ref = std::thread::current(); + threads.push((path.file_name().unwrap().to_str().unwrap().to_string(), + std::thread::spawn(move || { + let 
string_logger = StringBuffer::new(); + + let panic_logger = string_logger.clone(); + let res = if ::std::panic::catch_unwind(move || { + fs_store_test(&data, panic_logger); + }).is_err() { + Some(string_logger.into_string()) + } else { None }; + thread_count_ref.fetch_sub(1, atomic::Ordering::AcqRel); + main_thread_ref.unpark(); + res + }) + )); + while threads_running.load(atomic::Ordering::Acquire) > 32 { + std::thread::park(); + } + } + } + let mut failed_outputs = Vec::new(); + for (test, thread) in threads.drain(..) { + if let Some(output) = thread.join().unwrap() { + println!("\nOutput of {}:\n{}\n", test, output); + failed_outputs.push(test); + } + } + if !failed_outputs.is_empty() { + println!("Test cases which failed: "); + for case in failed_outputs { + println!("{}", case); + } + panic!(); + } +} diff --git a/fuzz/src/bin/gen_target.sh b/fuzz/src/bin/gen_target.sh index 0f7f9c9210d..b4f0c7a12b9 100755 --- a/fuzz/src/bin/gen_target.sh +++ b/fuzz/src/bin/gen_target.sh @@ -28,6 +28,7 @@ GEN_TEST base32 GEN_TEST fromstr_to_netaddress GEN_TEST feature_flags GEN_TEST lsps_message +GEN_TEST fs_store GEN_TEST msg_accept_channel msg_targets:: GEN_TEST msg_announcement_signatures msg_targets:: diff --git a/fuzz/src/fs_store.rs b/fuzz/src/fs_store.rs new file mode 100644 index 00000000000..821439f390e --- /dev/null +++ b/fuzz/src/fs_store.rs @@ -0,0 +1,184 @@ +use core::hash::{BuildHasher, Hasher}; +use lightning::util::persist::{KVStore, KVStoreSync}; +use lightning_persister::fs_store::FilesystemStore; +use std::fs; +use tokio::runtime::Runtime; + +use crate::utils::test_logger; + +struct TempFilesystemStore { + temp_path: std::path::PathBuf, + inner: FilesystemStore, +} + +impl TempFilesystemStore { + fn new() -> Self { + const SHM_PATH: &str = "/dev/shm"; + let mut temp_path = if std::path::Path::new(SHM_PATH).exists() { + std::path::PathBuf::from(SHM_PATH) + } else { + std::env::temp_dir() + }; + + let random_number = 
std::collections::hash_map::RandomState::new().build_hasher().finish(); + let random_folder_name = format!("fs_store_fuzz_{:016x}", random_number); + temp_path.push(random_folder_name); + + let inner = FilesystemStore::new(temp_path.clone()); + TempFilesystemStore { inner, temp_path } + } +} + +impl Drop for TempFilesystemStore { + fn drop(&mut self) { + _ = fs::remove_dir_all(&self.temp_path) + } +} + +/// Actual fuzz test, method signature and name are fixed +fn do_test(data: &[u8], out: Out) { + let rt = Runtime::new().unwrap(); + rt.block_on(do_test_internal(data, out)); +} + +async fn do_test_internal(data: &[u8], _out: Out) { + let mut read_pos = 0; + macro_rules! get_slice { + ($len: expr) => {{ + let slice_len = $len as usize; + if data.len() < read_pos + slice_len { + None + } else { + read_pos += slice_len; + Some(&data[read_pos - slice_len..read_pos]) + } + }}; + } + + let temp_fs_store = TempFilesystemStore::new(); + let fs_store = &temp_fs_store.inner; + + let primary_namespace = "primary"; + let secondary_namespace = "secondary"; + let key = "key"; + + let mut next_data_value = 0u64; + let mut get_next_data_value = || { + let data_value = next_data_value.to_be_bytes().to_vec(); + next_data_value += 1; + + data_value + }; + + let mut current_data = None; + + let mut handles = Vec::new(); + loop { + let v = match get_slice!(1) { + Some(b) => b[0], + None => break, + }; + match v % 13 { + // Sync write + 0 => { + let data_value = get_next_data_value(); + + KVStoreSync::write( + fs_store, + primary_namespace, + secondary_namespace, + key, + data_value.clone(), + ) + .unwrap(); + + current_data = Some(data_value); + }, + // Sync remove + 1 => { + KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key, false) + .unwrap(); + + current_data = None; + }, + // Sync list + 2 => { + KVStoreSync::list(fs_store, primary_namespace, secondary_namespace).unwrap(); + }, + // Sync read + 3 => { + _ = KVStoreSync::read(fs_store, primary_namespace, 
secondary_namespace, key); + }, + // Async write. Bias writes a bit. + 4..=9 => { + let data_value = get_next_data_value(); + + let fut = KVStore::write( + fs_store, + primary_namespace, + secondary_namespace, + key, + data_value.clone(), + ); + + // Already set the current_data, even though writing hasn't finished yet. This supports the call-time + // ordering semantics. + current_data = Some(data_value); + + let handle = tokio::task::spawn(fut); + + // Store the handle to later await the result. + handles.push(handle); + }, + // Async remove + 10 | 11 => { + let lazy = v == 10; + let fut = + KVStore::remove(fs_store, primary_namespace, secondary_namespace, key, lazy); + + // Already set the current_data, even though writing hasn't finished yet. This supports the call-time + // ordering semantics. + current_data = None; + + let handle = tokio::task::spawn(fut); + handles.push(handle); + }, + // Join tasks. + 12 => { + for handle in handles.drain(..) { + let _ = handle.await.unwrap(); + } + }, + _ => unreachable!(), + } + + // If no more writes are pending, we can reliably see if the data is consistent. + if handles.is_empty() { + let data_value = + KVStoreSync::read(fs_store, primary_namespace, secondary_namespace, key).ok(); + assert_eq!(data_value, current_data); + + let list = KVStoreSync::list(fs_store, primary_namespace, secondary_namespace).unwrap(); + assert_eq!(list.is_empty(), current_data.is_none()); + + assert_eq!(0, fs_store.state_size()); + } + } + + // Always make sure that all async tasks are completed before returning. Otherwise the temporary storage dir could + // be removed, and then again recreated by unfinished tasks. + for handle in handles.drain(..) 
{ + let _ = handle.await.unwrap(); + } +} + +/// Method that needs to be added manually, {name}_test +pub fn fs_store_test(data: &[u8], out: Out) { + do_test(data, out); +} + +/// Method that needs to be added manually, {name}_run +#[no_mangle] +pub extern "C" fn fs_store_run(data: *const u8, datalen: usize) { + do_test(unsafe { std::slice::from_raw_parts(data, datalen) }, test_logger::DevNull {}); +} diff --git a/fuzz/src/lib.rs b/fuzz/src/lib.rs index 58956fa6b3b..582fa346c54 100644 --- a/fuzz/src/lib.rs +++ b/fuzz/src/lib.rs @@ -9,6 +9,7 @@ extern crate bitcoin; extern crate lightning; +extern crate lightning_persister; extern crate lightning_rapid_gossip_sync; #[cfg(not(fuzzing))] @@ -45,4 +46,5 @@ pub mod router; pub mod static_invoice_deser; pub mod zbase32; +pub mod fs_store; pub mod msg_targets; diff --git a/fuzz/targets.h b/fuzz/targets.h index 99a88f59d5c..921439836af 100644 --- a/fuzz/targets.h +++ b/fuzz/targets.h @@ -21,6 +21,7 @@ void base32_run(const unsigned char* data, size_t data_len); void fromstr_to_netaddress_run(const unsigned char* data, size_t data_len); void feature_flags_run(const unsigned char* data, size_t data_len); void lsps_message_run(const unsigned char* data, size_t data_len); +void fs_store_run(const unsigned char* data, size_t data_len); void msg_accept_channel_run(const unsigned char* data, size_t data_len); void msg_announcement_signatures_run(const unsigned char* data, size_t data_len); void msg_channel_reestablish_run(const unsigned char* data, size_t data_len); diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index ec34fa8a88d..5bae5bc418f 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] bitcoin = "0.32.2" lightning = { version = "0.2.0", path = "../lightning" } +tokio = { version = "1.35", optional = true, default-features = false, features = ["rt-multi-thread"] } 
[target.'cfg(windows)'.dependencies] windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] } @@ -26,6 +27,7 @@ criterion = { version = "0.4", optional = true, default-features = false } [dev-dependencies] lightning = { version = "0.2.0", path = "../lightning", features = ["_test_utils"] } bitcoin = { version = "0.32.2", default-features = false } +tokio = { version = "1.35", default-features = false, features = ["macros"] } [lints] workspace = true diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index a9edb4e2e6f..9b15398d4d1 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -8,9 +8,16 @@ use std::collections::HashMap; use std::fs; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; +#[cfg(feature = "tokio")] +use core::future::Future; +#[cfg(feature = "tokio")] +use core::pin::Pin; +#[cfg(feature = "tokio")] +use lightning::util::persist::KVStore; + #[cfg(target_os = "windows")] use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; @@ -30,19 +37,28 @@ fn path_to_windows_str>(path: &T) -> Vec { path.as_ref().encode_wide().chain(Some(0)).collect() } -// The number of read/write/remove/list operations after which we clean up our `locks` HashMap. -const GC_LOCK_INTERVAL: usize = 25; - // The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching // a consistent view and error out. const LIST_DIR_CONSISTENCY_RETRIES: usize = 10; -/// A [`KVStoreSync`] implementation that writes to and reads from the file system. 
-pub struct FilesystemStore { +struct FilesystemStoreInner { data_dir: PathBuf, tmp_file_counter: AtomicUsize, - gc_counter: AtomicUsize, - locks: Mutex>>>, + + // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the + // latest written version per key. + locks: Mutex>>>, +} + +/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system. +/// +/// [`KVStore`]: lightning::util::persist::KVStore +pub struct FilesystemStore { + inner: Arc, + + // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list + // operations aren't sensitive to the order of execution. + next_version: AtomicU64, } impl FilesystemStore { @@ -50,25 +66,94 @@ impl FilesystemStore { pub fn new(data_dir: PathBuf) -> Self { let locks = Mutex::new(HashMap::new()); let tmp_file_counter = AtomicUsize::new(0); - let gc_counter = AtomicUsize::new(1); - Self { data_dir, tmp_file_counter, gc_counter, locks } + Self { + inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }), + next_version: AtomicU64::new(1), + } } /// Returns the data directory. pub fn get_data_dir(&self) -> PathBuf { - self.data_dir.clone() + self.inner.data_dir.clone() } - fn garbage_collect_locks(&self) { - let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel); + fn get_new_version_and_lock_ref(&self, dest_file_path: PathBuf) -> (Arc>, u64) { + let version = self.next_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FilesystemStore version counter overflowed"); + } - if gc_counter % GC_LOCK_INTERVAL == 0 { - // Take outer lock for the cleanup. - let mut outer_lock = self.locks.lock().unwrap(); + // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for + // cleaning up unused locks. 
+ let inner_lock_ref = self.inner.get_inner_lock_ref(dest_file_path); - // Garbage collect all lock entries that are not referenced anymore. - outer_lock.retain(|_, v| Arc::strong_count(&v) > 1); - } + (inner_lock_ref, version) + } + + #[cfg(any(all(feature = "tokio", test), fuzzing))] + /// Returns the size of the async state. + pub fn state_size(&self) -> usize { + let outer_lock = self.inner.locks.lock().unwrap(); + outer_lock.len() + } +} + +impl KVStoreSync for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "read", + )?; + self.inner.read(path) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + )?; + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + self.inner.write_version(inner_lock_ref, path, buf, version) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + )?; + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + self.inner.remove_version(inner_lock_ref, path, lazy, version) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list", + )?; + self.inner.list(path) + } +} + +impl FilesystemStoreInner { + fn get_inner_lock_ref(&self, path: PathBuf) -> Arc> { + let mut outer_lock = self.locks.lock().unwrap(); 
+ Arc::clone(&outer_lock.entry(path).or_default()) } fn get_dest_dir_path( @@ -94,42 +179,91 @@ impl FilesystemStore { Ok(dest_dir_path) } -} -impl KVStoreSync for FilesystemStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> lightning::io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + fn get_checked_dest_file_path( + &self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>, + operation: &str, + ) -> lightning::io::Result { + check_namespace_key_validity(primary_namespace, secondary_namespace, key, operation)?; let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); + if let Some(key) = key { + dest_file_path.push(key); + } + Ok(dest_file_path) + } + + fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result> { let mut buf = Vec::new(); - { - let inner_lock_ref = { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) - }; - let _guard = inner_lock_ref.read().unwrap(); + self.execute_locked_read(dest_file_path.clone(), || { let mut f = fs::File::open(dest_file_path)?; f.read_to_end(&mut buf)?; - } - - self.garbage_collect_locks(); + Ok(()) + })?; Ok(buf) } - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> lightning::io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + fn execute_locked_write Result<(), lightning::io::Error>>( + &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, version: u64, callback: F, + ) -> Result<(), lightning::io::Error> { + let res = { + let mut last_written_version = inner_lock_ref.write().unwrap(); - let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); + // Check if we already have a newer version 
written/removed. This is used in async contexts to realize eventual + consistency. + let is_stale_version = version <= *last_written_version; + + // If the version is not stale, we execute the callback. Otherwise we can and must skip writing. + if is_stale_version { + Ok(()) + } else { + callback().map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, dest_file_path); + + res + } + + fn execute_locked_read Result<(), lightning::io::Error>>( + &self, dest_file_path: PathBuf, callback: F, + ) -> Result<(), lightning::io::Error> { + let inner_lock_ref = self.get_inner_lock_ref(dest_file_path.clone()); + let res = { + let _guard = inner_lock_ref.read().unwrap(); + callback() + }; + self.clean_locks(&inner_lock_ref, dest_file_path); + res + } + + fn clean_locks(&self, inner_lock_ref: &Arc>, dest_file_path: PathBuf) { + // If there are no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry + // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in + // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already + // counted. + let mut outer_lock = self.locks.lock().unwrap(); + + let strong_count = Arc::strong_count(&inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected FilesystemStore strong count"); + if strong_count == 2 { + outer_lock.remove(&dest_file_path); + } + } + + /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function + /// returns early without writing. 
+ fn write_version( + &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, buf: Vec, + version: u64, + ) -> lightning::io::Result<()> { let parent_directory = dest_file_path.parent().ok_or_else(|| { let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display()); @@ -152,13 +286,7 @@ impl KVStoreSync for FilesystemStore { tmp_file.sync_all()?; } - let res = { - let inner_lock_ref = { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) - }; - let _guard = inner_lock_ref.write().unwrap(); - + self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { #[cfg(not(target_os = "windows"))] { fs::rename(&tmp_file_path, &dest_file_path)?; @@ -202,31 +330,16 @@ impl KVStoreSync for FilesystemStore { Err(e) => Err(e.into()), } } - }; - - self.garbage_collect_locks(); - - res + }) } - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + fn remove_version( + &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, lazy: bool, version: u64, ) -> lightning::io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; - - let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); - - if !dest_file_path.is_file() { - return Ok(()); - } - - { - let inner_lock_ref = { - let mut outer_lock = self.locks.lock().unwrap(); - Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) - }; - let _guard = inner_lock_ref.write().unwrap(); + self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { + if !dest_file_path.is_file() { + return Ok(()); + } if lazy { // If we're lazy we just call remove and be done with it. 
@@ -297,20 +410,12 @@ impl KVStoreSync for FilesystemStore { fs::remove_file(trash_file_path).ok(); } } - } - - self.garbage_collect_locks(); - Ok(()) + Ok(()) + }) } - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> lightning::io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; - - let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - + fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { if !Path::new(&prefixed_dest).exists() { return Ok(Vec::new()); } @@ -351,12 +456,104 @@ impl KVStoreSync for FilesystemStore { break 'retry_list; } - self.garbage_collect_locks(); - Ok(keys) } } +#[cfg(feature = "tokio")] +impl KVStore for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, lightning::io::Error>> + 'static + Send>> { + let this = Arc::clone(&self.inner); + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "read", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + Box::pin(async move { + tokio::task::spawn_blocking(move || this.read(path)).await.unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) + }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + 'static + Send>> { + let this = Arc::clone(&self.inner); + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + Box::pin(async move { + tokio::task::spawn_blocking(move || { + this.write_version(inner_lock_ref, path, buf, version) + }) + .await + .unwrap_or_else(|e| 
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + 'static + Send>> { + let this = Arc::clone(&self.inner); + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); + Box::pin(async move { + tokio::task::spawn_blocking(move || { + this.remove_version(inner_lock_ref, path, lazy, version) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, lightning::io::Error>> + 'static + Send>> { + let this = Arc::clone(&self.inner); + + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + Box::pin(async move { + tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) + }) + } +} + fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result { let p = dir_entry.path(); if let Some(ext) = p.extension() { @@ -447,7 +644,7 @@ fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result Result, lightning::io::Error> { - let prefixed_dest = &self.data_dir; + let prefixed_dest = &self.inner.data_dir; if !prefixed_dest.exists() { return Ok(Vec::new()); } @@ -534,7 +731,7 @@ mod tests { fn drop(&mut self) { // We test for invalid directory names, so it's OK if directory removal // fails. 
- match fs::remove_dir_all(&self.data_dir) { + match fs::remove_dir_all(&self.inner.data_dir) { Err(e) => println!("Failed to remove test persister directory: {}", e), _ => {}, } @@ -549,6 +746,66 @@ mod tests { do_read_write_remove_list_persist(&fs_store); } + #[cfg(feature = "tokio")] + #[tokio::test] + async fn read_write_remove_list_persist_async() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStore; + use std::sync::Arc; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist_async"); + let fs_store = Arc::new(FilesystemStore::new(temp_path)); + assert_eq!(fs_store.state_size(), 0); + + let async_fs_store: Arc = fs_store.clone(); + + let data1 = vec![42u8; 32]; + let data2 = vec![43u8; 32]; + + let primary_namespace = "testspace"; + let secondary_namespace = "testsubspace"; + let key = "testkey"; + + // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure + // that eventual consistency works. + let fut1 = async_fs_store.write(primary_namespace, secondary_namespace, key, data1); + assert_eq!(fs_store.state_size(), 1); + + let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key, false); + assert_eq!(fs_store.state_size(), 1); + + let fut3 = async_fs_store.write(primary_namespace, secondary_namespace, key, data2.clone()); + assert_eq!(fs_store.state_size(), 1); + + fut3.await.unwrap(); + assert_eq!(fs_store.state_size(), 1); + + fut2.await.unwrap(); + assert_eq!(fs_store.state_size(), 1); + + fut1.await.unwrap(); + assert_eq!(fs_store.state_size(), 0); + + // Test list. + let listed_keys = + async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + assert_eq!(listed_keys.len(), 1); + assert_eq!(listed_keys[0], key); + + // Test read. We expect to read data2, as the write call was initiated later. 
+ let read_data = + async_fs_store.read(primary_namespace, secondary_namespace, key).await.unwrap(); + assert_eq!(data2, &*read_data); + + // Test remove. + async_fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap(); + + let listed_keys = + async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + assert_eq!(listed_keys.len(), 0); + } + #[test] fn test_data_migration() { let mut source_temp_path = std::env::temp_dir();