Skip to content

Commit 9012e95

Browse files
committed
Optimized InMemoryBackendImpl
1 parent 066b6f7 commit 9012e95

File tree

9 files changed

+393
-1132
lines changed

9 files changed

+393
-1132
lines changed

rust/Cargo.lock

Lines changed: 25 additions & 796 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ cargo build --release
2525
cargo run -- server/vss-server-config.toml
2626
```
2727

28-
**Note:** For testing puropose you can edit vss-server-config.toml to use `store_type` as in-memory instead of postgresql `store_type = "memory"`
28+
**Note:** For testing purposes you can edit `vss-server-config.toml` to use `store_type` as in-memory instead of PostgreSQL: `store_type = "in_memory"`
2929
4. VSS endpoint should be reachable at `http://localhost:8080/vss`.
3030

3131
### Configuration

rust/impls/src/in_memory_store.rs

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
use std::collections::HashMap;
2+
use std::sync::{Arc, Mutex};
3+
4+
use async_trait::async_trait;
5+
use bytes::Bytes;
6+
use chrono::prelude::Utc;
7+
8+
use crate::postgres_store::{
9+
VssDbRecord, LIST_KEY_VERSIONS_MAX_PAGE_SIZE, MAX_PUT_REQUEST_ITEM_COUNT,
10+
};
11+
use api::error::VssError;
12+
use api::kv_store::{KvStore, GLOBAL_VERSION_KEY};
13+
use api::types::{
14+
DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, KeyValue,
15+
ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
16+
};
17+
18+
/// In-memory backend for VSS, for testing purposes only.
19+
pub struct InMemoryBackendImpl {
20+
store: Arc<Mutex<HashMap<String, VssDbRecord>>>,
21+
}
22+
23+
impl InMemoryBackendImpl {
24+
/// Creates a new in-memory backend.
25+
pub fn new() -> Self {
26+
Self { store: Arc::new(Mutex::new(HashMap::new())) }
27+
}
28+
29+
fn build_vss_record(&self, user_token: String, store_id: String, kv: KeyValue) -> VssDbRecord {
30+
let now = Utc::now();
31+
VssDbRecord {
32+
user_token,
33+
store_id,
34+
key: kv.key,
35+
value: kv.value.to_vec(),
36+
version: kv.version,
37+
created_at: now,
38+
last_updated_at: now,
39+
}
40+
}
41+
42+
fn build_key(user_token: &str, store_id: &str, key: &str) -> String {
43+
format!("{}#{}#{}", user_token, store_id, key)
44+
}
45+
46+
fn get_current_global_version(
47+
&self, guard: &HashMap<String, VssDbRecord>, user_token: &str, store_id: &str,
48+
) -> i64 {
49+
let global_key = Self::build_key(user_token, store_id, GLOBAL_VERSION_KEY);
50+
guard.get(&global_key).map(|r| r.version).unwrap_or(0)
51+
}
52+
53+
fn set_global_version(
54+
&self, guard: &mut HashMap<String, VssDbRecord>, user_token: String, store_id: String,
55+
new_version: i64,
56+
) {
57+
let global_key = Self::build_key(&user_token, &store_id, GLOBAL_VERSION_KEY);
58+
let now = Utc::now();
59+
let record = VssDbRecord {
60+
user_token,
61+
store_id,
62+
key: GLOBAL_VERSION_KEY.to_string(),
63+
value: vec![],
64+
version: new_version,
65+
created_at: now,
66+
last_updated_at: now,
67+
};
68+
guard.insert(global_key, record);
69+
}
70+
}
71+
72+
fn execute_put_object(
73+
store: &mut HashMap<String, VssDbRecord>, record: VssDbRecord,
74+
) -> Result<(), VssError> {
75+
let key = format!("{}#{}#{}", record.user_token, record.store_id, record.key);
76+
let now = Utc::now();
77+
78+
if let Some(existing) = store.get_mut(&key) {
79+
if existing.version >= record.version {
80+
return Err(VssError::ConflictError(format!(
81+
"Version conflict on put for key {}",
82+
record.key
83+
)));
84+
}
85+
existing.version = record.version;
86+
existing.value = record.value;
87+
existing.last_updated_at = now;
88+
} else {
89+
store.insert(key, record);
90+
}
91+
Ok(())
92+
}
93+
94+
fn execute_delete_object(
95+
store: &mut HashMap<String, VssDbRecord>, record: VssDbRecord,
96+
) -> Result<(), VssError> {
97+
let key = format!("{}#{}#{}", record.user_token, record.store_id, record.key);
98+
if let Some(existing) = store.get(&key) {
99+
if existing.version != record.version {
100+
return Ok(());
101+
}
102+
store.remove(&key);
103+
}
104+
Ok(())
105+
}
106+
107+
#[async_trait]
108+
impl KvStore for InMemoryBackendImpl {
109+
async fn get(
110+
&self, user_token: String, request: GetObjectRequest,
111+
) -> Result<GetObjectResponse, VssError> {
112+
let key = Self::build_key(&user_token, &request.store_id, &request.key);
113+
let guard = self.store.lock().unwrap();
114+
115+
if let Some(record) = guard.get(&key) {
116+
Ok(GetObjectResponse {
117+
value: Some(KeyValue {
118+
key: record.key.clone(),
119+
value: Bytes::from(record.value.clone()),
120+
version: record.version,
121+
}),
122+
})
123+
} else if request.key == GLOBAL_VERSION_KEY {
124+
let current_global =
125+
self.get_current_global_version(&guard, &user_token, &request.store_id);
126+
Ok(GetObjectResponse {
127+
value: Some(KeyValue {
128+
key: GLOBAL_VERSION_KEY.to_string(),
129+
value: Bytes::new(),
130+
version: current_global,
131+
}),
132+
})
133+
} else {
134+
Err(VssError::NoSuchKeyError("Requested key not found.".to_string()))
135+
}
136+
}
137+
138+
async fn put(
139+
&self, user_token: String, request: PutObjectRequest,
140+
) -> Result<PutObjectResponse, VssError> {
141+
if request.transaction_items.len() + request.delete_items.len() > MAX_PUT_REQUEST_ITEM_COUNT
142+
{
143+
return Err(VssError::InvalidRequestError(format!(
144+
"Number of write items per request should be less than equal to {}",
145+
MAX_PUT_REQUEST_ITEM_COUNT
146+
)));
147+
}
148+
149+
let store_id = request.store_id.clone();
150+
let mut guard = self.store.lock().unwrap();
151+
152+
// Handling global_version precondition
153+
let current_global = self.get_current_global_version(&guard, &user_token, &store_id);
154+
if let Some(expected_global) = request.global_version {
155+
if current_global != expected_global {
156+
return Err(VssError::ConflictError(format!(
157+
"Global version conflict: expected {}, current {}",
158+
expected_global, current_global
159+
)));
160+
}
161+
}
162+
163+
// Check for conflicts on puts
164+
for kv in &request.transaction_items {
165+
let key = Self::build_key(&user_token, &store_id, &kv.key);
166+
if let Some(existing) = guard.get(&key) {
167+
if existing.version >= kv.version {
168+
return Err(VssError::ConflictError(format!(
169+
"Version conflict on put for key {}",
170+
kv.key
171+
)));
172+
}
173+
}
174+
}
175+
176+
// Apply updates
177+
let vss_put_records: Vec<VssDbRecord> = request
178+
.transaction_items
179+
.into_iter()
180+
.map(|kv| self.build_vss_record(user_token.clone(), store_id.clone(), kv))
181+
.collect();
182+
183+
let vss_delete_records: Vec<VssDbRecord> = request
184+
.delete_items
185+
.into_iter()
186+
.map(|kv| self.build_vss_record(user_token.clone(), store_id.clone(), kv))
187+
.collect();
188+
189+
let mut mutated = false;
190+
for vss_record in vss_put_records {
191+
execute_put_object(&mut guard, vss_record)?;
192+
mutated = true;
193+
}
194+
for vss_record in vss_delete_records {
195+
// execute_delete_object is no-op on version mismatch, but call anyway
196+
execute_delete_object(&mut guard, vss_record)?;
197+
mutated = true;
198+
}
199+
200+
if mutated || request.global_version.is_some() {
201+
let new_global = current_global + 1;
202+
self.set_global_version(&mut guard, user_token.clone(), store_id.clone(), new_global);
203+
}
204+
205+
Ok(PutObjectResponse {})
206+
}
207+
208+
async fn delete(
209+
&self, user_token: String, request: DeleteObjectRequest,
210+
) -> Result<DeleteObjectResponse, VssError> {
211+
let key_value = request.key_value.ok_or_else(|| {
212+
VssError::InvalidRequestError("key_value missing in DeleteObjectRequest".to_string())
213+
})?;
214+
let store_id = request.store_id.clone();
215+
let mut guard = self.store.lock().unwrap();
216+
217+
let current_global = self.get_current_global_version(&guard, &user_token, &store_id);
218+
219+
// Check for version conflict
220+
let key = Self::build_key(&user_token, &store_id, &key_value.key);
221+
let mutated = if let Some(existing) = guard.get(&key) {
222+
if existing.version != key_value.version {
223+
false
224+
} else {
225+
true
226+
}
227+
} else {
228+
false
229+
};
230+
231+
if mutated {
232+
let vss_record = self.build_vss_record(user_token.clone(), store_id.clone(), key_value);
233+
execute_delete_object(&mut guard, vss_record)?;
234+
let new_global = current_global + 1;
235+
self.set_global_version(&mut guard, user_token.clone(), store_id.clone(), new_global);
236+
}
237+
238+
Ok(DeleteObjectResponse {})
239+
}
240+
241+
async fn list_key_versions(
242+
&self, user_token: String, request: ListKeyVersionsRequest,
243+
) -> Result<ListKeyVersionsResponse, VssError> {
244+
let store_id = request.store_id;
245+
let key_prefix = request.key_prefix.unwrap_or_default();
246+
let page_token = request.page_token.unwrap_or_default();
247+
let page_size = request.page_size.unwrap_or(i32::MAX);
248+
let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize;
249+
250+
let mut global_version = None;
251+
if page_token.is_empty() {
252+
let get_global_version_request = GetObjectRequest {
253+
store_id: store_id.clone(),
254+
key: GLOBAL_VERSION_KEY.to_string(),
255+
};
256+
let get_response = self.get(user_token.clone(), get_global_version_request).await?;
257+
global_version = Some(get_response.value.unwrap().version);
258+
}
259+
260+
let key_versions: Vec<KeyValue> = {
261+
let guard = self.store.lock().unwrap();
262+
let mut key_versions: Vec<KeyValue> = guard
263+
.iter()
264+
.filter(|(k, _)| {
265+
let parts: Vec<&str> = k.split('#').collect();
266+
if parts.len() < 3 {
267+
return false;
268+
}
269+
parts[0] == user_token.as_str()
270+
&& parts[1] == store_id.as_str()
271+
&& parts[2].starts_with(&key_prefix)
272+
&& parts[2] > page_token.as_str()
273+
&& parts[2] != GLOBAL_VERSION_KEY
274+
})
275+
.map(|(_, record)| KeyValue {
276+
key: record.key.clone(),
277+
value: Bytes::new(),
278+
version: record.version,
279+
})
280+
.collect();
281+
282+
key_versions.sort_by(|a, b| a.key.cmp(&b.key));
283+
key_versions.into_iter().take(limit).collect()
284+
};
285+
286+
let next_page_token = if key_versions.len() == limit {
287+
key_versions.last().map(|kv| kv.key.clone())
288+
} else {
289+
None
290+
};
291+
292+
Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version })
293+
}
294+
}
295+
296+
#[cfg(test)]
297+
mod tests {
298+
use super::*;
299+
use api::kv_store::INITIAL_RECORD_VERSION;
300+
use bytes::Bytes;
301+
use tokio::test;
302+
303+
#[test]
304+
async fn test_in_memory_crud() {
305+
let store = InMemoryBackendImpl::new();
306+
let user_token = "test_user".to_string();
307+
let store_id = "test_store".to_string();
308+
309+
// Put
310+
let put_request = PutObjectRequest {
311+
store_id: store_id.clone(),
312+
transaction_items: vec![KeyValue {
313+
key: "key1".to_string(),
314+
value: Bytes::from("value1"),
315+
version: INITIAL_RECORD_VERSION as i64,
316+
}],
317+
delete_items: vec![],
318+
global_version: None,
319+
};
320+
store.put(user_token.clone(), put_request).await.unwrap();
321+
322+
// Get
323+
let get_request = GetObjectRequest { store_id: store_id.clone(), key: "key1".to_string() };
324+
let response = store.get(user_token.clone(), get_request).await.unwrap();
325+
let key_value = response.value.unwrap();
326+
assert_eq!(key_value.value, Bytes::from("value1"));
327+
let current_version = key_value.version;
328+
329+
// List
330+
let list_request = ListKeyVersionsRequest {
331+
store_id: store_id.clone(),
332+
key_prefix: None,
333+
page_size: Some(1),
334+
page_token: None,
335+
};
336+
let response = store.list_key_versions(user_token.clone(), list_request).await.unwrap();
337+
assert_eq!(response.key_versions.len(), 1);
338+
assert_eq!(response.key_versions[0].key, "key1");
339+
340+
// Delete
341+
let delete_request = DeleteObjectRequest {
342+
store_id: store_id.clone(),
343+
key_value: Some(KeyValue {
344+
key: "key1".to_string(),
345+
value: Bytes::new(),
346+
version: current_version,
347+
}),
348+
};
349+
store.delete(user_token.clone(), delete_request).await.unwrap();
350+
}
351+
}

rust/impls/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
#![deny(rustdoc::private_intra_doc_links)]
1212
#![deny(missing_docs)]
1313

14+
/// Contains in-memory backend implementation for VSS, for testing purposes only.
15+
pub mod in_memory_store;
1416
mod migrations;
1517
/// Contains [PostgreSQL](https://www.postgresql.org/) based backend implementation for VSS.
1618
pub mod postgres_store;
17-
/// Contains in-memory backend implementation for VSS, for testing purposes only.
18-
pub mod memory_store;
1919

2020
#[macro_use]
2121
extern crate api;

0 commit comments

Comments
 (0)