Skip to content

Commit 6d25ba1

Browse files
committed
Introduce two separate VssClients for async/blocking contexts
To avoid any blocking cross-runtime behavior that could arise from reusing a single client's TCP connections in different runtime contexts, we here split out the `VssStore` behavior to use one dedicated `VssClient` per context. I.e., we're now using two connections/connection pools and make sure only the `blocking_client` is used in `KVStoreSync` contexts, and `async_client` in `KVStore` contexts.
1 parent 7b51a75 commit 6d25ba1

File tree

1 file changed

+70
-35
lines changed

1 file changed

+70
-35
lines changed

src/io/vss_store.rs

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,11 @@ impl KVStoreSync for VssStore {
126126
let secondary_namespace = secondary_namespace.to_string();
127127
let key = key.to_string();
128128
let inner = Arc::clone(&self.inner);
129-
let fut =
130-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
129+
let fut = async move {
130+
inner
131+
.read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key)
132+
.await
133+
};
131134
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
132135
}
133136

@@ -148,6 +151,7 @@ impl KVStoreSync for VssStore {
148151
let fut = async move {
149152
inner
150153
.write_internal(
154+
&inner.blocking_client,
151155
inner_lock_ref,
152156
locking_key,
153157
version,
@@ -178,6 +182,7 @@ impl KVStoreSync for VssStore {
178182
let fut = async move {
179183
inner
180184
.remove_internal(
185+
&inner.blocking_client,
181186
inner_lock_ref,
182187
locking_key,
183188
version,
@@ -199,7 +204,11 @@ impl KVStoreSync for VssStore {
199204
let primary_namespace = primary_namespace.to_string();
200205
let secondary_namespace = secondary_namespace.to_string();
201206
let inner = Arc::clone(&self.inner);
202-
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
207+
let fut = async move {
208+
inner
209+
.list_internal(&inner.blocking_client, primary_namespace, secondary_namespace)
210+
.await
211+
};
203212
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
204213
}
205214
}
@@ -212,9 +221,11 @@ impl KVStore for VssStore {
212221
let secondary_namespace = secondary_namespace.to_string();
213222
let key = key.to_string();
214223
let inner = Arc::clone(&self.inner);
215-
Box::pin(
216-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await },
217-
)
224+
Box::pin(async move {
225+
inner
226+
.read_internal(&inner.async_client, primary_namespace, secondary_namespace, key)
227+
.await
228+
})
218229
}
219230
fn write(
220231
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
@@ -228,6 +239,7 @@ impl KVStore for VssStore {
228239
Box::pin(async move {
229240
inner
230241
.write_internal(
242+
&inner.async_client,
231243
inner_lock_ref,
232244
locking_key,
233245
version,
@@ -251,6 +263,7 @@ impl KVStore for VssStore {
251263
Box::pin(async move {
252264
inner
253265
.remove_internal(
266+
&inner.async_client,
254267
inner_lock_ref,
255268
locking_key,
256269
version,
@@ -267,7 +280,9 @@ impl KVStore for VssStore {
267280
let primary_namespace = primary_namespace.to_string();
268281
let secondary_namespace = secondary_namespace.to_string();
269282
let inner = Arc::clone(&self.inner);
270-
Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await })
283+
Box::pin(async move {
284+
inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await
285+
})
271286
}
272287
}
273288

@@ -279,7 +294,10 @@ impl Drop for VssStore {
279294
}
280295

281296
struct VssStoreInner {
282-
client: VssClient<CustomRetryPolicy>,
297+
blocking_client: VssClient<CustomRetryPolicy>,
298+
// A secondary client that will only be used for async persistence via `KVStore`, to ensure TCP
299+
// connections aren't shared between our outer and the internal runtime.
300+
async_client: VssClient<CustomRetryPolicy>,
283301
store_id: String,
284302
data_encryption_key: [u8; 32],
285303
key_obfuscator: KeyObfuscator,
@@ -296,22 +314,18 @@ impl VssStoreInner {
296314
let (data_encryption_key, obfuscation_master_key) =
297315
derive_data_encryption_and_obfuscation_keys(&vss_seed);
298316
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
299-
let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
300-
.with_max_attempts(100)
301-
.with_max_total_delay(Duration::from_secs(60))
302-
.with_max_jitter(Duration::from_millis(50))
303-
.skip_retry_on_error(Box::new(|e: &VssError| {
304-
matches!(
305-
e,
306-
VssError::NoSuchKeyError(..)
307-
| VssError::InvalidRequestError(..)
308-
| VssError::ConflictError(..)
309-
)
310-
}) as _);
317+
let sync_retry_policy = retry_policy();
318+
let blocking_client = VssClient::new_with_headers(
319+
base_url.clone(),
320+
sync_retry_policy,
321+
header_provider.clone(),
322+
);
311323

312-
let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
324+
let async_retry_policy = retry_policy();
325+
let async_client =
326+
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
313327
let locks = Mutex::new(HashMap::new());
314-
Self { client, store_id, data_encryption_key, key_obfuscator, locks }
328+
Self { blocking_client, async_client, store_id, data_encryption_key, key_obfuscator, locks }
315329
}
316330

317331
fn get_inner_lock_ref(&self, locking_key: String) -> Arc<tokio::sync::Mutex<u64>> {
@@ -343,7 +357,8 @@ impl VssStoreInner {
343357
}
344358

345359
async fn list_all_keys(
346-
&self, primary_namespace: &str, secondary_namespace: &str,
360+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str,
361+
secondary_namespace: &str,
347362
) -> io::Result<Vec<String>> {
348363
let mut page_token = None;
349364
let mut keys = vec![];
@@ -356,7 +371,7 @@ impl VssStoreInner {
356371
page_size: None,
357372
};
358373

359-
let response = self.client.list_key_versions(&request).await.map_err(|e| {
374+
let response = client.list_key_versions(&request).await.map_err(|e| {
360375
let msg = format!(
361376
"Failed to list keys in {}/{}: {}",
362377
primary_namespace, secondary_namespace, e
@@ -373,13 +388,14 @@ impl VssStoreInner {
373388
}
374389

375390
async fn read_internal(
376-
&self, primary_namespace: String, secondary_namespace: String, key: String,
391+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
392+
secondary_namespace: String, key: String,
377393
) -> io::Result<Vec<u8>> {
378394
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
379395

380396
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
381397
let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() };
382-
let resp = self.client.get_object(&request).await.map_err(|e| {
398+
let resp = client.get_object(&request).await.map_err(|e| {
383399
let msg = format!(
384400
"Failed to read from key {}/{}/{}: {}",
385401
primary_namespace, secondary_namespace, key, e
@@ -408,8 +424,9 @@ impl VssStoreInner {
408424
}
409425

410426
async fn write_internal(
411-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
412-
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
427+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
428+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
429+
key: String, buf: Vec<u8>,
413430
) -> io::Result<()> {
414431
check_namespace_key_validity(
415432
&primary_namespace,
@@ -439,7 +456,7 @@ impl VssStoreInner {
439456
};
440457

441458
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
442-
self.client.put_object(&request).await.map_err(|e| {
459+
client.put_object(&request).await.map_err(|e| {
443460
let msg = format!(
444461
"Failed to write to key {}/{}/{}: {}",
445462
primary_namespace, secondary_namespace, key, e
@@ -453,8 +470,9 @@ impl VssStoreInner {
453470
}
454471

455472
async fn remove_internal(
456-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
457-
primary_namespace: String, secondary_namespace: String, key: String,
473+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
474+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
475+
key: String,
458476
) -> io::Result<()> {
459477
check_namespace_key_validity(
460478
&primary_namespace,
@@ -471,7 +489,7 @@ impl VssStoreInner {
471489
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
472490
};
473491

474-
self.client.delete_object(&request).await.map_err(|e| {
492+
client.delete_object(&request).await.map_err(|e| {
475493
let msg = format!(
476494
"Failed to delete key {}/{}/{}: {}",
477495
primary_namespace, secondary_namespace, key, e
@@ -485,12 +503,15 @@ impl VssStoreInner {
485503
}
486504

487505
async fn list_internal(
488-
&self, primary_namespace: String, secondary_namespace: String,
506+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
507+
secondary_namespace: String,
489508
) -> io::Result<Vec<String>> {
490509
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
491510

492-
let keys =
493-
self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| {
511+
let keys = self
512+
.list_all_keys(client, &primary_namespace, &secondary_namespace)
513+
.await
514+
.map_err(|e| {
494515
let msg = format!(
495516
"Failed to retrieve keys in namespace: {}/{} : {}",
496517
primary_namespace, secondary_namespace, e
@@ -559,6 +580,20 @@ fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32]
559580
(k1, k2)
560581
}
561582

583+
fn retry_policy() -> CustomRetryPolicy {
584+
ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
585+
.with_max_attempts(100)
586+
.with_max_total_delay(Duration::from_secs(60))
587+
.with_max_jitter(Duration::from_millis(50))
588+
.skip_retry_on_error(Box::new(|e: &VssError| {
589+
matches!(
590+
e,
591+
VssError::NoSuchKeyError(..)
592+
| VssError::InvalidRequestError(..)
593+
| VssError::ConflictError(..)
594+
)
595+
}) as _)
596+
}
562597
/// A source for generating entropy/randomness using [`rand`].
563598
pub(crate) struct RandEntropySource;
564599

0 commit comments

Comments
 (0)