Skip to content

Commit 780efca

Browse files
committed
Implement grpc server
1 parent 23170e2 commit 780efca

File tree

20 files changed

+1712
-14
lines changed

20 files changed

+1712
-14
lines changed

Cargo.lock

Lines changed: 868 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ members = [
55
"crates/storage",
66
"crates/index",
77
"crates/server",
8+
"crates/grpc_server",
89
]
910

1011
# You can define shared dependencies for all crates here

crates/api/src/lib.rs

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,27 @@ fn generate_point_id() -> u64 {
1919
pub struct VectorDb {
2020
storage: Arc<dyn StorageEngine>,
2121
index: Arc<RwLock<dyn VectorIndex>>, // Using a RwLock instead of Mutex to improve concurrency
22+
dimension: usize,
2223
}
2324

2425
impl VectorDb {
25-
fn _new(storage: Arc<dyn StorageEngine>, index: Arc<RwLock<dyn VectorIndex>>) -> Self {
26-
Self { storage, index }
26+
fn _new(
27+
storage: Arc<dyn StorageEngine>,
28+
index: Arc<RwLock<dyn VectorIndex>>,
29+
dimension: usize,
30+
) -> Self {
31+
Self {
32+
storage,
33+
index,
34+
dimension,
35+
}
2736
}
2837

2938
//TODO: Make this an atomic operation
3039
pub fn insert(&self, vector: DenseVector, payload: Payload) -> Result<PointId, DbError> {
40+
if vector.len() != self.dimension {
41+
return Err(DbError::DimensionMismatch);
42+
}
3143
// Generate a new point id
3244
let point_id = generate_point_id();
3345
self.storage
@@ -44,13 +56,13 @@ impl VectorDb {
4456
}
4557

4658
//TODO: Make this an atomic operation
47-
pub fn delete(&self, id: PointId) -> Result<(), DbError> {
59+
pub fn delete(&self, id: PointId) -> Result<bool, DbError> {
4860
// Remove from storage
4961
self.storage.delete_point(id)?;
5062
// Remove from index
5163
let mut index = self.index.write().map_err(|_| DbError::LockError)?;
52-
index.delete(id)?;
53-
Ok(())
64+
let point_found = index.delete(id)?;
65+
Ok(point_found)
5466
}
5567

5668
pub fn get(&self, id: PointId) -> Result<Option<Point>, DbError> {
@@ -105,7 +117,7 @@ pub fn init_api(config: DbConfig) -> Result<VectorDb, DbError> {
105117
};
106118

107119
// Init the db
108-
let db = VectorDb::_new(storage, index);
120+
let db = VectorDb::_new(storage, index, config.dimension);
109121

110122
Ok(db)
111123
}
@@ -147,6 +159,22 @@ mod tests {
147159
assert_eq!(point.payload.as_ref().unwrap(), &payload);
148160
}
149161

162+
#[test]
163+
fn test_dimension_mismatch() {
164+
let db = create_test_db();
165+
let v1 = vec![1.0, 2.0, 3.0];
166+
let v2 = vec![1.0, 2.0];
167+
let payload = Payload {};
168+
169+
let res1 = db.insert(v1, payload);
170+
assert!(res1.is_ok());
171+
172+
// Insert vector of dimension 2 != 3
173+
let res2 = db.insert(v2, payload);
174+
assert!(res2.is_err());
175+
assert_eq!(res2.unwrap_err(), DbError::DimensionMismatch);
176+
}
177+
150178
#[test]
151179
fn test_delete() {
152180
let db = create_test_db();
@@ -156,6 +184,12 @@ mod tests {
156184
// Insert a point
157185
let id = db.insert(vector, payload).unwrap();
158186

187+
// try deleting a point that does not exist
188+
let found = db.delete(id + 1);
189+
assert!(found.is_ok());
190+
assert!(!found.unwrap());
191+
192+
// delete the point
159193
assert!(db.get(id).unwrap().is_some());
160194
db.delete(id).unwrap();
161195
assert!(db.get(id).unwrap().is_none());

crates/defs/src/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1-
#[derive(Debug)]
1+
/// Error type shared by the database crates.
///
/// `Debug` gives the raw variant for diagnostics; `Display` gives a short,
/// human-readable message suitable for logs and responses.
#[derive(Debug, PartialEq, Eq)]
pub enum DbError {
    ParseError,
    /// Carries a free-form message describing the storage failure.
    StorageError(String),
    /// Carries a free-form message describing the serialization failure.
    SerializationError(String),
    DeserializationError,
    /// Carries a free-form message describing the index failure.
    IndexError(String),
    /// A lock could not be acquired (e.g. the index `RwLock` was poisoned).
    LockError,
    /// A vector's length does not match the dimension the database was configured with.
    DimensionMismatch,
}

impl std::fmt::Display for DbError {
    /// Writes a human-readable description instead of the `Debug` variant dump.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DbError::ParseError => f.write_str("parse error"),
            DbError::StorageError(msg) => write!(f, "storage error: {msg}"),
            DbError::SerializationError(msg) => write!(f, "serialization error: {msg}"),
            DbError::DeserializationError => f.write_str("deserialization error"),
            DbError::IndexError(msg) => write!(f, "index error: {msg}"),
            DbError::LockError => f.write_str("failed to acquire lock"),
            DbError::DimensionMismatch => f.write_str("vector dimension mismatch"),
        }
    }
}

impl std::error::Error for DbError {}

crates/grpc_server/.sample.env

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
GRPC_SERVER_ROOT_PASSWORD=123 # required
2+
GRPC_SERVER_DIMENSION=3 # required
3+
4+
GRPC_SERVER_HOST=localhost # defaults to 127.0.0.1 aka localhost
5+
GRPC_SERVER_PORT=8080 # defaults to 8080
6+
GRPC_SERVER_STORAGE_TYPE=inmemory # (inmemory/rocksdb) defaults to 'inmemory'
7+
GRPC_SERVER_INDEX_TYPE=flat # defaults to flat
8+
GRPC_SERVER_DATA_PATH=data # defaults to a temporary directory
9+
GRPC_SERVER_LOGGING=true # defaults to true

crates/grpc_server/Cargo.toml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
[package]
2+
name = "grpc_server"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
prost = "0.14.1"
8+
tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] }
9+
tonic = "0.14.2"
10+
tonic-prost-build = "0.14.2"
11+
12+
api = { path = "../api" }
13+
storage = { path = "../storage" }
14+
index = { path = "../index" }
15+
defs = { path = "../defs" }
16+
17+
tonic-prost = "0.14.2"
18+
prost-types = "0.14.1"
19+
dotenv = "0.15.0"
20+
tempfile = "3.23.0"
21+
tracing = "0.1.41"
22+
tracing-subscriber = "0.3.20"
23+
tokio-stream = "0.1.17"
24+
25+
[build-dependencies]
26+
tonic-build = "0.14.2"
27+
tonic-prost-build = "0.14.2"

crates/grpc_server/README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
### Build
2+
3+
Clone the repository and run `cargo build --bin grpc_server` to build the binary of the gRPC server crate.
4+
5+
You can then start the gRPC server by running:
6+
7+
```bash
8+
cargo run --bin grpc_server
9+
```
10+
11+
### Configuration
12+
13+
Use the [.sample.env](.sample.env) shown below as a reference to set your environment variables in a `.env` file.
14+
15+
```bash
16+
GRPC_SERVER_ROOT_PASSWORD=123 # required
17+
GRPC_SERVER_DIMENSION=3 # required
18+
19+
GRPC_SERVER_HOST=localhost # defaults to 127.0.0.1 aka localhost
20+
GRPC_SERVER_PORT=8080 # defaults to 8080
21+
GRPC_SERVER_STORAGE_TYPE=inmemory # (inmemory/rocksdb) defaults to 'inmemory'
22+
GRPC_SERVER_INDEX_TYPE=flat # defaults to flat
23+
GRPC_SERVER_DATA_PATH=data # defaults to a temporary directory
24+
GRPC_SERVER_LOGGING=true # defaults to true
25+
26+
27+
```
28+
29+
30+
### Testing
31+
32+
The [vector-db.proto](proto/vector-db.proto) can be imported into any gRPC client.

crates/grpc_server/build.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fn main() -> Result<(), Box<dyn std::error::Error>> {
2+
tonic_prost_build::compile_protos("proto/vector-db.proto")?;
3+
Ok(())
4+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
syntax = "proto3";
2+
3+
package vectordb;
4+
5+
import "google/protobuf/struct.proto";
6+
import "google/protobuf/empty.proto";
7+
8+
9+
service VectorDB {
10+
//Insert a vector with a payload and return the assigned PointID
11+
rpc InsertVector(InsertVectorRequest) returns (PointID) {}
12+
13+
//Delete a vector by its PointID
14+
rpc DeletePoint(PointID) returns (google.protobuf.Empty) {}
15+
16+
//Get a vector and its payload by PointID
17+
rpc GetPoint(PointID) returns (Point) {}
18+
19+
//Search for the k nearest vectors to a target vector given a distance function
20+
rpc SearchPoints(SearchRequest) returns (SearchResponse) {}
21+
}
22+
23+
24+
message InsertVectorRequest {
25+
DenseVector vector = 1;
26+
optional google.protobuf.Struct payload = 2;
27+
}
28+
29+
30+
message SearchRequest {
31+
DenseVector query_vector = 1;
32+
Similarity similarity = 2;
33+
int64 limit = 3;
34+
}
35+
36+
37+
message SearchResponse {
38+
repeated PointID result_point_ids = 1;
39+
}
40+
41+
message DenseVector {
42+
repeated float values = 1;
43+
}
44+
45+
message Point {
46+
PointID id = 1;
47+
optional google.protobuf.Struct payload = 2;
48+
DenseVector vector = 3;
49+
}
50+
51+
message PointID {
52+
uint64 id = 1;
53+
}
54+
55+
enum Similarity{
56+
Euclidean = 0;
57+
Manhattan = 1;
58+
Hamming = 2;
59+
Cosine = 3;
60+
}

crates/grpc_server/src/config.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use crate::constants::{
2+
self, DEFAULT_PORT, ENV_DATA_PATH, ENV_DIMENSION, ENV_INDEX_TYPE, ENV_LOGGING, ENV_PORT,
3+
ENV_ROOT_PASSWORD, ENV_STORAGE_TYPE,
4+
};
5+
use crate::errors;
6+
use api;
7+
use dotenv::dotenv;
8+
use index::IndexType;
9+
use std::net::SocketAddr;
10+
use std::path::PathBuf;
11+
use std::{env, fs};
12+
use storage;
13+
use tempfile::tempdir;
14+
use tracing::{Level, event};
15+
16+
pub struct GRPCServerConfig {
17+
pub addr: SocketAddr,
18+
pub root_password: String,
19+
pub db_config: api::DbConfig,
20+
pub logging: bool,
21+
}
22+
23+
impl GRPCServerConfig {
24+
pub fn load_config() -> Result<GRPCServerConfig, Box<dyn std::error::Error>> {
25+
dotenv().ok();
26+
27+
// fetch server host; default to localhost if not defined
28+
let host = env::var(constants::ENV_HOST)
29+
.inspect_err(|_| {
30+
event!(Level::WARN, "Host not defined, defaulting to 'localhost'");
31+
})
32+
.unwrap_or("127.0.0.1".to_string());
33+
34+
// fetch server port; default to 8080 if not defined
35+
let port: u32 = env::var(ENV_PORT)
36+
.inspect_err(|_| {
37+
event!(
38+
Level::WARN,
39+
"Port not defined, defaulting to {}",
40+
DEFAULT_PORT
41+
);
42+
})
43+
.unwrap_or(DEFAULT_PORT.to_string())
44+
.parse()
45+
.unwrap_or(DEFAULT_PORT.parse::<u32>().unwrap());
46+
47+
// fetch server root password; return err if not defined
48+
let root_password = env::var(ENV_ROOT_PASSWORD).map_err(|_| {
49+
errors::ConfigError::MissingRequiredEnvVar(ENV_ROOT_PASSWORD.to_string())
50+
})?;
51+
52+
// fetch server storage type
53+
let storage_type_str = env::var(ENV_STORAGE_TYPE)
54+
.inspect_err(|_| {
55+
event!(
56+
Level::WARN,
57+
"Storage Type not defined, defaulting to InMemory"
58+
)
59+
})
60+
.unwrap_or_default();
61+
let storage_type = match storage_type_str.as_str() {
62+
"inmemory" => storage::StorageType::InMemory,
63+
"rocksdb" => storage::StorageType::RocksDb,
64+
_ => storage::StorageType::InMemory, // default to InMemory if not specified
65+
};
66+
67+
// fetch server index type
68+
let index_type_str = env::var(ENV_INDEX_TYPE)
69+
.inspect_err(|_| event!(Level::WARN, "Index Type not defined, defaulting to flat"))
70+
.unwrap_or("flat".to_string())
71+
.to_lowercase();
72+
let index_type = match index_type_str.as_str() {
73+
"flat" => IndexType::Flat,
74+
"kdtree" => IndexType::KDTree,
75+
"hnsw" => IndexType::HNSW,
76+
_ => IndexType::Flat, // default to Flat if not specified
77+
};
78+
79+
// fetch dimension size
80+
let dimension: usize = env::var(ENV_DIMENSION)
81+
.map_err(|_| errors::ConfigError::MissingRequiredEnvVar(ENV_DIMENSION.to_string()))?
82+
.parse()
83+
.map_err(|_| errors::ConfigError::InvalidDimension)?;
84+
85+
// fetch data path; create tempdir if not specified
86+
let data_path: PathBuf;
87+
if let Ok(data_path_str) = env::var(ENV_DATA_PATH) {
88+
data_path = PathBuf::from(data_path_str);
89+
fs::create_dir_all(&data_path).map_err(|_| errors::ConfigError::InvalidDataPath)?;
90+
} else {
91+
let tempbuf = tempdir().unwrap().path().to_path_buf().join("vectordb");
92+
fs::create_dir_all(&tempbuf)?;
93+
event!(
94+
Level::WARN,
95+
"Data Path not specified, using temporary directory: {:?}",
96+
tempbuf.clone()
97+
);
98+
data_path = tempbuf;
99+
}
100+
101+
// create db config for api
102+
let db_config = api::DbConfig {
103+
storage_type,
104+
index_type,
105+
data_path,
106+
dimension,
107+
};
108+
109+
// create socket address for grpc server
110+
let addr: SocketAddr = format!("{}:{}", host, port).parse()?;
111+
112+
// check if logging is enabled
113+
let mut logging: bool = true; // default to logging enabled
114+
if let Ok(logging_str) = env::var(ENV_LOGGING) {
115+
logging = logging_str.parse().unwrap_or(true);
116+
}
117+
118+
Ok(GRPCServerConfig {
119+
addr,
120+
root_password,
121+
db_config,
122+
logging,
123+
})
124+
}
125+
}

0 commit comments

Comments
 (0)