diff --git a/Cargo.toml b/Cargo.toml index 5876efc48..51b0329c4 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,17 +29,17 @@ panic = 'abort' # Abort on panic default = [] [dependencies] -lightning = { version = "0.2.0-beta1", features = ["std"] } -lightning-types = { version = "0.3.0-beta1" } -lightning-invoice = { version = "0.34.0-beta1", features = ["std"] } -lightning-net-tokio = { version = "0.2.0-beta1" } -lightning-persister = { version = "0.2.0-beta1", features = ["tokio"] } -lightning-background-processor = { version = "0.2.0-beta1" } -lightning-rapid-gossip-sync = { version = "0.2.0-beta1" } -lightning-block-sync = { version = "0.2.0-beta1", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { version = "0.2.0-beta1", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { version = "0.2.0-beta1", features = ["std"] } -lightning-macros = { version = "0.2.0-beta1" } +lightning = { version = "0.2.0-rc1", features = ["std"] } +lightning-types = { version = "0.3.0-rc1" } +lightning-invoice = { version = "0.34.0-rc1", features = ["std"] } +lightning-net-tokio = { version = "0.2.0-rc1" } +lightning-persister = { version = "0.2.0-rc1", features = ["tokio"] } +lightning-background-processor = { version = "0.2.0-rc1" } +lightning-rapid-gossip-sync = { version = "0.2.0-rc1" } +lightning-block-sync = { version = "0.2.0-rc1", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { version = "0.2.0-rc1", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { version = "0.2.0-rc1", features = ["std"] } +lightning-macros = { version = "0.2.0-rc1" } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] } #lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } @@ -108,7 +108,7 @@ prost = { version = "0.11.6", default-features = false} winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { version = "0.2.0-beta1", features = ["std", "_test_utils"] } +lightning = { version = "0.2.0-rc1", features = ["std", "_test_utils"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03", features = ["std", "_test_utils"] } #lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] } diff --git a/src/data_store.rs b/src/data_store.rs index ce4b294e0..83cbf4476 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -103,6 +103,7 @@ where &self.primary_namespace, &self.secondary_namespace, &store_key, + false, ) .map_err(|e| { log_error!( diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index c41df8ea0..789330cef 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -137,7 +137,7 @@ impl KVStore for SqliteStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, ) -> Pin> + Send>> { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); @@ -205,7 +205,7 @@ impl KVStoreSync for SqliteStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, ) -> io::Result<()> { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index 59ad09458..059e66aee 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -68,7 +68,7 @@ pub(crate) fn do_read_write_remove_list_persist( let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap(); assert_eq!(data, &*read_data); - kv_store.remove(primary_namespace, secondary_namespace, key).unwrap(); + kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap(); let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap(); assert_eq!(listed_keys.len(), 0); @@ -84,7 +84,7 @@ pub(crate) fn do_read_write_remove_list_persist( let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap(); assert_eq!(data, &*read_data); - kv_store.remove(&max_chars, &max_chars, &max_chars).unwrap(); + kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap(); let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap(); assert_eq!(listed_keys.len(), 0); diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index ed8e13890..229236c0c 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use bdk_chain::Merge; use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine}; use lightning::io::{self, Error, ErrorKind}; use lightning::util::persist::{KVStore, KVStoreSync}; @@ -181,7 +182,7 @@ impl KVStoreSync for VssStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> io::Result<()> { let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { debug_assert!(false, "Failed to access internal runtime"); @@ -203,6 +204,7 @@ impl KVStoreSync for VssStore { primary_namespace, secondary_namespace, key, + lazy, ) .await }; @@ -275,7 +277,7 @@ impl KVStore for VssStore { }) } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Pin> + Send>> { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); @@ -292,6 +294,7 @@ impl KVStore for VssStore { primary_namespace, secondary_namespace, key, + lazy, ) .await }) @@ -321,6 +324,7 @@ struct VssStoreInner { // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key. // The lock also encapsulates the latest written version per key. locks: Mutex>>>, + pending_lazy_deletes: Mutex>, } impl VssStoreInner { @@ -347,7 +351,8 @@ impl VssStoreInner { let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); let locks = Mutex::new(HashMap::new()); - Self { client, store_id, storable_builder, key_obfuscator, locks } + let pending_lazy_deletes = Mutex::new(Vec::new()); + Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes } } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { @@ -451,6 +456,12 @@ impl VssStoreInner { "write", )?; + let delete_items = self + .pending_lazy_deletes + .try_lock() + .ok() + .and_then(|mut guard| guard.take()) + .unwrap_or_default(); self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { let obfuscated_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); @@ -464,7 +475,7 @@ impl VssStoreInner { version: vss_version, value: storable.encode_to_vec(), }], - delete_items: vec![], + delete_items, }; self.client.put_object(&request).await.map_err(|e| { @@ -482,7 +493,7 @@ impl VssStoreInner { async fn remove_internal( &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: String, secondary_namespace: String, key: String, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, ) -> io::Result<()> { check_namespace_key_validity( &primary_namespace, @@ -491,13 +502,19 @@ impl VssStoreInner { "remove", )?; + let obfuscated_key = + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); + + let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] }; + if lazy { + let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap(); + pending_lazy_deletes.push(key_value); + return Ok(()); + } + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { - let obfuscated_key = - self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); - let request = DeleteObjectRequest { - store_id: self.store_id.clone(), - key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }), - }; + let request = + DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) }; self.client.delete_object(&request).await.map_err(|e| { let msg = format!( @@ -644,4 +661,87 @@ mod tests { do_read_write_remove_list_persist(&vss_store); drop(vss_store) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn vss_lazy_delete() { + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + let mut rng = thread_rng(); + let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); + let mut vss_seed = [0u8; 32]; + rng.fill_bytes(&mut vss_seed); + let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); + let logger = Arc::new(Logger::new_log_facade()); + let runtime = Arc::new(Runtime::new(logger).unwrap()); + let vss_store = + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); + + let primary_namespace = "test_namespace"; + let secondary_namespace = ""; + let key_to_delete = "key_to_delete"; + let key_for_trigger = "key_for_trigger"; + let data_to_delete = b"data_to_delete".to_vec(); + let trigger_data = b"trigger_data".to_vec(); + + // Write the key that we'll later lazily delete + KVStore::write( + &vss_store, + primary_namespace, + secondary_namespace, + key_to_delete, + data_to_delete.clone(), + ) + .await + .unwrap(); + + // Verify the key exists + let read_data = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete) + .await + .unwrap(); + assert_eq!(read_data, data_to_delete); + + // Perform a lazy delete + KVStore::remove(&vss_store, primary_namespace, secondary_namespace, key_to_delete, true) + .await + .unwrap(); + + // Verify the key still exists (lazy delete doesn't immediately remove it) + let read_data = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete) + .await + .unwrap(); + assert_eq!(read_data, data_to_delete); + + // Verify the key is still in the list + let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap(); + assert!(keys.contains(&key_to_delete.to_string())); + + // Trigger the actual deletion by performing a write operation + KVStore::write( + &vss_store, + primary_namespace, + secondary_namespace, + key_for_trigger, + trigger_data.clone(), + ) + .await + .unwrap(); + + // Now verify the key is actually deleted + let read_result = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete).await; + assert!(read_result.is_err()); + assert_eq!(read_result.unwrap_err().kind(), ErrorKind::NotFound); + + // Verify the key is no longer in the list + let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap(); + assert!(!keys.contains(&key_to_delete.to_string())); + + // Verify the trigger key still exists + let read_data = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_for_trigger) + .await + .unwrap(); + assert_eq!(read_data, trigger_data); + } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 05326b03d..058a8df19 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1244,14 +1244,14 @@ impl KVStore for TestSyncStore { }) } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Pin> + Send>> { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); let fut = tokio::task::spawn_blocking(move || { - inner.remove_internal(&primary_namespace, &secondary_namespace, &key) + inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy) }); Box::pin(async move { fut.await.unwrap_or_else(|e| { @@ -1292,9 +1292,9 @@ impl KVStoreSync for TestSyncStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> lightning::io::Result<()> { - self.inner.remove_internal(primary_namespace, secondary_namespace, key) + self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy) } fn list( @@ -1432,15 +1432,25 @@ impl TestSyncStoreInner { } fn remove_internal( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> lightning::io::Result<()> { let _guard = self.serializer.write().unwrap(); let fs_res = - KVStoreSync::remove(&self.fs_store, primary_namespace, secondary_namespace, key); - let sqlite_res = - KVStoreSync::remove(&self.sqlite_store, primary_namespace, secondary_namespace, key); - let test_res = - KVStoreSync::remove(&self.test_store, primary_namespace, secondary_namespace, key); + KVStoreSync::remove(&self.fs_store, primary_namespace, secondary_namespace, key, lazy); + let sqlite_res = KVStoreSync::remove( + &self.sqlite_store, + primary_namespace, + secondary_namespace, + key, + lazy, + ); + let test_res = KVStoreSync::remove( + &self.test_store, + primary_namespace, + secondary_namespace, + key, + lazy, + ); assert!(!self .do_list(primary_namespace, secondary_namespace)