Skip to content

Commit ee122f2

Browse files
committed
Implement KVStore for VssStore
We implement the async `KVStore` trait for `VssStore`.
1 parent 9d08b27 commit ee122f2

File tree

1 file changed

+234
-49
lines changed

1 file changed

+234
-49
lines changed

src/io/vss_store.rs

Lines changed: 234 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,22 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use std::boxed::Box;
9+
use std::collections::HashMap;
10+
use std::future::Future;
811
#[cfg(test)]
912
use std::panic::RefUnwindSafe;
10-
use std::sync::Arc;
13+
use std::pin::Pin;
14+
use std::sync::atomic::{AtomicU64, Ordering};
15+
use std::sync::{Arc, Mutex};
1116
use std::time::Duration;
1217

1318
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
1419
use lightning::io::{self, Error, ErrorKind};
15-
use lightning::util::persist::KVStoreSync;
20+
use lightning::util::persist::{KVStore, KVStoreSync};
1621
use prost::Message;
1722
use rand::RngCore;
23+
use tokio::sync::RwLock;
1824
use vss_client::client::VssClient;
1925
use vss_client::error::VssError;
2026
use vss_client::headers::VssHeaderProvider;
@@ -42,6 +48,9 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4248
/// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
4349
pub struct VssStore {
4450
inner: Arc<VssStoreInner>,
51+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
52+
// operations aren't sensitive to the order of execution.
53+
next_version: AtomicU64,
4554
runtime: Arc<Runtime>,
4655
}
4756

@@ -51,7 +60,32 @@ impl VssStore {
5160
header_provider: Arc<dyn VssHeaderProvider>, runtime: Arc<Runtime>,
5261
) -> Self {
5362
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
54-
Self { inner, runtime }
63+
let next_version = AtomicU64::new(1);
64+
Self { inner, next_version, runtime }
65+
}
66+
67+
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
68+
fn build_locking_key(
69+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
70+
) -> String {
71+
if primary_namespace.is_empty() {
72+
key.to_owned()
73+
} else {
74+
format!("{}#{}#{}", primary_namespace, secondary_namespace, key)
75+
}
76+
}
77+
78+
fn get_new_version_and_lock_ref(&self, locking_key: String) -> (Arc<RwLock<u64>>, u64) {
79+
let version = self.next_version.fetch_add(1, Ordering::Relaxed);
80+
if version == u64::MAX {
81+
panic!("VssStore version counter overflowed");
82+
}
83+
84+
// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
85+
// cleaning up unused locks.
86+
let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key);
87+
88+
(inner_lock_ref, version)
5589
}
5690
}
5791

@@ -66,14 +100,34 @@ impl KVStoreSync for VssStore {
66100
fn write(
67101
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
68102
) -> io::Result<()> {
69-
let fut = self.inner.write_internal(primary_namespace, secondary_namespace, key, buf);
103+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
104+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
105+
let fut = self.inner.write_internal(
106+
inner_lock_ref,
107+
locking_key,
108+
version,
109+
primary_namespace,
110+
secondary_namespace,
111+
key,
112+
buf,
113+
);
70114
self.runtime.block_on(fut)
71115
}
72116

73117
fn remove(
74118
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
75119
) -> io::Result<()> {
76-
let fut = self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy);
120+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
121+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
122+
let fut = self.inner.remove_internal(
123+
inner_lock_ref,
124+
locking_key,
125+
version,
126+
primary_namespace,
127+
secondary_namespace,
128+
key,
129+
lazy,
130+
);
77131
self.runtime.block_on(fut)
78132
}
79133

@@ -83,11 +137,82 @@ impl KVStoreSync for VssStore {
83137
}
84138
}
85139

140+
impl KVStore for VssStore {
141+
fn read(
142+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
143+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + Send>> {
144+
let primary_namespace = primary_namespace.to_string();
145+
let secondary_namespace = secondary_namespace.to_string();
146+
let key = key.to_string();
147+
let inner = Arc::clone(&self.inner);
148+
Box::pin(async move {
149+
inner.read_internal(&primary_namespace, &secondary_namespace, &key).await
150+
})
151+
}
152+
fn write(
153+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
154+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
155+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
156+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
157+
let primary_namespace = primary_namespace.to_string();
158+
let secondary_namespace = secondary_namespace.to_string();
159+
let key = key.to_string();
160+
let inner = Arc::clone(&self.inner);
161+
Box::pin(async move {
162+
inner
163+
.write_internal(
164+
inner_lock_ref,
165+
locking_key,
166+
version,
167+
&primary_namespace,
168+
&secondary_namespace,
169+
&key,
170+
buf,
171+
)
172+
.await
173+
})
174+
}
175+
fn remove(
176+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
177+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
178+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
179+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
180+
let primary_namespace = primary_namespace.to_string();
181+
let secondary_namespace = secondary_namespace.to_string();
182+
let key = key.to_string();
183+
let inner = Arc::clone(&self.inner);
184+
Box::pin(async move {
185+
inner
186+
.remove_internal(
187+
inner_lock_ref,
188+
locking_key,
189+
version,
190+
&primary_namespace,
191+
&secondary_namespace,
192+
&key,
193+
lazy,
194+
)
195+
.await
196+
})
197+
}
198+
fn list(
199+
&self, primary_namespace: &str, secondary_namespace: &str,
200+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + Send>> {
201+
let primary_namespace = primary_namespace.to_string();
202+
let secondary_namespace = secondary_namespace.to_string();
203+
let inner = Arc::clone(&self.inner);
204+
Box::pin(async move { inner.list_internal(&primary_namespace, &secondary_namespace).await })
205+
}
206+
}
207+
86208
struct VssStoreInner {
87209
client: VssClient<CustomRetryPolicy>,
88210
store_id: String,
89211
storable_builder: StorableBuilder<RandEntropySource>,
90212
key_obfuscator: KeyObfuscator,
213+
// Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
214+
// The lock also encapsulates the latest written version per key.
215+
locks: Mutex<HashMap<String, Arc<RwLock<u64>>>>,
91216
}
92217

93218
impl VssStoreInner {
@@ -113,10 +238,18 @@ impl VssStoreInner {
113238
}) as _);
114239

115240
let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
116-
Self { client, store_id, storable_builder, key_obfuscator }
241+
let locks = Mutex::new(HashMap::new());
242+
Self { client, store_id, storable_builder, key_obfuscator, locks }
117243
}
118244

119-
fn build_key(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> String {
245+
fn get_inner_lock_ref(&self, locking_key: String) -> Arc<RwLock<u64>> {
246+
let mut outer_lock = self.locks.lock().unwrap();
247+
Arc::clone(&outer_lock.entry(locking_key).or_default())
248+
}
249+
250+
fn build_obfuscated_key(
251+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
252+
) -> String {
120253
let obfuscated_key = self.key_obfuscator.obfuscate(key);
121254
if primary_namespace.is_empty() {
122255
obfuscated_key
@@ -171,10 +304,9 @@ impl VssStoreInner {
171304
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
172305
) -> io::Result<Vec<u8>> {
173306
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
174-
let request = GetObjectRequest {
175-
store_id: self.store_id.clone(),
176-
key: self.build_key(primary_namespace, secondary_namespace, key),
177-
};
307+
308+
let obfuscated_key = self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
309+
let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key };
178310
let resp = self.client.get_object(&request).await.map_err(|e| {
179311
let msg = format!(
180312
"Failed to read from key {}/{}/{}: {}",
@@ -200,55 +332,65 @@ impl VssStoreInner {
200332
}
201333

202334
async fn write_internal(
203-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
335+
&self, inner_lock_ref: Arc<RwLock<u64>>, locking_key: String, version: u64,
336+
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
204337
) -> io::Result<()> {
205338
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
206-
let version = -1;
207-
let storable = self.storable_builder.build(buf, version);
208-
let request = PutObjectRequest {
209-
store_id: self.store_id.clone(),
210-
global_version: None,
211-
transaction_items: vec![KeyValue {
212-
key: self.build_key(primary_namespace, secondary_namespace, key),
213-
version,
214-
value: storable.encode_to_vec(),
215-
}],
216-
delete_items: vec![],
217-
};
218339

219-
self.client.put_object(&request).await.map_err(|e| {
220-
let msg = format!(
221-
"Failed to write to key {}/{}/{}: {}",
222-
primary_namespace, secondary_namespace, key, e
223-
);
224-
Error::new(ErrorKind::Other, msg)
225-
})?;
340+
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
341+
let obfuscated_key =
342+
self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
343+
let vss_version = -1;
344+
let storable = self.storable_builder.build(buf, vss_version);
345+
let request = PutObjectRequest {
346+
store_id: self.store_id.clone(),
347+
global_version: None,
348+
transaction_items: vec![KeyValue {
349+
key: obfuscated_key,
350+
version: vss_version,
351+
value: storable.encode_to_vec(),
352+
}],
353+
delete_items: vec![],
354+
};
226355

227-
Ok(())
356+
self.client.put_object(&request).await.map_err(|e| {
357+
let msg = format!(
358+
"Failed to write to key {}/{}/{}: {}",
359+
primary_namespace, secondary_namespace, key, e
360+
);
361+
Error::new(ErrorKind::Other, msg)
362+
})?;
363+
364+
Ok(())
365+
})
366+
.await
228367
}
229368

230369
async fn remove_internal(
231-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
370+
&self, inner_lock_ref: Arc<RwLock<u64>>, locking_key: String, version: u64,
371+
primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
232372
) -> io::Result<()> {
233373
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
234-
let request = DeleteObjectRequest {
235-
store_id: self.store_id.clone(),
236-
key_value: Some(KeyValue {
237-
key: self.build_key(primary_namespace, secondary_namespace, key),
238-
version: -1,
239-
value: vec![],
240-
}),
241-
};
242374

243-
self.client.delete_object(&request).await.map_err(|e| {
244-
let msg = format!(
245-
"Failed to delete key {}/{}/{}: {}",
246-
primary_namespace, secondary_namespace, key, e
247-
);
248-
Error::new(ErrorKind::Other, msg)
249-
})?;
375+
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
376+
let obfuscated_key =
377+
self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
378+
let request = DeleteObjectRequest {
379+
store_id: self.store_id.clone(),
380+
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
381+
};
250382

251-
Ok(())
383+
self.client.delete_object(&request).await.map_err(|e| {
384+
let msg = format!(
385+
"Failed to delete key {}/{}/{}: {}",
386+
primary_namespace, secondary_namespace, key, e
387+
);
388+
Error::new(ErrorKind::Other, msg)
389+
})?;
390+
391+
Ok(())
392+
})
393+
.await
252394
}
253395

254396
async fn list_internal(
@@ -267,6 +409,49 @@ impl VssStoreInner {
267409

268410
Ok(keys)
269411
}
412+
413+
async fn execute_locked_write<
414+
F: Future<Output = Result<(), lightning::io::Error>>,
415+
FN: FnOnce() -> F,
416+
>(
417+
&self, inner_lock_ref: Arc<RwLock<u64>>, locking_key: String, version: u64, callback: FN,
418+
) -> Result<(), lightning::io::Error> {
419+
let res = {
420+
let mut last_written_version = inner_lock_ref.write().await;
421+
422+
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
423+
// consistency.
424+
let is_stale_version = version <= *last_written_version;
425+
426+
// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
427+
if is_stale_version {
428+
Ok(())
429+
} else {
430+
callback().await.map(|_| {
431+
*last_written_version = version;
432+
})
433+
}
434+
};
435+
436+
self.clean_locks(&inner_lock_ref, locking_key);
437+
438+
res
439+
}
440+
441+
fn clean_locks(&self, inner_lock_ref: &Arc<RwLock<u64>>, locking_key: String) {
442+
// If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
443+
// to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
444+
// inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
445+
// counted.
446+
let mut outer_lock = self.locks.lock().unwrap();
447+
448+
let strong_count = Arc::strong_count(&inner_lock_ref);
449+
debug_assert!(strong_count >= 2, "Unexpected VssStore strong count");
450+
451+
if strong_count == 2 {
452+
outer_lock.remove(&locking_key);
453+
}
454+
}
270455
}
271456

272457
fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32], [u8; 32]) {

0 commit comments

Comments
 (0)