Skip to content

Commit b4254e3

Browse files
committed
Introduce two separate VssClients for async/blocking contexts
To avoid any blocking cross-runtime behavior that could arise from reusing a single client's TCP connections in different runtime contexts, we here split out the `VssStore` behavior to use one dedicated `VssClient` per context. I.e., we're now using two connections/connection pools and make sure only the `blocking_client` is used in `KVStoreSync` contexts, and `async_client` in `KVStore` contexts.
1 parent 2e7d710 commit b4254e3

File tree

1 file changed

+84
-37
lines changed

1 file changed

+84
-37
lines changed

src/io/vss_store.rs

Lines changed: 84 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -97,24 +97,22 @@ impl VssStore {
9797
let (data_encryption_key, obfuscation_master_key) =
9898
derive_data_encryption_and_obfuscation_keys(&vss_seed);
9999
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
100-
let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
101-
.with_max_attempts(100)
102-
.with_max_total_delay(Duration::from_secs(60))
103-
.with_max_jitter(Duration::from_millis(50))
104-
.skip_retry_on_error(Box::new(|e: &VssError| {
105-
matches!(
106-
e,
107-
VssError::NoSuchKeyError(..)
108-
| VssError::InvalidRequestError(..)
109-
| VssError::ConflictError(..)
110-
)
111-
}) as _);
112100

113-
let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
101+
let sync_retry_policy = retry_policy();
102+
let blocking_client = VssClient::new_with_headers(
103+
base_url.clone(),
104+
sync_retry_policy,
105+
header_provider.clone(),
106+
);
107+
108+
let async_retry_policy = retry_policy();
109+
let async_client =
110+
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
114111

115112
let inner = Arc::new(VssStoreInner::new(
116113
schema_version,
117-
client,
114+
blocking_client,
115+
async_client,
118116
store_id,
119117
data_encryption_key,
120118
key_obfuscator,
@@ -163,8 +161,11 @@ impl KVStoreSync for VssStore {
163161
let secondary_namespace = secondary_namespace.to_string();
164162
let key = key.to_string();
165163
let inner = Arc::clone(&self.inner);
166-
let fut =
167-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
164+
let fut = async move {
165+
inner
166+
.read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key)
167+
.await
168+
};
168169
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
169170
}
170171

@@ -185,6 +186,7 @@ impl KVStoreSync for VssStore {
185186
let fut = async move {
186187
inner
187188
.write_internal(
189+
&inner.blocking_client,
188190
inner_lock_ref,
189191
locking_key,
190192
version,
@@ -215,6 +217,7 @@ impl KVStoreSync for VssStore {
215217
let fut = async move {
216218
inner
217219
.remove_internal(
220+
&inner.blocking_client,
218221
inner_lock_ref,
219222
locking_key,
220223
version,
@@ -236,7 +239,11 @@ impl KVStoreSync for VssStore {
236239
let primary_namespace = primary_namespace.to_string();
237240
let secondary_namespace = secondary_namespace.to_string();
238241
let inner = Arc::clone(&self.inner);
239-
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
242+
let fut = async move {
243+
inner
244+
.list_internal(&inner.blocking_client, primary_namespace, secondary_namespace)
245+
.await
246+
};
240247
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
241248
}
242249
}
@@ -249,9 +256,11 @@ impl KVStore for VssStore {
249256
let secondary_namespace = secondary_namespace.to_string();
250257
let key = key.to_string();
251258
let inner = Arc::clone(&self.inner);
252-
Box::pin(
253-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await },
254-
)
259+
Box::pin(async move {
260+
inner
261+
.read_internal(&inner.async_client, primary_namespace, secondary_namespace, key)
262+
.await
263+
})
255264
}
256265
fn write(
257266
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
@@ -265,6 +274,7 @@ impl KVStore for VssStore {
265274
Box::pin(async move {
266275
inner
267276
.write_internal(
277+
&inner.async_client,
268278
inner_lock_ref,
269279
locking_key,
270280
version,
@@ -288,6 +298,7 @@ impl KVStore for VssStore {
288298
Box::pin(async move {
289299
inner
290300
.remove_internal(
301+
&inner.async_client,
291302
inner_lock_ref,
292303
locking_key,
293304
version,
@@ -304,7 +315,9 @@ impl KVStore for VssStore {
304315
let primary_namespace = primary_namespace.to_string();
305316
let secondary_namespace = secondary_namespace.to_string();
306317
let inner = Arc::clone(&self.inner);
307-
Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await })
318+
Box::pin(async move {
319+
inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await
320+
})
308321
}
309322
}
310323

@@ -317,7 +330,10 @@ impl Drop for VssStore {
317330

318331
struct VssStoreInner {
319332
schema_version: VssSchemaVersion,
320-
client: VssClient<CustomRetryPolicy>,
333+
blocking_client: VssClient<CustomRetryPolicy>,
334+
// A secondary client that will only be used for async persistence via `KVStore`, to ensure TCP
335+
// connections aren't shared between our outer and the internal runtime.
336+
async_client: VssClient<CustomRetryPolicy>,
321337
store_id: String,
322338
data_encryption_key: [u8; 32],
323339
key_obfuscator: KeyObfuscator,
@@ -328,11 +344,20 @@ struct VssStoreInner {
328344

329345
impl VssStoreInner {
330346
pub(crate) fn new(
331-
schema_version: VssSchemaVersion, client: VssClient<CustomRetryPolicy>, store_id: String,
347+
schema_version: VssSchemaVersion, blocking_client: VssClient<CustomRetryPolicy>,
348+
async_client: VssClient<CustomRetryPolicy>, store_id: String,
332349
data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator,
333350
) -> Self {
334351
let locks = Mutex::new(HashMap::new());
335-
Self { schema_version, client, store_id, data_encryption_key, key_obfuscator, locks }
352+
Self {
353+
schema_version,
354+
blocking_client,
355+
async_client,
356+
store_id,
357+
data_encryption_key,
358+
key_obfuscator,
359+
locks,
360+
}
336361
}
337362

338363
fn get_inner_lock_ref(&self, locking_key: String) -> Arc<tokio::sync::Mutex<u64>> {
@@ -388,7 +413,8 @@ impl VssStoreInner {
388413
}
389414

390415
async fn list_all_keys(
391-
&self, primary_namespace: &str, secondary_namespace: &str,
416+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str,
417+
secondary_namespace: &str,
392418
) -> io::Result<Vec<String>> {
393419
let mut page_token = None;
394420
let mut keys = vec![];
@@ -401,7 +427,7 @@ impl VssStoreInner {
401427
page_size: None,
402428
};
403429

404-
let response = self.client.list_key_versions(&request).await.map_err(|e| {
430+
let response = client.list_key_versions(&request).await.map_err(|e| {
405431
let msg = format!(
406432
"Failed to list keys in {}/{}: {}",
407433
primary_namespace, secondary_namespace, e
@@ -418,13 +444,14 @@ impl VssStoreInner {
418444
}
419445

420446
async fn read_internal(
421-
&self, primary_namespace: String, secondary_namespace: String, key: String,
447+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
448+
secondary_namespace: String, key: String,
422449
) -> io::Result<Vec<u8>> {
423450
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
424451

425452
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
426453
let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() };
427-
let resp = self.client.get_object(&request).await.map_err(|e| {
454+
let resp = client.get_object(&request).await.map_err(|e| {
428455
let msg = format!(
429456
"Failed to read from key {}/{}/{}: {}",
430457
primary_namespace, secondary_namespace, key, e
@@ -453,8 +480,9 @@ impl VssStoreInner {
453480
}
454481

455482
async fn write_internal(
456-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
457-
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
483+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
484+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
485+
key: String, buf: Vec<u8>,
458486
) -> io::Result<()> {
459487
check_namespace_key_validity(
460488
&primary_namespace,
@@ -482,7 +510,7 @@ impl VssStoreInner {
482510
};
483511

484512
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
485-
self.client.put_object(&request).await.map_err(|e| {
513+
client.put_object(&request).await.map_err(|e| {
486514
let msg = format!(
487515
"Failed to write to key {}/{}/{}: {}",
488516
primary_namespace, secondary_namespace, key, e
@@ -496,8 +524,9 @@ impl VssStoreInner {
496524
}
497525

498526
async fn remove_internal(
499-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
500-
primary_namespace: String, secondary_namespace: String, key: String,
527+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
528+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
529+
key: String,
501530
) -> io::Result<()> {
502531
check_namespace_key_validity(
503532
&primary_namespace,
@@ -514,7 +543,7 @@ impl VssStoreInner {
514543
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
515544
};
516545

517-
self.client.delete_object(&request).await.map_err(|e| {
546+
client.delete_object(&request).await.map_err(|e| {
518547
let msg = format!(
519548
"Failed to delete key {}/{}/{}: {}",
520549
primary_namespace, secondary_namespace, key, e
@@ -528,12 +557,15 @@ impl VssStoreInner {
528557
}
529558

530559
async fn list_internal(
531-
&self, primary_namespace: String, secondary_namespace: String,
560+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
561+
secondary_namespace: String,
532562
) -> io::Result<Vec<String>> {
533563
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
534564

535-
let keys =
536-
self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| {
565+
let keys = self
566+
.list_all_keys(client, &primary_namespace, &secondary_namespace)
567+
.await
568+
.map_err(|e| {
537569
let msg = format!(
538570
"Failed to retrieve keys in namespace: {}/{} : {}",
539571
primary_namespace, secondary_namespace, e
@@ -602,6 +634,21 @@ fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32]
602634
(k1, k2)
603635
}
604636

637+
fn retry_policy() -> CustomRetryPolicy {
638+
ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
639+
.with_max_attempts(100)
640+
.with_max_total_delay(Duration::from_secs(60))
641+
.with_max_jitter(Duration::from_millis(50))
642+
.skip_retry_on_error(Box::new(|e: &VssError| {
643+
matches!(
644+
e,
645+
VssError::NoSuchKeyError(..)
646+
| VssError::InvalidRequestError(..)
647+
| VssError::ConflictError(..)
648+
)
649+
}) as _)
650+
}
651+
605652
/// A source for generating entropy/randomness using [`rand`].
606653
pub(crate) struct RandEntropySource;
607654

0 commit comments

Comments
 (0)