diff --git a/Cargo.toml b/Cargo.toml
index 544dfca08..d02b88bbf 100755
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,7 +65,7 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
 serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
 log = { version = "0.4.22", default-features = false, features = ["std"]}
 
-vss-client = "0.3"
+vss-client-ng = "0.4.0"
 prost = { version = "0.11.6", default-features = false}
 
 [target.'cfg(windows)'.dependencies]
@@ -151,3 +151,6 @@ harness = false
 #lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03" }
 #lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03" }
 #lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03" }
+
+#vss-client-ng = { path = "../vss-client" }
+#vss-client-ng = { git = "https://github.com/lightningdevkit/vss-client", branch = "main" }
diff --git a/src/builder.rs b/src/builder.rs
index c0e39af7a..a15697c91 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -39,7 +39,7 @@ use lightning::util::persist::{
 use lightning::util::ser::ReadableArgs;
 use lightning::util::sweep::OutputSweeper;
 use lightning_persister::fs_store::FilesystemStore;
-use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
+use vss_client_ng::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
 
 use crate::chain::ChainSource;
 use crate::config::{
@@ -732,7 +732,11 @@ impl NodeBuilder {
 		let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();
 
 		let vss_store =
-			VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime));
+			VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| {
+				log_error!(logger, "Failed to setup VSS store: {}", e);
+				BuildError::KVStoreSetupFailed
+			})?;
+
 		build_with_store_internal(
 			config,
 			self.chain_data_source_config.as_ref(),
diff --git a/src/ffi/types.rs b/src/ffi/types.rs
index 3c88a665f..e99cdc230 100644
--- a/src/ffi/types.rs
+++ b/src/ffi/types.rs
@@ -40,7 +40,7 @@ pub use lightning_liquidity::lsps1::msgs::{
 };
 pub use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret};
 pub use lightning_types::string::UntrustedString;
-pub use vss_client::headers::{VssHeaderProvider, VssHeaderProviderError};
+pub use vss_client_ng::headers::{VssHeaderProvider, VssHeaderProviderError};
 
 use crate::builder::sanitize_alias;
 pub use crate::config::{
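
A usage sketch, not part of the patch: with this change `VssStore::new` no longer takes a runtime handle and returns a `Result`, so callers map the failure instead of unwrapping, as `NodeBuilder` does above. The snippet below mirrors the updated tests near the end of this diff; `VssStore` is crate-internal, and the URL, store id, and seed are placeholder values.

use std::collections::HashMap;
use std::sync::Arc;

use vss_client_ng::headers::FixedHeaders;

// Crate-internal sketch only; all values are placeholders.
fn open_vss_store() -> lightning::io::Result<VssStore> {
	let vss_base_url = "http://localhost:8080/vss".to_string();
	let store_id = "example-store".to_string();
	let vss_seed = [0u8; 32]; // derived from the node's entropy in real usage
	let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));

	// Now fallible: building the internal runtime or probing the schema version can fail.
	VssStore::new(vss_base_url, store_id, vss_seed, header_provider)
}
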
diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs
index 0e7d0872a..f34a303c3 100644
--- a/src/io/vss_store.rs
+++ b/src/io/vss_store.rs
@@ -16,26 +16,27 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
+use lightning::impl_writeable_tlv_based_enum;
 use lightning::io::{self, Error, ErrorKind};
 use lightning::util::persist::{KVStore, KVStoreSync};
+use lightning::util::ser::{Readable, Writeable};
 use prost::Message;
 use rand::RngCore;
-use vss_client::client::VssClient;
-use vss_client::error::VssError;
-use vss_client::headers::VssHeaderProvider;
-use vss_client::types::{
+use vss_client_ng::client::VssClient;
+use vss_client_ng::error::VssError;
+use vss_client_ng::headers::VssHeaderProvider;
+use vss_client_ng::types::{
 	DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest,
 	Storable,
 };
-use vss_client::util::key_obfuscator::KeyObfuscator;
-use vss_client::util::retry::{
+use vss_client_ng::util::key_obfuscator::KeyObfuscator;
+use vss_client_ng::util::retry::{
 	ExponentialBackoffRetryPolicy, FilteredRetryPolicy, JitteredRetryPolicy,
 	MaxAttemptsRetryPolicy, MaxTotalDelayRetryPolicy, RetryPolicy,
 };
-use vss_client::util::storable_builder::{EntropySource, StorableBuilder};
+use vss_client_ng::util::storable_builder::{EntropySource, StorableBuilder};
 
 use crate::io::utils::check_namespace_key_validity;
-use crate::runtime::Runtime;
 
 type CustomRetryPolicy = FilteredRetryPolicy<
 	JitteredRetryPolicy<
@@ -44,10 +45,27 @@ type CustomRetryPolicy = FilteredRetryPolicy<
 	Box<dyn Fn(&VssError) -> bool + 'static + Send + Sync>,
 >;
 
+#[derive(Debug, PartialEq)]
+enum VssSchemaVersion {
+	// The initial schema version.
+	// This used an empty `aad` and unobfuscated `primary_namespace`/`secondary_namespace`s in the
+	// stored key.
+	V0,
+	// The second deployed schema version.
+	// Here we started to obfuscate the primary and secondary namespaces, and the obfuscated
+	// `store_key` (`obfuscate(primary_namespace)#obfuscate(secondary_namespace)#obfuscate(key)`)
+	// is now used as the `aad` for encryption, ensuring that the encrypted blobs commit to the
+	// key they're stored under.
+	V1,
+}
+
+impl_writeable_tlv_based_enum!(VssSchemaVersion,
+	(0, V0) => {},
+	(1, V1) => {},
+);
+
+const VSS_SCHEMA_VERSION_KEY: &str = "vss_schema_version";
+
 // We set this to a small number of threads that would still allow to make some progress if one
 // would hit a blocking case
 const INTERNAL_RUNTIME_WORKERS: usize = 2;
-const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5);
 
 /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
 pub struct VssStore {
@@ -55,7 +73,6 @@
 	// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
 	// operations aren't sensitive to the order of execution.
 	next_version: AtomicU64,
-	runtime: Arc<Runtime>,
 	// A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
 	// blocking task to finish while the blocked thread had acquired the reactor. In particular,
 	// this works around a previously-hit case where a concurrent call to
@@ -68,25 +85,61 @@ pub struct VssStore {
 impl VssStore {
 	pub(crate) fn new(
 		base_url: String, store_id: String, vss_seed: [u8; 32],
-		header_provider: Arc<dyn VssHeaderProvider>, runtime: Arc<Runtime>,
-	) -> Self {
-		let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
+		header_provider: Arc<dyn VssHeaderProvider>,
+	) -> io::Result<Self> {
 		let next_version = AtomicU64::new(1);
-		let internal_runtime = Some(
-			tokio::runtime::Builder::new_multi_thread()
-				.enable_all()
-				.thread_name_fn(|| {
-					static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
-					let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
-					format!("ldk-node-vss-runtime-{}", id)
-				})
-				.worker_threads(INTERNAL_RUNTIME_WORKERS)
-				.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
-				.build()
-				.unwrap(),
+		let internal_runtime = tokio::runtime::Builder::new_multi_thread()
+			.enable_all()
+			.thread_name_fn(|| {
+				static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
+				let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
+				format!("ldk-node-vss-runtime-{}", id)
+			})
+			.worker_threads(INTERNAL_RUNTIME_WORKERS)
+			.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
+			.build()
+			.unwrap();
+
+		let (data_encryption_key, obfuscation_master_key) =
+			derive_data_encryption_and_obfuscation_keys(&vss_seed);
+		let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
+
+		let sync_retry_policy = retry_policy();
+		let blocking_client = VssClient::new_with_headers(
+			base_url.clone(),
+			sync_retry_policy,
+			header_provider.clone(),
 		);
-		Self { inner, next_version, runtime, internal_runtime }
+		let runtime_handle = internal_runtime.handle();
+		let schema_store_id = store_id.clone();
+		let schema_key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
+		let (schema_version, blocking_client) = tokio::task::block_in_place(move || {
+			runtime_handle.block_on(async {
+				determine_and_write_schema_version(
+					blocking_client,
+					schema_store_id,
+					data_encryption_key,
+					schema_key_obfuscator,
+				)
+				.await
+			})
+		})?;
+
+		let async_retry_policy = retry_policy();
+		let async_client =
+			VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
+
+		let inner = Arc::new(VssStoreInner::new(
+			schema_version,
+			blocking_client,
+			async_client,
+			store_id,
+			data_encryption_key,
+			key_obfuscator,
+		));
+
+		Ok(Self { inner, next_version, internal_runtime: Some(internal_runtime) })
 	}
 
 	// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -129,17 +182,12 @@
 		let secondary_namespace = secondary_namespace.to_string();
 		let key = key.to_string();
 		let inner = Arc::clone(&self.inner);
-		let fut =
-			async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
-		// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
-		// times out.
-		let spawned_fut = internal_runtime.spawn(async move {
-			tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
-				let msg = "VssStore::read timed out";
-				Error::new(ErrorKind::Other, msg)
-			})
-		});
-		self.runtime.block_on(spawned_fut).expect("We should always finish")?
+		let fut = async move {
+			inner
+				.read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key)
+				.await
+		};
+		tokio::task::block_in_place(move || internal_runtime.block_on(fut))
 	}
 
 	fn write(
@@ -159,6 +207,7 @@
 		let fut = async move {
 			inner
 				.write_internal(
+					&inner.blocking_client,
 					inner_lock_ref,
 					locking_key,
 					version,
@@ -169,15 +218,7 @@
 				)
 				.await
 		};
-		// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
-		// times out.
-		let spawned_fut = internal_runtime.spawn(async move {
-			tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
-				let msg = "VssStore::write timed out";
-				Error::new(ErrorKind::Other, msg)
-			})
-		});
-		self.runtime.block_on(spawned_fut).expect("We should always finish")?
+		tokio::task::block_in_place(move || internal_runtime.block_on(fut))
 	}
 
 	fn remove(
@@ -197,6 +238,7 @@
 		let fut = async move {
 			inner
 				.remove_internal(
+					&inner.blocking_client,
 					inner_lock_ref,
 					locking_key,
 					version,
@@ -206,15 +248,7 @@
 				)
 				.await
 		};
-		// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
-		// times out.
-		let spawned_fut = internal_runtime.spawn(async move {
-			tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
-				let msg = "VssStore::remove timed out";
-				Error::new(ErrorKind::Other, msg)
-			})
-		});
-		self.runtime.block_on(spawned_fut).expect("We should always finish")?
+		tokio::task::block_in_place(move || internal_runtime.block_on(fut))
 	}
 
 	fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
@@ -226,16 +260,12 @@
 		let primary_namespace = primary_namespace.to_string();
 		let secondary_namespace = secondary_namespace.to_string();
 		let inner = Arc::clone(&self.inner);
-		let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
-		// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
-		// times out.
-		let spawned_fut = internal_runtime.spawn(async move {
-			tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
-				let msg = "VssStore::list timed out";
-				Error::new(ErrorKind::Other, msg)
-			})
-		});
-		self.runtime.block_on(spawned_fut).expect("We should always finish")?
+		let fut = async move {
+			inner
+				.list_internal(&inner.blocking_client, primary_namespace, secondary_namespace)
+				.await
+		};
+		tokio::task::block_in_place(move || internal_runtime.block_on(fut))
 	}
 }
 
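
The synchronous `KVStoreSync` methods above all follow the same bridge pattern: the store owns a small dedicated multi-threaded runtime, and `tokio::task::block_in_place` is used so that a caller which itself sits on a Tokio worker thread doesn't stall other tasks queued on that worker while it waits. A minimal sketch of the pattern (assumed shape only; `blocking_get` and its body are illustrative, not the patch's code):

use std::io;

fn blocking_get(runtime: &tokio::runtime::Runtime, url: String) -> io::Result<Vec<u8>> {
	// `block_in_place` tells the current Tokio worker (if any) to hand its other queued tasks
	// to another worker before we block this thread.
	tokio::task::block_in_place(|| {
		// `runtime` is the store-owned runtime; blocking on it here avoids the deadlock the
		// comment above describes, since it doesn't share worker threads with the caller's runtime.
		runtime.block_on(async move {
			// ... perform the async VSS request against `url` here ...
			let _ = url;
			Ok(Vec::new())
		})
	})
}
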
@@ -247,9 +277,11 @@ impl KVStore for VssStore {
 		let secondary_namespace = secondary_namespace.to_string();
 		let key = key.to_string();
 		let inner = Arc::clone(&self.inner);
-		Box::pin(
-			async move { inner.read_internal(primary_namespace, secondary_namespace, key).await },
-		)
+		Box::pin(async move {
+			inner
+				.read_internal(&inner.async_client, primary_namespace, secondary_namespace, key)
+				.await
+		})
 	}
 	fn write(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
@@ -263,6 +295,7 @@
 		Box::pin(async move {
 			inner
 				.write_internal(
+					&inner.async_client,
 					inner_lock_ref,
 					locking_key,
 					version,
@@ -286,6 +319,7 @@
 		Box::pin(async move {
 			inner
 				.remove_internal(
+					&inner.async_client,
 					inner_lock_ref,
 					locking_key,
 					version,
@@ -302,7 +336,9 @@
 		let primary_namespace = primary_namespace.to_string();
 		let secondary_namespace = secondary_namespace.to_string();
 		let inner = Arc::clone(&self.inner);
-		Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await })
+		Box::pin(async move {
+			inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await
+		})
 	}
 }
 
@@ -314,9 +350,13 @@ impl Drop for VssStore {
 }
 
 struct VssStoreInner {
-	client: VssClient<CustomRetryPolicy>,
+	schema_version: VssSchemaVersion,
+	blocking_client: VssClient<CustomRetryPolicy>,
+	// A secondary client that will only be used for async persistence via `KVStore`, to ensure TCP
+	// connections aren't shared between our outer and the internal runtime.
+	async_client: VssClient<CustomRetryPolicy>,
 	store_id: String,
-	storable_builder: StorableBuilder<RandEntropySource>,
+	data_encryption_key: [u8; 32],
 	key_obfuscator: KeyObfuscator,
 	// Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
 	// The lock also encapsulates the latest written version per key.
@@ -325,29 +365,20 @@ impl VssStoreInner {
 	pub(crate) fn new(
-		base_url: String, store_id: String, vss_seed: [u8; 32],
-		header_provider: Arc<dyn VssHeaderProvider>,
+		schema_version: VssSchemaVersion, blocking_client: VssClient<CustomRetryPolicy>,
+		async_client: VssClient<CustomRetryPolicy>, store_id: String,
+		data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator,
 	) -> Self {
-		let (data_encryption_key, obfuscation_master_key) =
-			derive_data_encryption_and_obfuscation_keys(&vss_seed);
-		let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
-		let storable_builder = StorableBuilder::new(data_encryption_key, RandEntropySource);
-		let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
-			.with_max_attempts(10)
-			.with_max_total_delay(Duration::from_secs(15))
-			.with_max_jitter(Duration::from_millis(10))
-			.skip_retry_on_error(Box::new(|e: &VssError| {
-				matches!(
-					e,
-					VssError::NoSuchKeyError(..)
-						| VssError::InvalidRequestError(..)
-						| VssError::ConflictError(..)
-				)
-			}) as _);
-
-		let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
 		let locks = Mutex::new(HashMap::new());
-		Self { client, store_id, storable_builder, key_obfuscator, locks }
+		Self {
+			schema_version,
+			blocking_client,
+			async_client,
+			store_id,
+			data_encryption_key,
+			key_obfuscator,
+			locks,
+		}
 	}
 
 	fn get_inner_lock_ref(&self, locking_key: String) -> Arc<Mutex<u64>> {
@@ -358,11 +389,35 @@ impl VssStoreInner {
 	fn build_obfuscated_key(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
 	) -> String {
-		let obfuscated_key = self.key_obfuscator.obfuscate(key);
-		if primary_namespace.is_empty() {
-			obfuscated_key
+		if self.schema_version == VssSchemaVersion::V1 {
+			let obfuscated_primary_namespace = self.key_obfuscator.obfuscate(primary_namespace);
+			let obfuscated_secondary_namespace = self.key_obfuscator.obfuscate(secondary_namespace);
+			let obfuscated_key = self.key_obfuscator.obfuscate(key);
+			format!(
+				"{}#{}#{}",
+				obfuscated_primary_namespace, obfuscated_secondary_namespace, obfuscated_key
+			)
 		} else {
-			format!("{}#{}#{}", primary_namespace, secondary_namespace, obfuscated_key)
+			// Default to V0 schema
+			let obfuscated_key = self.key_obfuscator.obfuscate(key);
+			if primary_namespace.is_empty() {
+				obfuscated_key
+			} else {
+				format!("{}#{}#{}", primary_namespace, secondary_namespace, obfuscated_key)
+			}
+		}
+	}
+
+	fn build_obfuscated_prefix(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> String {
+		if self.schema_version == VssSchemaVersion::V1 {
+			let obfuscated_primary_namespace = self.key_obfuscator.obfuscate(primary_namespace);
+			let obfuscated_secondary_namespace = self.key_obfuscator.obfuscate(secondary_namespace);
+			format!("{}#{}", obfuscated_primary_namespace, obfuscated_secondary_namespace,)
+		} else {
+			// Default to V0 schema
+			format!("{}#{}", primary_namespace, secondary_namespace)
 		}
 	}
 
@@ -379,11 +434,12 @@
 	}
 
 	async fn list_all_keys(
-		&self, primary_namespace: &str, secondary_namespace: &str,
+		&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str,
+		secondary_namespace: &str,
 	) -> io::Result<Vec<String>> {
 		let mut page_token = None;
 		let mut keys = vec![];
-		let key_prefix = format!("{}#{}", primary_namespace, secondary_namespace);
+		let key_prefix = self.build_obfuscated_prefix(primary_namespace, secondary_namespace);
 		while page_token != Some("".to_string()) {
 			let request = ListKeyVersionsRequest {
 				store_id: self.store_id.clone(),
@@ -392,7 +448,7 @@
 				page_size: None,
 			};
 
-			let response = self.client.list_key_versions(&request).await.map_err(|e| {
+			let response = client.list_key_versions(&request).await.map_err(|e| {
 				let msg = format!(
 					"Failed to list keys in {}/{}: {}",
 					primary_namespace, secondary_namespace, e
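
Put side by side, the two store-key layouts handled by `build_obfuscated_key` above look roughly as follows. This is an illustrative sketch: `obfuscate` stands in for the patch's `KeyObfuscator::obfuscate`, and the helper names are invented.

// Sketch only: the two layouts produced by `build_obfuscated_key`.
fn store_key_v0(obfuscate: impl Fn(&str) -> String, pn: &str, sn: &str, key: &str) -> String {
	// V0: namespaces stay in the clear, only the key itself is obfuscated;
	// empty namespaces collapse to just the obfuscated key.
	if pn.is_empty() {
		obfuscate(key)
	} else {
		format!("{}#{}#{}", pn, sn, obfuscate(key))
	}
}

fn store_key_v1(obfuscate: impl Fn(&str) -> String, pn: &str, sn: &str, key: &str) -> String {
	// V1: all three components are obfuscated, and the resulting string is also passed as the
	// AEAD `aad`, so each ciphertext commits to the key it is stored under.
	format!("{}#{}#{}", obfuscate(pn), obfuscate(sn), obfuscate(key))
}
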
@@ -409,14 +465,14 @@
 	}
 
 	async fn read_internal(
-		&self, primary_namespace: String, secondary_namespace: String, key: String,
+		&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
+		secondary_namespace: String, key: String,
 	) -> io::Result<Vec<u8>> {
 		check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
 
-		let obfuscated_key =
-			self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
-		let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key };
-		let resp = self.client.get_object(&request).await.map_err(|e| {
+		let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
+		let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() };
+		let resp = client.get_object(&request).await.map_err(|e| {
 			let msg = format!(
 				"Failed to read from key {}/{}/{}: {}",
 				primary_namespace, secondary_namespace, key, e
@@ -437,12 +493,17 @@
 			Error::new(ErrorKind::Other, msg)
 		})?;
 
-		Ok(self.storable_builder.deconstruct(storable)?.0)
+		let storable_builder = StorableBuilder::new(RandEntropySource);
+		let aad =
+			if self.schema_version == VssSchemaVersion::V1 { store_key.as_bytes() } else { &[] };
+		let decrypted = storable_builder.deconstruct(storable, &self.data_encryption_key, aad)?.0;
+		Ok(decrypted)
 	}
 
 	async fn write_internal(
-		&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
-		primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
+		&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<Mutex<u64>>,
+		locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
+		key: String, buf: Vec<u8>,
 	) -> io::Result<()> {
 		check_namespace_key_validity(
 			&primary_namespace,
@@ -451,26 +512,31 @@
 			"write",
 		)?;
 
-		self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
-			let obfuscated_key =
-				self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
-			let vss_version = -1;
-			let storable = self.storable_builder.build(buf, vss_version);
-			let request = PutObjectRequest {
-				store_id: self.store_id.clone(),
-				global_version: None,
-				transaction_items: vec![KeyValue {
-					key: obfuscated_key,
-					version: vss_version,
-					value: storable.encode_to_vec(),
-				}],
-				delete_items: vec![],
-			};
+		let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
+		let cnt = store_key.chars().count();
+		let len = store_key.len();
+		let vss_version = -1;
+		let storable_builder = StorableBuilder::new(RandEntropySource);
+		let aad =
+			if self.schema_version == VssSchemaVersion::V1 { store_key.as_bytes() } else { &[] };
+		let storable =
+			storable_builder.build(buf.to_vec(), vss_version, &self.data_encryption_key, aad);
+		let request = PutObjectRequest {
+			store_id: self.store_id.clone(),
+			global_version: None,
+			transaction_items: vec![KeyValue {
+				key: store_key,
+				version: vss_version,
+				value: storable.encode_to_vec(),
+			}],
+			delete_items: vec![],
+		};
 
-			self.client.put_object(&request).await.map_err(|e| {
+		self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
+			client.put_object(&request).await.map_err(|e| {
 				let msg = format!(
-					"Failed to write to key {}/{}/{}: {}",
-					primary_namespace, secondary_namespace, key, e
+					"Failed to write to key {}/{}/{}, obfs len {}, cnt {}: {}",
+					primary_namespace, secondary_namespace, key, len, cnt, e
 				);
 				Error::new(ErrorKind::Other, msg)
 			})?;
@@ -481,8 +547,9 @@
 	}
 
 	async fn remove_internal(
-		&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
-		primary_namespace: String, secondary_namespace: String, key: String,
+		&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<Mutex<u64>>,
+		locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
+		key: String,
 	) -> io::Result<()> {
 		check_namespace_key_validity(
 			&primary_namespace,
@@ -499,7 +566,7 @@
 				key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
 			};
 
-			self.client.delete_object(&request).await.map_err(|e| {
+			client.delete_object(&request).await.map_err(|e| {
 				let msg = format!(
 					"Failed to delete key {}/{}/{}: {}",
 					primary_namespace, secondary_namespace, key, e
@@ -513,12 +580,15 @@ impl VssStoreInner {
 	}
 
 	async fn list_internal(
-		&self, primary_namespace: String, secondary_namespace: String,
+		&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
+		secondary_namespace: String,
 	) -> io::Result<Vec<String>> {
 		check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
 
-		let keys =
-			self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| {
+		let keys = self
+			.list_all_keys(client, &primary_namespace, &secondary_namespace)
+			.await
+			.map_err(|e| {
 				let msg = format!(
 					"Failed to retrieve keys in namespace: {}/{} : {}",
 					primary_namespace, secondary_namespace, e
@@ -587,6 +657,133 @@ fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32]
 	(k1, k2)
 }
 
+fn retry_policy() -> CustomRetryPolicy {
+	ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
+		.with_max_attempts(100)
+		.with_max_total_delay(Duration::from_secs(60))
+		.with_max_jitter(Duration::from_millis(50))
+		.skip_retry_on_error(Box::new(|e: &VssError| {
+			matches!(
+				e,
+				VssError::NoSuchKeyError(..)
+					| VssError::InvalidRequestError(..)
+					| VssError::ConflictError(..)
+			)
+		}) as _)
+}
+
+// FIXME: This returns the client it used because `VssClient`'s `RetryPolicy`s currently aren't
+// `Clone`, so we're forced to take the client by value and hand it back in order to reuse the
+// same connection later.
+async fn determine_and_write_schema_version(
+	client: VssClient<CustomRetryPolicy>, store_id: String, data_encryption_key: [u8; 32],
+	key_obfuscator: KeyObfuscator,
+) -> io::Result<(VssSchemaVersion, VssClient<CustomRetryPolicy>)> {
+	// Build the obfuscated `vss_schema_version` key.
+	let obfuscated_primary_namespace = key_obfuscator.obfuscate("");
+	let obfuscated_secondary_namespace = key_obfuscator.obfuscate("");
+	let obfuscated_key = key_obfuscator.obfuscate(VSS_SCHEMA_VERSION_KEY);
+	let store_key = format!(
+		"{}#{}#{}",
+		obfuscated_primary_namespace, obfuscated_secondary_namespace, obfuscated_key
+	);
+
+	// Try to read the stored schema version.
+	let request = GetObjectRequest { store_id: store_id.clone(), key: store_key.clone() };
+	let resp = match client.get_object(&request).await {
+		Ok(resp) => Some(resp),
+		Err(VssError::NoSuchKeyError(..)) => {
+			// The value is not set.
+			None
+		},
+		Err(e) => {
+			let msg = format!("Failed to read schema version: {}", e);
+			return Err(Error::new(ErrorKind::Other, msg));
+		},
+	};
+
+	if let Some(resp) = resp {
+		// The schema version was present, so just decrypt the stored data.
+
+		// unwrap safety: resp.value must always be present for a non-erroneous VSS response, otherwise
+		// it is an API violation which is converted to [`VssError::InternalServerError`] in [`VssClient`]
+		let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| {
+			let msg = format!("Failed to decode schema version: {}", e);
+			Error::new(ErrorKind::Other, msg)
+		})?;
+
+		let storable_builder = StorableBuilder::new(RandEntropySource);
+		// Schema version was added starting with V1, so if set at all, we use the key as `aad`
+		let aad = store_key.as_bytes();
+		let decrypted = storable_builder
+			.deconstruct(storable, &data_encryption_key, aad)
+			.map_err(|e| {
+				let msg = format!("Failed to decode schema version: {}", e);
+				Error::new(ErrorKind::Other, msg)
+			})?
+			.0;
+
+		let schema_version: VssSchemaVersion = Readable::read(&mut io::Cursor::new(decrypted))
+			.map_err(|e| {
+				let msg = format!("Failed to decode schema version: {}", e);
+				Error::new(ErrorKind::Other, msg)
+			})?;
+		Ok((schema_version, client))
+	} else {
+		// The schema version wasn't present. This either means we're running for the first time
+		// *or* it's V0 pre-migration (predating writing of the schema version).
+
+		// Check if any `bdk_wallet` data was written by listing keys under the respective
+		// (unobfuscated) prefix.
+		const V0_BDK_WALLET_PREFIX: &str = "bdk_wallet#";
+		let request = ListKeyVersionsRequest {
+			store_id: store_id.clone(),
+			key_prefix: Some(V0_BDK_WALLET_PREFIX.to_string()),
+			page_token: None,
+			page_size: None,
+		};
+
+		let response = client.list_key_versions(&request).await.map_err(|e| {
+			let msg = format!("Failed to determine schema version: {}", e);
+			Error::new(ErrorKind::Other, msg)
+		})?;
+
+		let wallet_data_present = !response.key_versions.is_empty();
+		if wallet_data_present {
+			// If the wallet data is present, it means we're not running for the first time.
+			Ok((VssSchemaVersion::V0, client))
+		} else {
+			// We're running for the first time, so write the schema version to save unnecessary
+			// IOps on future startup.
+			let schema_version = VssSchemaVersion::V1;
+			let encoded_version = schema_version.encode();
+
+			let storable_builder = StorableBuilder::new(RandEntropySource);
+			let vss_version = -1;
+			let aad = store_key.as_bytes();
+			let storable =
+				storable_builder.build(encoded_version, vss_version, &data_encryption_key, aad);
+
+			let request = PutObjectRequest {
+				store_id,
+				global_version: None,
+				transaction_items: vec![KeyValue {
+					key: store_key,
+					version: vss_version,
+					value: storable.encode_to_vec(),
+				}],
+				delete_items: vec![],
+			};
+
+			client.put_object(&request).await.map_err(|e| {
+				let msg = format!("Failed to write schema version: {}", e);
+				Error::new(ErrorKind::Other, msg)
+			})?;
+
+			Ok((schema_version, client))
+		}
+	}
+}
+
 /// A source for generating entropy/randomness using [`rand`].
 pub(crate) struct RandEntropySource;
 
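
The probing logic in `determine_and_write_schema_version` boils down to a small decision table. The sketch below restates it using the patch's `VssSchemaVersion`; `SchemaProbe` and `decide_schema_version` are hypothetical names introduced only for illustration.

// Illustrative restatement of the schema detection above.
enum SchemaProbe {
	// An encrypted `vss_schema_version` record exists under its obfuscated key.
	StoredVersion(VssSchemaVersion),
	// No schema-version record, but keys under the plaintext "bdk_wallet#" prefix exist.
	NoVersionButWalletData,
	// No schema-version record and no wallet data: a brand-new store.
	EmptyStore,
}

fn decide_schema_version(probe: SchemaProbe) -> VssSchemaVersion {
	match probe {
		// An explicit record always wins.
		SchemaProbe::StoredVersion(version) => version,
		// Data from before the schema-version record existed: keep reading/writing V0 keys.
		SchemaProbe::NoVersionButWalletData => VssSchemaVersion::V0,
		// Fresh store: start on V1 (the real code also persists this choice immediately).
		SchemaProbe::EmptyStore => VssSchemaVersion::V1,
	}
}
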
@@ -606,11 +803,10 @@ mod tests {
 	use rand::distr::Alphanumeric;
 	use rand::{rng, Rng, RngCore};
 
-	use vss_client::headers::FixedHeaders;
+	use vss_client_ng::headers::FixedHeaders;
 
 	use super::*;
 	use crate::io::test_utils::do_read_write_remove_list_persist;
-	use crate::logger::Logger;
 
 	#[test]
 	fn vss_read_write_remove_list_persist() {
@@ -620,11 +816,8 @@ mod tests {
 		let mut vss_seed = [0u8; 32];
 		rng.fill_bytes(&mut vss_seed);
 		let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
-		let logger = Arc::new(Logger::new_log_facade());
-		let runtime = Arc::new(Runtime::new(logger).unwrap());
 		let vss_store =
-			VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
-
+			VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
 		do_read_write_remove_list_persist(&vss_store);
 	}
 
@@ -636,10 +829,8 @@ mod tests {
 		let mut vss_seed = [0u8; 32];
 		rng.fill_bytes(&mut vss_seed);
 		let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
-		let logger = Arc::new(Logger::new_log_facade());
-		let runtime = Arc::new(Runtime::new(logger).unwrap());
 		let vss_store =
-			VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
+			VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();
 		do_read_write_remove_list_persist(&vss_store);
 		drop(vss_store)
diff --git a/src/lib.rs b/src/lib.rs
index 701a14dde..3a4c1df9e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -159,7 +159,7 @@ pub use types::{
 };
 pub use {
 	bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio,
-	vss_client,
+	vss_client_ng,
 };
 
 use crate::scoring::setup_background_pathfinding_scores_sync;
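
For downstream users of the crate re-exports, the rename in `src/lib.rs` means the import path changes from `ldk_node::vss_client` to `ldk_node::vss_client_ng`. A small sketch, assuming a consumer that builds a fixed-header provider (the function name is illustrative):

use std::collections::HashMap;
use std::sync::Arc;

// Previously: use ldk_node::vss_client::headers::{FixedHeaders, VssHeaderProvider};
use ldk_node::vss_client_ng::headers::{FixedHeaders, VssHeaderProvider};

fn make_header_provider() -> Arc<dyn VssHeaderProvider> {
	// `FixedHeaders` attaches the same (here: empty) header map to every VSS request.
	Arc::new(FixedHeaders::new(HashMap::new()))
}
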