From 0bf37eba1b47c5d24edcaa354efc5a75be74fd8f Mon Sep 17 00:00:00 2001 From: Harsh Dev Pathak Date: Sun, 26 Oct 2025 07:11:41 +0530 Subject: [PATCH 1/4] chore: formatted and pub(crate) visibility --- rust/impls/src/migrations.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/impls/src/migrations.rs b/rust/impls/src/migrations.rs index bab951b..02d3f54 100644 --- a/rust/impls/src/migrations.rs +++ b/rust/impls/src/migrations.rs @@ -5,7 +5,7 @@ pub(crate) const MIGRATION_LOG_COLUMN: &str = "upgrade_from"; pub(crate) const CHECK_DB_STMT: &str = "SELECT 1 FROM pg_database WHERE datname = $1"; pub(crate) const INIT_DB_CMD: &str = "CREATE DATABASE"; #[cfg(test)] -const DROP_DB_CMD: &str = "DROP DATABASE"; +pub(crate) const DROP_DB_CMD: &str = "DROP DATABASE"; pub(crate) const GET_VERSION_STMT: &str = "SELECT db_version FROM vss_db_version;"; pub(crate) const UPDATE_VERSION_STMT: &str = "UPDATE vss_db_version SET db_version=$1;"; pub(crate) const LOG_MIGRATION_STMT: &str = "INSERT INTO vss_db_upgrades VALUES($1);"; @@ -36,4 +36,4 @@ pub(crate) const MIGRATIONS: &[&str] = &[ );", ]; #[cfg(test)] -const DUMMY_MIGRATION: &str = "SELECT 1 WHERE FALSE;"; +pub(crate) const DUMMY_MIGRATION: &str = "SELECT 1 WHERE FALSE;"; From 02a1b4e787d94251aed87b915bc4a27a8b3fe727 Mon Sep 17 00:00:00 2001 From: Harsh Dev Pathak Date: Sun, 26 Oct 2025 07:13:41 +0530 Subject: [PATCH 2/4] feat: Added in-memory storage for testing purposes --- rust/README.md | 5 + rust/impls/src/in_memory_store.rs | 414 +++++++++++++++++++++++++++++ rust/impls/src/lib.rs | 2 + rust/server/src/main.rs | 51 +++- rust/server/src/util/config.rs | 1 + rust/server/vss-server-config.toml | 1 + 6 files changed, 462 insertions(+), 12 deletions(-) create mode 100644 rust/impls/src/in_memory_store.rs diff --git a/rust/README.md b/rust/README.md index 7b9b96c..2dab9a1 100644 --- a/rust/README.md +++ b/rust/README.md @@ -24,6 +24,11 @@ cargo build --release ``` cargo run -- server/vss-server-config.toml ``` + + **Note:** For testing purposes, you can pass `--in-memory` to use in-memory instead of PostgreSQL + ``` + cargo run -- server/vss-server-config.toml --in-memory + ``` 4. VSS endpoint should be reachable at `http://localhost:8080/vss`. ### Configuration diff --git a/rust/impls/src/in_memory_store.rs b/rust/impls/src/in_memory_store.rs new file mode 100644 index 0000000..e5af76a --- /dev/null +++ b/rust/impls/src/in_memory_store.rs @@ -0,0 +1,414 @@ +use crate::postgres_store::{ + VssDbRecord, LIST_KEY_VERSIONS_MAX_PAGE_SIZE, MAX_PUT_REQUEST_ITEM_COUNT, +}; +use api::error::VssError; +use api::kv_store::{KvStore, GLOBAL_VERSION_KEY, INITIAL_RECORD_VERSION}; +use api::types::{ + DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, KeyValue, + ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse, +}; +use async_trait::async_trait; +use bytes::Bytes; +use chrono::prelude::Utc; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +fn build_storage_key(user_token: &str, store_id: &str, key: &str) -> String { + format!("{}#{}#{}", user_token, store_id, key) +} + +/// In-memory implementation of the VSS Store. +pub struct InMemoryBackendImpl { + store: Arc>>, +} + +impl InMemoryBackendImpl { + /// Creates an in-memory instance. + pub fn new() -> Self { + Self { store: Arc::new(RwLock::new(HashMap::new())) } + } + + fn get_current_global_version( + &self, guard: &HashMap, user_token: &str, store_id: &str, + ) -> i64 { + let global_key = build_storage_key(user_token, store_id, GLOBAL_VERSION_KEY); + guard.get(&global_key).map(|r| r.version).unwrap_or(0) + } +} + +// Validation functions - check if operations can succeed without modifying data +fn validate_put_operation( + store: &HashMap, user_token: &str, store_id: &str, key_value: &KeyValue, +) -> Result<(), VssError> { + let key = build_storage_key(user_token, store_id, &key_value.key); + + if key_value.version == -1 { + // Non-conditional upsert always succeeds + Ok(()) + } else if key_value.version == 0 { + if store.contains_key(&key) { + Err(VssError::ConflictError(format!( + "Key {} already exists for conditional insert", + key_value.key + ))) + } else { + Ok(()) + } + } else { + if let Some(existing) = store.get(&key) { + if existing.version == key_value.version { + Ok(()) + } else { + Err(VssError::ConflictError(format!( + "Version mismatch for key {}: expected {}, found {}", + key_value.key, key_value.version, existing.version + ))) + } + } else { + Err(VssError::ConflictError(format!( + "Key {} does not exist for conditional update", + key_value.key + ))) + } + } +} + +fn validate_delete_operation( + store: &HashMap, user_token: &str, store_id: &str, key_value: &KeyValue, +) -> Result<(), VssError> { + let key = build_storage_key(user_token, store_id, &key_value.key); + + if key_value.version == -1 { + // Non-conditional delete always succeeds + Ok(()) + } else { + if let Some(existing) = store.get(&key) { + if existing.version == key_value.version { + Ok(()) + } else { + Err(VssError::ConflictError(format!( + "Version mismatch for delete key {}: expected {}, found {}", + key_value.key, key_value.version, existing.version + ))) + } + } else { + Err(VssError::ConflictError(format!( + "Key {} does not exist for conditional delete", + key_value.key + ))) + } + } +} + +fn execute_non_conditional_upsert( + store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, +) { + let key = build_storage_key(user_token, store_id, &key_value.key); + let now = Utc::now(); + + match store.entry(key) { + std::collections::hash_map::Entry::Occupied(mut occ) => { + let existing = occ.get_mut(); + existing.version = INITIAL_RECORD_VERSION as i64; + existing.value = key_value.value.to_vec(); + existing.last_updated_at = now; + }, + std::collections::hash_map::Entry::Vacant(vac) => { + let new_record = VssDbRecord { + user_token: user_token.to_string(), + store_id: store_id.to_string(), + key: key_value.key, + value: key_value.value.to_vec(), + version: INITIAL_RECORD_VERSION as i64, + created_at: now, + last_updated_at: now, + }; + vac.insert(new_record); + }, + } +} + +fn execute_conditional_insert( + store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, +) { + let key = build_storage_key(user_token, store_id, &key_value.key); + let now = Utc::now(); + + let new_record = VssDbRecord { + user_token: user_token.to_string(), + store_id: store_id.to_string(), + key: key_value.key, + value: key_value.value.to_vec(), + version: INITIAL_RECORD_VERSION as i64, + created_at: now, + last_updated_at: now, + }; + store.insert(key, new_record); +} + +fn execute_conditional_update( + store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, +) { + let key = build_storage_key(user_token, store_id, &key_value.key); + let now = Utc::now(); + + if let Some(existing) = store.get_mut(&key) { + existing.version = key_value.version.saturating_add(1); + existing.value = key_value.value.to_vec(); + existing.last_updated_at = now; + } +} + +fn execute_put_object( + store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, +) { + if key_value.version == -1 { + execute_non_conditional_upsert(store, user_token, store_id, key_value); + } else if key_value.version == 0 { + execute_conditional_insert(store, user_token, store_id, key_value); + } else { + execute_conditional_update(store, user_token, store_id, key_value); + } +} + +fn execute_delete_object( + store: &mut HashMap, user_token: &str, store_id: &str, + key_value: &KeyValue, +) { + let key = build_storage_key(user_token, store_id, &key_value.key); + store.remove(&key); +} + +#[async_trait] +impl KvStore for InMemoryBackendImpl { + async fn get( + &self, user_token: String, request: GetObjectRequest, + ) -> Result { + let key = build_storage_key(&user_token, &request.store_id, &request.key); + let guard = self.store.read().await; + + if let Some(record) = guard.get(&key) { + Ok(GetObjectResponse { + value: Some(KeyValue { + key: record.key.clone(), + value: Bytes::from(record.value.clone()), + version: record.version, + }), + }) + } else if request.key == GLOBAL_VERSION_KEY { + Ok(GetObjectResponse { + value: Some(KeyValue { + key: GLOBAL_VERSION_KEY.to_string(), + value: Bytes::new(), + version: 0, + }), + }) + } else { + Err(VssError::NoSuchKeyError("Requested key not found.".to_string())) + } + } + + async fn put( + &self, user_token: String, request: PutObjectRequest, + ) -> Result { + if request.transaction_items.len() + request.delete_items.len() > MAX_PUT_REQUEST_ITEM_COUNT + { + return Err(VssError::InvalidRequestError(format!( + "Number of write items per request should be less than equal to {}", + MAX_PUT_REQUEST_ITEM_COUNT + ))); + } + + let store_id = request.store_id.clone(); + let mut guard = self.store.write().await; + + if let Some(version) = request.global_version { + validate_put_operation( + &guard, + &user_token, + &store_id, + &KeyValue { key: GLOBAL_VERSION_KEY.to_string(), value: Bytes::new(), version }, + )?; + } + + for key_value in &request.transaction_items { + validate_put_operation(&guard, &user_token, &store_id, key_value)?; + } + + for key_value in &request.delete_items { + validate_delete_operation(&guard, &user_token, &store_id, key_value)?; + } + + for key_value in request.transaction_items { + execute_put_object(&mut guard, &user_token, &store_id, key_value); + } + + for key_value in &request.delete_items { + execute_delete_object(&mut guard, &user_token, &store_id, key_value); + } + + if let Some(version) = request.global_version { + execute_put_object( + &mut guard, + &user_token, + &store_id, + KeyValue { key: GLOBAL_VERSION_KEY.to_string(), value: Bytes::new(), version }, + ); + } + + Ok(PutObjectResponse {}) + } + + async fn delete( + &self, user_token: String, request: DeleteObjectRequest, + ) -> Result { + let key_value = request.key_value.ok_or_else(|| { + VssError::InvalidRequestError("key_value missing in DeleteObjectRequest".to_string()) + })?; + + let store_id = request.store_id.clone(); + let mut guard = self.store.write().await; + + execute_delete_object(&mut guard, &user_token, &store_id, &key_value); + + Ok(DeleteObjectResponse {}) + } + + async fn list_key_versions( + &self, user_token: String, request: ListKeyVersionsRequest, + ) -> Result { + let store_id = request.store_id; + let key_prefix = request.key_prefix.unwrap_or("".to_string()); + let page_token_option = request.page_token; + let page_size = request.page_size.unwrap_or(i32::MAX); + let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize; + + let (keys_with_versions, global_version) = { + let guard = self.store.read().await; + + let mut global_version: Option = None; + if page_token_option.is_none() { + global_version = + Some(self.get_current_global_version(&guard, &user_token, &store_id)); + } + + let storage_prefix = format!("{}#{}#", user_token, store_id); + let mut temp: Vec<(String, i64)> = Vec::new(); + + for (storage_key, r) in guard.iter() { + if !storage_key.starts_with(&storage_prefix) { + continue; + } + let key = &storage_key[storage_prefix.len()..]; + if key == GLOBAL_VERSION_KEY { + continue; + } + if !key_prefix.is_empty() && !key.starts_with(&key_prefix) { + continue; + } + temp.push((key.to_string(), r.version)); + } + + (temp, global_version) + }; + + let mut keys_with_versions = keys_with_versions; + keys_with_versions.sort_by(|a, b| a.0.cmp(&b.0)); + + let start_idx = if page_token_option.is_none() { + 0 + } else if page_token_option.as_deref() == Some("") { + keys_with_versions.len() + } else { + let token = page_token_option.as_deref().unwrap(); + keys_with_versions + .iter() + .position(|(k, _)| k.as_str() > token) + .unwrap_or(keys_with_versions.len()) + }; + + let page_items: Vec = keys_with_versions + .iter() + .skip(start_idx) + .take(limit) + .map(|(key, version)| KeyValue { + key: key.clone(), + value: Bytes::new(), + version: *version, + }) + .collect(); + + let next_page_token = if page_items.is_empty() { + Some("".to_string()) + } else { + page_items.last().map(|kv| kv.key.clone()) + }; + + Ok(ListKeyVersionsResponse { key_versions: page_items, next_page_token, global_version }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use api::define_kv_store_tests; + use api::types::{GetObjectRequest, KeyValue, PutObjectRequest}; + use bytes::Bytes; + use tokio::test; + + define_kv_store_tests!(InMemoryKvStoreTest, InMemoryBackendImpl, InMemoryBackendImpl::new()); + + #[test] + async fn test_in_memory_crud() { + let store = InMemoryBackendImpl::new(); + let user_token = "test_user".to_string(); + let store_id = "test_store".to_string(); + + let put_request = PutObjectRequest { + store_id: store_id.clone(), + transaction_items: vec![KeyValue { + key: "key1".to_string(), + value: Bytes::from("value1"), + version: 0, + }], + delete_items: vec![], + global_version: None, + }; + store.put(user_token.clone(), put_request).await.unwrap(); + + let get_request = GetObjectRequest { store_id: store_id.clone(), key: "key1".to_string() }; + let response = store.get(user_token.clone(), get_request).await.unwrap(); + let key_value = response.value.unwrap(); + assert_eq!(key_value.value, Bytes::from("value1")); + assert_eq!(key_value.version, 1, "Expected version 1 after put"); + + let list_request = ListKeyVersionsRequest { + store_id: store_id.clone(), + key_prefix: None, + page_size: Some(1), + page_token: None, + }; + let response = store.list_key_versions(user_token.clone(), list_request).await.unwrap(); + assert_eq!(response.key_versions.len(), 1); + assert_eq!(response.key_versions[0].key, "key1"); + assert_eq!(response.key_versions[0].version, 1); + + let delete_request = DeleteObjectRequest { + store_id: store_id.clone(), + key_value: Some(KeyValue { key: "key1".to_string(), value: Bytes::new(), version: 1 }), + }; + store.delete(user_token.clone(), delete_request).await.unwrap(); + + let get_request = GetObjectRequest { store_id: store_id.clone(), key: "key1".to_string() }; + assert!(matches!( + store.get(user_token.clone(), get_request).await, + Err(VssError::NoSuchKeyError(_)) + )); + + let global_request = + GetObjectRequest { store_id: store_id.clone(), key: GLOBAL_VERSION_KEY.to_string() }; + let response = store.get(user_token.clone(), global_request).await.unwrap(); + assert_eq!(response.value.unwrap().version, 0, "Expected global_version=0"); + } +} diff --git a/rust/impls/src/lib.rs b/rust/impls/src/lib.rs index 27844d0..1aaf4d0 100644 --- a/rust/impls/src/lib.rs +++ b/rust/impls/src/lib.rs @@ -11,6 +11,8 @@ #![deny(rustdoc::private_intra_doc_links)] #![deny(missing_docs)] +/// Contains in-memory backend implementation for VSS, for testing purposes only. +pub mod in_memory_store; mod migrations; /// Contains [PostgreSQL](https://www.postgresql.org/) based backend implementation for VSS. pub mod postgres_store; diff --git a/rust/server/src/main.rs b/rust/server/src/main.rs index 5a78be6..6279580 100644 --- a/rust/server/src/main.rs +++ b/rust/server/src/main.rs @@ -20,6 +20,7 @@ use hyper_util::rt::TokioIo; use crate::vss_service::VssService; use api::auth::{Authorizer, NoopAuthorizer}; use api::kv_store::KvStore; +use impls::in_memory_store::InMemoryBackendImpl; use impls::postgres_store::PostgresBackendImpl; use std::sync::Arc; @@ -28,12 +29,15 @@ pub(crate) mod vss_service; fn main() { let args: Vec = std::env::args().collect(); - if args.len() != 2 { - eprintln!("Usage: {} ", args[0]); + if args.len() < 2 { + eprintln!("Usage: {} [--in-memory]", args[0]); std::process::exit(1); } - let config = match util::config::load_config(&args[1]) { + let config_path = &args[1]; + let use_in_memory = args.contains(&"--in-memory".to_string()); + + let mut config = match util::config::load_config(config_path) { Ok(cfg) => cfg, Err(e) => { eprintln!("Failed to load configuration: {}", e); @@ -41,6 +45,12 @@ fn main() { }, }; + // Override the `store_type` if --in-memory flag passed + if use_in_memory { + println!("Overriding backend type: using in-memory backend (via --in-memory flag)"); + config.server_config.store_type = "in_memory".to_string(); + } + let addr: SocketAddr = match format!("{}:{}", config.server_config.host, config.server_config.port).parse() { Ok(addr) => addr, @@ -67,15 +77,32 @@ fn main() { }, }; let authorizer = Arc::new(NoopAuthorizer {}); - let postgresql_config = config.postgresql_config.expect("PostgreSQLConfig must be defined in config file."); - let endpoint = postgresql_config.to_postgresql_endpoint(); - let db_name = postgresql_config.database; - let store = Arc::new( - PostgresBackendImpl::new(&endpoint, &db_name) - .await - .unwrap(), - ); - println!("Connected to PostgreSQL backend with DSN: {}/{}", endpoint, db_name); + let store: Arc = match config.server_config.store_type.as_str() { + "postgres" => { + let pg_config = config.postgresql_config + .expect("PostgreSQL configuration required for postgres backend"); + let endpoint = pg_config.to_postgresql_endpoint(); + let db_name = pg_config.database; + match PostgresBackendImpl::new(&endpoint, &db_name).await { + Ok(backend) => { + println!("Connected to PostgreSQL backend with DSN: {}/{}", endpoint, db_name); + Arc::new(backend) + }, + Err(e) => { + eprintln!("Failed to connect to PostgreSQL backend: {}", e); + std::process::exit(1); + }, + } + }, + "in_memory" => { + println!("Using in-memory backend for testing"); + Arc::new(InMemoryBackendImpl::new()) + }, + _ => { + eprintln!("Invalid backend_type: {}. Must be 'postgres' or 'in_memory'", config.server_config.store_type); + std::process::exit(1); + }, + }; let rest_svc_listener = TcpListener::bind(&addr).await.expect("Failed to bind listening port"); println!("Listening for incoming connections on {}", addr); diff --git a/rust/server/src/util/config.rs b/rust/server/src/util/config.rs index cf70daf..87b0aa0 100644 --- a/rust/server/src/util/config.rs +++ b/rust/server/src/util/config.rs @@ -10,6 +10,7 @@ pub(crate) struct Config { pub(crate) struct ServerConfig { pub(crate) host: String, pub(crate) port: u16, + pub(crate) store_type: String, // "postgresql" or "in_memory" } #[derive(Deserialize)] diff --git a/rust/server/vss-server-config.toml b/rust/server/vss-server-config.toml index 8a013b5..b030851 100644 --- a/rust/server/vss-server-config.toml +++ b/rust/server/vss-server-config.toml @@ -1,6 +1,7 @@ [server_config] host = "127.0.0.1" port = 8080 +store_type = "postgres" # "postgres" for using postgresql and "in_memory" for testing purpooses [postgresql_config] username = "postgres" # Optional in TOML, can be overridden by env var `VSS_POSTGRESQL_USERNAME` From c2a140ed830e7fe07f6d452cbefa6fe84e9b9a2d Mon Sep 17 00:00:00 2001 From: Harsh Dev Pathak Date: Wed, 12 Nov 2025 09:29:07 +0530 Subject: [PATCH 3/4] feat: Added CI for in_memory_testing --- .../workflows/build-and-test-in-memory.yml | 68 ++++++++++++ .github/workflows/ldk-node-integration.yml | 104 ++++++++++++++++-- rust/server/Cargo.toml | 4 + 3 files changed, 166 insertions(+), 10 deletions(-) create mode 100644 .github/workflows/build-and-test-in-memory.yml diff --git a/.github/workflows/build-and-test-in-memory.yml b/.github/workflows/build-and-test-in-memory.yml new file mode 100644 index 0000000..007a52d --- /dev/null +++ b/.github/workflows/build-and-test-in-memory.yml @@ -0,0 +1,68 @@ +name: In-Memory VSS Server CI + +on: + push: + branches: [ main ] + pull_request: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test-in-memory: + runs-on: ubuntu-latest + timeout-minutes: 6 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Create in-memory config + run: | + mkdir -p rust/server + cat > rust/server/vss-server-config.toml < server.log 2>&1 & + echo "Server PID: $!" + + - name: Wait for server + run: | + for i in {1..15}; do + if curl -s http://127.0.0.1:8080 > /dev/null; then + echo "Server is up!" + exit 0 + fi + sleep 1 + done + echo "Server failed. Dumping log:" + cat rust/server.log + exit 1 + + - name: HTTP Smoke Test + run: | + curl -f \ + -H "Authorization: Bearer test_user" \ + --data-binary @<(echo "0A04746573741A150A026B3110FFFFFFFFFFFFFFFFFF011A046B317631" | xxd -r -p) \ + http://127.0.0.1:8080/vss/putObjects + + RESPONSE=$(curl -f \ + -H "Authorization: Bearer test_user" \ + --data-binary @<(echo "0A047465737412026B31" | xxd -r -p) \ + http://127.0.0.1:8080/vss/getObject) + + - name: Run In-Memory unit tests + working-directory: rust + run: cargo test --package impls --lib -- in_memory_store::tests --nocapture \ No newline at end of file diff --git a/.github/workflows/ldk-node-integration.yml b/.github/workflows/ldk-node-integration.yml index f6bcf7d..abcc083 100644 --- a/.github/workflows/ldk-node-integration.yml +++ b/.github/workflows/ldk-node-integration.yml @@ -7,14 +7,13 @@ concurrency: cancel-in-progress: true jobs: - build-and-test: + test-postgres: runs-on: ubuntu-latest - + timeout-minutes: 30 services: postgres: image: postgres:latest - ports: - - 5432:5432 + ports: [5432:5432] env: POSTGRES_DB: postgres POSTGRES_USER: postgres @@ -30,20 +29,105 @@ jobs: uses: actions/checkout@v3 with: path: vss-server + - name: Checkout LDK Node uses: actions/checkout@v3 with: repository: lightningdevkit/ldk-node path: ldk-node - - name: Build and Deploy VSS Server + - name: Create Postgres config + run: | + mkdir -p vss-server/rust/server + cat > vss-server/rust/server/vss-server-config.toml < server.log 2>&1 & + echo "Server PID: $!" + + - name: Wait for VSS + run: | + for i in {1..30}; do + if curl -s http://127.0.0.1:8080/vss > /dev/null; then + echo "VSS ready" + exit 0 + fi + sleep 2 + done + echo "VSS failed:" + cat vss-server/rust/vss.log + exit 1 + - name: Run LDK Node Integration tests + working-directory: ldk-node run: | - cd ldk-node - export TEST_VSS_BASE_URL="http://localhost:8080/vss" + export TEST_VSS_BASE_URL="http://127.0.0.1:8080/vss" RUSTFLAGS="--cfg vss_test" cargo test io::vss_store RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss + + test-in-memory: + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + path: vss-server + + - name: Checkout LDK Node + uses: actions/checkout@v3 + with: + repository: lightningdevkit/ldk-node + path: ldk-node + + - name: Create In-Memory config + run: | + mkdir -p vss-server/rust/server + cat > vss-server/rust/server/vss-server-config.toml < server.log 2>&1 & + echo "Server PID: $!" + + - name: Wait for VSS + run: | + for i in {1..30}; do + if curl -s http://127.0.0.1:8080/vss > /dev/null; then + echo "VSS ready" + exit 0 + fi + sleep 1 + done + echo "VSS failed:" + cat vss-server/rust/vss.log + exit 1 + + - name: Run LDK Node Integration tests + working-directory: ldk-node + run: | + export TEST_VSS_BASE_URL="http://127.0.0.1:8080/vss" + RUSTFLAGS="--cfg vss_test" cargo test io::vss_store + RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss \ No newline at end of file diff --git a/rust/server/Cargo.toml b/rust/server/Cargo.toml index 2a0e6f1..5b26e80 100644 --- a/rust/server/Cargo.toml +++ b/rust/server/Cargo.toml @@ -15,3 +15,7 @@ prost = { version = "0.11.6", default-features = false, features = ["std"] } bytes = "1.4.0" serde = { version = "1.0.203", default-features = false, features = ["derive"] } toml = { version = "0.8.9", default-features = false, features = ["parse"] } + +[[bin]] +name = "vss-server" +path = "src/main.rs" \ No newline at end of file From df90b7cb6987ffb0b580cdf28272c294d9dd8255 Mon Sep 17 00:00:00 2001 From: Harsh Dev Pathak Date: Fri, 14 Nov 2025 16:40:09 +0530 Subject: [PATCH 4/4] chore: Improved in_memory impl --- rust/impls/src/in_memory_store.rs | 171 ++++++++++------------------- rust/server/src/main.rs | 2 - rust/server/vss-server-config.toml | 2 +- 3 files changed, 60 insertions(+), 115 deletions(-) diff --git a/rust/impls/src/in_memory_store.rs b/rust/impls/src/in_memory_store.rs index e5af76a..5d09f64 100644 --- a/rust/impls/src/in_memory_store.rs +++ b/rust/impls/src/in_memory_store.rs @@ -10,9 +10,9 @@ use api::types::{ use async_trait::async_trait; use bytes::Bytes; use chrono::prelude::Utc; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::Mutex; fn build_storage_key(user_token: &str, store_id: &str, key: &str) -> String { format!("{}#{}#{}", user_token, store_id, key) @@ -20,31 +20,29 @@ fn build_storage_key(user_token: &str, store_id: &str, key: &str) -> String { /// In-memory implementation of the VSS Store. pub struct InMemoryBackendImpl { - store: Arc>>, + store: Arc>>, } impl InMemoryBackendImpl { /// Creates an in-memory instance. pub fn new() -> Self { - Self { store: Arc::new(RwLock::new(HashMap::new())) } + Self { store: Arc::new(Mutex::new(BTreeMap::new())) } } fn get_current_global_version( - &self, guard: &HashMap, user_token: &str, store_id: &str, + &self, guard: &BTreeMap, user_token: &str, store_id: &str, ) -> i64 { let global_key = build_storage_key(user_token, store_id, GLOBAL_VERSION_KEY); guard.get(&global_key).map(|r| r.version).unwrap_or(0) } } -// Validation functions - check if operations can succeed without modifying data fn validate_put_operation( - store: &HashMap, user_token: &str, store_id: &str, key_value: &KeyValue, + store: &BTreeMap, user_token: &str, store_id: &str, key_value: &KeyValue, ) -> Result<(), VssError> { let key = build_storage_key(user_token, store_id, &key_value.key); if key_value.version == -1 { - // Non-conditional upsert always succeeds Ok(()) } else if key_value.version == 0 { if store.contains_key(&key) { @@ -75,12 +73,11 @@ fn validate_put_operation( } fn validate_delete_operation( - store: &HashMap, user_token: &str, store_id: &str, key_value: &KeyValue, + store: &BTreeMap, user_token: &str, store_id: &str, key_value: &KeyValue, ) -> Result<(), VssError> { let key = build_storage_key(user_token, store_id, &key_value.key); if key_value.version == -1 { - // Non-conditional delete always succeeds Ok(()) } else { if let Some(existing) = store.get(&key) { @@ -101,20 +98,25 @@ fn validate_delete_operation( } } -fn execute_non_conditional_upsert( - store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, +fn execute_put_object( + store: &mut BTreeMap, user_token: &str, store_id: &str, + key_value: KeyValue, ) { let key = build_storage_key(user_token, store_id, &key_value.key); let now = Utc::now(); match store.entry(key) { - std::collections::hash_map::Entry::Occupied(mut occ) => { + std::collections::btree_map::Entry::Occupied(mut occ) => { let existing = occ.get_mut(); - existing.version = INITIAL_RECORD_VERSION as i64; + existing.version = if key_value.version == -1 { + INITIAL_RECORD_VERSION as i64 + } else { + existing.version.saturating_add(1) + }; existing.value = key_value.value.to_vec(); existing.last_updated_at = now; }, - std::collections::hash_map::Entry::Vacant(vac) => { + std::collections::btree_map::Entry::Vacant(vac) => { let new_record = VssDbRecord { user_token: user_token.to_string(), store_id: store_id.to_string(), @@ -129,51 +131,8 @@ fn execute_non_conditional_upsert( } } -fn execute_conditional_insert( - store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, -) { - let key = build_storage_key(user_token, store_id, &key_value.key); - let now = Utc::now(); - - let new_record = VssDbRecord { - user_token: user_token.to_string(), - store_id: store_id.to_string(), - key: key_value.key, - value: key_value.value.to_vec(), - version: INITIAL_RECORD_VERSION as i64, - created_at: now, - last_updated_at: now, - }; - store.insert(key, new_record); -} - -fn execute_conditional_update( - store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, -) { - let key = build_storage_key(user_token, store_id, &key_value.key); - let now = Utc::now(); - - if let Some(existing) = store.get_mut(&key) { - existing.version = key_value.version.saturating_add(1); - existing.value = key_value.value.to_vec(); - existing.last_updated_at = now; - } -} - -fn execute_put_object( - store: &mut HashMap, user_token: &str, store_id: &str, key_value: KeyValue, -) { - if key_value.version == -1 { - execute_non_conditional_upsert(store, user_token, store_id, key_value); - } else if key_value.version == 0 { - execute_conditional_insert(store, user_token, store_id, key_value); - } else { - execute_conditional_update(store, user_token, store_id, key_value); - } -} - fn execute_delete_object( - store: &mut HashMap, user_token: &str, store_id: &str, + store: &mut BTreeMap, user_token: &str, store_id: &str, key_value: &KeyValue, ) { let key = build_storage_key(user_token, store_id, &key_value.key); @@ -186,9 +145,9 @@ impl KvStore for InMemoryBackendImpl { &self, user_token: String, request: GetObjectRequest, ) -> Result { let key = build_storage_key(&user_token, &request.store_id, &request.key); - let guard = self.store.read().await; + let guard = self.store.lock().await; - if let Some(record) = guard.get(&key) { + let result = if let Some(record) = guard.get(&key) { Ok(GetObjectResponse { value: Some(KeyValue { key: record.key.clone(), @@ -197,6 +156,7 @@ impl KvStore for InMemoryBackendImpl { }), }) } else if request.key == GLOBAL_VERSION_KEY { + // Non-zero global version is handled above; this is only for initial version 0. Ok(GetObjectResponse { value: Some(KeyValue { key: GLOBAL_VERSION_KEY.to_string(), @@ -206,7 +166,9 @@ impl KvStore for InMemoryBackendImpl { }) } else { Err(VssError::NoSuchKeyError("Requested key not found.".to_string())) - } + }; + + result } async fn put( @@ -221,7 +183,7 @@ impl KvStore for InMemoryBackendImpl { } let store_id = request.store_id.clone(); - let mut guard = self.store.write().await; + let mut guard = self.store.lock().await; if let Some(version) = request.global_version { validate_put_operation( @@ -268,7 +230,7 @@ impl KvStore for InMemoryBackendImpl { })?; let store_id = request.store_id.clone(); - let mut guard = self.store.write().await; + let mut guard = self.store.lock().await; execute_delete_object(&mut guard, &user_token, &store_id, &key_value); @@ -278,71 +240,56 @@ impl KvStore for InMemoryBackendImpl { async fn list_key_versions( &self, user_token: String, request: ListKeyVersionsRequest, ) -> Result { - let store_id = request.store_id; - let key_prefix = request.key_prefix.unwrap_or("".to_string()); - let page_token_option = request.page_token; + let store_id = request.store_id.clone(); + let key_prefix = request.key_prefix.clone().unwrap_or_default(); let page_size = request.page_size.unwrap_or(i32::MAX); let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize; - let (keys_with_versions, global_version) = { - let guard = self.store.read().await; + let offset: usize = + request.page_token.as_ref().and_then(|s| s.parse::().ok()).unwrap_or(0); - let mut global_version: Option = None; - if page_token_option.is_none() { - global_version = - Some(self.get_current_global_version(&guard, &user_token, &store_id)); - } + let guard = self.store.lock().await; + + let mut global_version: Option = None; + if offset == 0 { + global_version = Some(self.get_current_global_version(&guard, &user_token, &store_id)); + } - let storage_prefix = format!("{}#{}#", user_token, store_id); - let mut temp: Vec<(String, i64)> = Vec::new(); + let storage_prefix = format!("{}#{}#", user_token, store_id); + let prefix_len = storage_prefix.len(); + + let mut all_items: Vec = guard + .iter() + .filter(|(storage_key, _)| storage_key.starts_with(&storage_prefix)) + .filter_map(|(storage_key, record)| { + let key = &storage_key[prefix_len..]; - for (storage_key, r) in guard.iter() { - if !storage_key.starts_with(&storage_prefix) { - continue; - } - let key = &storage_key[storage_prefix.len()..]; if key == GLOBAL_VERSION_KEY { - continue; + return None; } + if !key_prefix.is_empty() && !key.starts_with(&key_prefix) { - continue; + return None; } - temp.push((key.to_string(), r.version)); - } - (temp, global_version) - }; - - let mut keys_with_versions = keys_with_versions; - keys_with_versions.sort_by(|a, b| a.0.cmp(&b.0)); - - let start_idx = if page_token_option.is_none() { - 0 - } else if page_token_option.as_deref() == Some("") { - keys_with_versions.len() - } else { - let token = page_token_option.as_deref().unwrap(); - keys_with_versions - .iter() - .position(|(k, _)| k.as_str() > token) - .unwrap_or(keys_with_versions.len()) - }; - - let page_items: Vec = keys_with_versions - .iter() - .skip(start_idx) - .take(limit) - .map(|(key, version)| KeyValue { - key: key.clone(), - value: Bytes::new(), - version: *version, + Some(KeyValue { + key: key.to_string(), + value: Bytes::new(), + version: record.version, + }) }) .collect(); + all_items.sort_by(|a, b| a.key.cmp(&b.key)); + + let page_items: Vec = + all_items.iter().skip(offset).take(limit).cloned().collect(); + + let next_offset = offset + page_items.len(); let next_page_token = if page_items.is_empty() { Some("".to_string()) } else { - page_items.last().map(|kv| kv.key.clone()) + Some(next_offset.to_string()) }; Ok(ListKeyVersionsResponse { key_versions: page_items, next_page_token, global_version }) diff --git a/rust/server/src/main.rs b/rust/server/src/main.rs index 6279580..0c586a1 100644 --- a/rust/server/src/main.rs +++ b/rust/server/src/main.rs @@ -45,9 +45,7 @@ fn main() { }, }; - // Override the `store_type` if --in-memory flag passed if use_in_memory { - println!("Overriding backend type: using in-memory backend (via --in-memory flag)"); config.server_config.store_type = "in_memory".to_string(); } diff --git a/rust/server/vss-server-config.toml b/rust/server/vss-server-config.toml index b030851..6223d5e 100644 --- a/rust/server/vss-server-config.toml +++ b/rust/server/vss-server-config.toml @@ -1,7 +1,7 @@ [server_config] host = "127.0.0.1" port = 8080 -store_type = "postgres" # "postgres" for using postgresql and "in_memory" for testing purpooses +store_type = "postgres" # "postgres" for using postgresql and "in_memory" for testing purposes [postgresql_config] username = "postgres" # Optional in TOML, can be overridden by env var `VSS_POSTGRESQL_USERNAME`