From 780efca6de7c6bd56bbb22fe89d0e8ebc76ba92d Mon Sep 17 00:00:00 2001 From: Tanmay Arya Date: Thu, 2 Oct 2025 15:21:39 +0530 Subject: [PATCH] Implement grpc server --- Cargo.lock | 873 ++++++++++++++++++++++- Cargo.toml | 1 + crates/api/src/lib.rs | 46 +- crates/defs/src/error.rs | 11 +- crates/grpc_server/.sample.env | 9 + crates/grpc_server/Cargo.toml | 27 + crates/grpc_server/README.md | 32 + crates/grpc_server/build.rs | 4 + crates/grpc_server/proto/vector-db.proto | 60 ++ crates/grpc_server/src/config.rs | 125 ++++ crates/grpc_server/src/constants.rs | 14 + crates/grpc_server/src/errors.rs | 14 + crates/grpc_server/src/interceptors.rs | 34 + crates/grpc_server/src/lib.rs | 9 + crates/grpc_server/src/main.rs | 30 + crates/grpc_server/src/service.rs | 162 +++++ crates/grpc_server/src/tests.rs | 253 +++++++ crates/grpc_server/src/utils.rs | 18 + crates/index/src/lib.rs | 2 +- crates/storage/src/lib.rs | 2 +- 20 files changed, 1712 insertions(+), 14 deletions(-) create mode 100644 crates/grpc_server/.sample.env create mode 100644 crates/grpc_server/Cargo.toml create mode 100644 crates/grpc_server/README.md create mode 100644 crates/grpc_server/build.rs create mode 100644 crates/grpc_server/proto/vector-db.proto create mode 100644 crates/grpc_server/src/config.rs create mode 100644 crates/grpc_server/src/constants.rs create mode 100644 crates/grpc_server/src/errors.rs create mode 100644 crates/grpc_server/src/interceptors.rs create mode 100644 crates/grpc_server/src/lib.rs create mode 100644 crates/grpc_server/src/main.rs create mode 100644 crates/grpc_server/src/service.rs create mode 100644 crates/grpc_server/src/tests.rs create mode 100644 crates/grpc_server/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 1b3e627..51236fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. 
version = 4 +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "aho-corasick" version = "1.1.3" @@ -11,6 +26,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "anyhow" +version = "1.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" + [[package]] name = "api" version = "0.1.0" @@ -21,6 +42,87 @@ dependencies = [ "tempfile", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + +[[package]] +name = "axum" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" +dependencies = [ + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59446ce19cd142f8833f856eb31f3eb097812d1479ab224f54d72428ca21ea22" +dependencies = [ + "bytes", + 
"futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", +] + +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-link", +] + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bincode" version = "1.3.3" @@ -81,6 +183,12 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + [[package]] name = "bzip2-sys" version = "0.1.13+1.0.8" @@ -136,12 +244,24 @@ dependencies = [ "serde", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.13" @@ -158,6 +278,57 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + [[package]] name = "getrandom" version = "0.3.3" @@ -167,15 +338,177 @@ dependencies = [ "cfg-if", "libc", "r-efi", - "wasi", + "wasi 0.14.2+wasi-0.2.4", ] +[[package]] +name = "gimli" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" + [[package]] name = "glob" version = 
"0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "grpc_server" +version = "0.1.0" +dependencies = [ + "api", + "defs", + "dotenv", + "index", + "prost", + "prost-types", + "storage", + "tempfile", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tonic-prost", + "tonic-prost-build", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "http" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] 
+ +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "libc", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "index" version = "0.1.0" @@ -183,6 +516,27 @@ dependencies = [ "defs", ] +[[package]] +name = "indexmap" +version = "2.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" +dependencies = [ + "equivalent", + "hashbrown", +] + +[[package]] +name = "io-uring" +version = "0.7.10" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "libc", +] + [[package]] name = "itertools" version = "0.13.0" @@ -192,6 +546,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + [[package]] name = "jobserver" version = "0.1.33" @@ -263,6 +623,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" +[[package]] +name = "log" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" + [[package]] name = "lz4-sys" version = "1.11.1+lz4-1.10.0" @@ -273,18 +639,56 @@ dependencies = [ "libc", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", + "windows-sys 0.59.0", +] + +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nom" version = "7.1.3" @@ -295,6 +699,24 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -307,6 +729,54 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.32" @@ -332,6 +802,80 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" +dependencies = [ + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" +dependencies = [ + "prost", +] + +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags 2.9.1", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "quote" version = "1.0.40" @@ -386,6 +930,12 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "rustc-demangle" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -413,18 +963,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -440,12 +1000,43 @@ dependencies = [ "index", ] 
+[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "storage" version = "0.1.0" @@ -468,6 +1059,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "tempfile" version = "3.23.0" @@ -481,18 +1078,269 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tokio" +version = "1.47.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" +dependencies = [ + "backtrace", + "bytes", + "io-uring", + "libc", + 
"mio", + "pin-project-lite", + "slab", + "socket2", + "tokio-macros", + "windows-sys 0.59.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "socket2", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + 
+[[package]] +name = "tonic-prost-build" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn", + "tempfile", + "tonic-build", +] + +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project-lite", + "slab", + "sync_wrapper", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = 
"0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + [[package]] name = 
"wasi" version = "0.14.2+wasi-0.2.4" @@ -502,6 +1350,21 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "windows-link" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.59.0" diff --git a/Cargo.toml b/Cargo.toml index 670d962..dc7d639 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/storage", "crates/index", "crates/server", + "crates/grpc_server", ] # You can define shared dependencies for all crates here diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 3a7c34c..fa100ad 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -19,15 +19,27 @@ fn generate_point_id() -> u64 { pub struct VectorDb { storage: Arc, index: Arc>, // Using a RwLock instead of Mutex to improve concurrency + dimension: usize, } impl VectorDb { - fn _new(storage: Arc, index: Arc>) -> Self { - Self { storage, index } + fn _new( + storage: Arc, + index: Arc>, + dimension: usize, + ) -> Self { + Self { + storage, + index, + dimension, + } } //TODO: Make this an atomic operation pub fn insert(&self, vector: DenseVector, payload: Payload) -> Result { + if vector.len() != self.dimension { + return Err(DbError::DimensionMismatch); + } // Generate a new point id let point_id = generate_point_id(); self.storage @@ -44,13 +56,13 @@ impl VectorDb { } //TODO: Make this an atomic operation - pub fn delete(&self, id: PointId) -> Result<(), DbError> { + pub fn delete(&self, id: PointId) -> Result { // Remove from storage self.storage.delete_point(id)?; // Remove from index let mut index = self.index.write().map_err(|_| 
DbError::LockError)?; - index.delete(id)?; - Ok(()) + let point_found = index.delete(id)?; + Ok(point_found) } pub fn get(&self, id: PointId) -> Result, DbError> { @@ -105,7 +117,7 @@ pub fn init_api(config: DbConfig) -> Result { }; // Init the db - let db = VectorDb::_new(storage, index); + let db = VectorDb::_new(storage, index, config.dimension); Ok(db) } @@ -147,6 +159,22 @@ mod tests { assert_eq!(point.payload.as_ref().unwrap(), &payload); } + #[test] + fn test_dimension_mismatch() { + let db = create_test_db(); + let v1 = vec![1.0, 2.0, 3.0]; + let v2 = vec![1.0, 2.0]; + let payload = Payload {}; + + let res1 = db.insert(v1, payload); + assert!(res1.is_ok()); + + // Insert vector of dimension 2 != 3 + let res2 = db.insert(v2, payload); + assert!(res2.is_err()); + assert_eq!(res2.unwrap_err(), DbError::DimensionMismatch); + } + #[test] fn test_delete() { let db = create_test_db(); @@ -156,6 +184,12 @@ mod tests { // Insert a point let id = db.insert(vector, payload).unwrap(); + // try deleting a point that does not exist + let found = db.delete(id + 1); + assert!(found.is_ok()); + assert!(!found.unwrap()); + + // delete the point assert!(db.get(id).unwrap().is_some()); db.delete(id).unwrap(); assert!(db.get(id).unwrap().is_none()); diff --git a/crates/defs/src/error.rs b/crates/defs/src/error.rs index 1079c5e..4192974 100644 --- a/crates/defs/src/error.rs +++ b/crates/defs/src/error.rs @@ -1,4 +1,4 @@ -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum DbError { ParseError, StorageError(String), @@ -6,4 +6,13 @@ pub enum DbError { DeserializationError, IndexError(String), LockError, + DimensionMismatch, } + +impl std::fmt::Display for DbError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::error::Error for DbError {} diff --git a/crates/grpc_server/.sample.env b/crates/grpc_server/.sample.env new file mode 100644 index 0000000..eae7ab8 --- /dev/null +++ b/crates/grpc_server/.sample.env 
@@ -0,0 +1,9 @@ +GRPC_SERVER_ROOT_PASSWORD=123 # required +GRPC_SERVER_DIMENSION=3 # required + +GRPC_SERVER_HOST=localhost # defaults to 127.0.0.1 aka localhost +GRPC_SERVER_PORT=8080 # defaults to 8080 +GRPC_SERVER_STORAGE_TYPE=inmemory # (inmemory/rocksdb) defaults to 'inmemory' +GRPC_SERVER_INDEX_TYPE=flat # defaults to flat +GRPC_SERVER_DATA_PATH=data # defaults to a temporary directory +GRPC_SERVER_LOGGING=true # defaults to true diff --git a/crates/grpc_server/Cargo.toml b/crates/grpc_server/Cargo.toml new file mode 100644 index 0000000..24c7379 --- /dev/null +++ b/crates/grpc_server/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "grpc_server" +version = "0.1.0" +edition = "2024" + +[dependencies] +prost = "0.14.1" +tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] } +tonic = "0.14.2" +tonic-prost-build = "0.14.2" + +api = { path = "../api" } +storage = { path = "../storage" } +index = { path = "../index" } +defs = { path = "../defs" } + +tonic-prost = "0.14.2" +prost-types = "0.14.1" +dotenv = "0.15.0" +tempfile = "3.23.0" +tracing = "0.1.41" +tracing-subscriber = "0.3.20" +tokio-stream = "0.1.17" + +[build-dependencies] +tonic-build = "0.14.2" +tonic-prost-build = "0.14.2" diff --git a/crates/grpc_server/README.md b/crates/grpc_server/README.md new file mode 100644 index 0000000..a3c36ed --- /dev/null +++ b/crates/grpc_server/README.md @@ -0,0 +1,32 @@ +### Build + +Clone the repository and run `cargo build --bin grpc_server` to build the binary of the gRPC server crate. + +You can then start the gRPC server by running: + +```bash +cargo run --bin grpc_server +``` + +### Configuration + +Use the [.sample.env](.sample.env) shown below as a reference to set your environment variables in a `.env` file.
+ +```bash +GRPC_SERVER_ROOT_PASSWORD=123 # required +GRPC_SERVER_DIMENSION=3 # required + +GRPC_SERVER_HOST=localhost # defaults to 127.0.0.1 aka localhost +GRPC_SERVER_PORT=8080 # defaults to 8080 +GRPC_SERVER_STORAGE_TYPE=inmemory # (inmemory/rocksdb) defaults to 'inmemory' +GRPC_SERVER_INDEX_TYPE=flat # defaults to flat +GRPC_SERVER_DATA_PATH=data # defaults to a temporary directory +GRPC_SERVER_LOGGING=true # defaults to true + + +``` + + +### Testing + +The [vector-db.proto](proto/vector-db.proto) can be imported into any gRPC client. diff --git a/crates/grpc_server/build.rs b/crates/grpc_server/build.rs new file mode 100644 index 0000000..8290b86 --- /dev/null +++ b/crates/grpc_server/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_prost_build::compile_protos("proto/vector-db.proto")?; + Ok(()) +} diff --git a/crates/grpc_server/proto/vector-db.proto b/crates/grpc_server/proto/vector-db.proto new file mode 100644 index 0000000..a0e5f97 --- /dev/null +++ b/crates/grpc_server/proto/vector-db.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package vectordb; + +import "google/protobuf/struct.proto"; +import "google/protobuf/empty.proto"; + + +service VectorDB { + //Insert a vector with a payload and return the assigned PointID + rpc InsertVector(InsertVectorRequest) returns (PointID) {} + + //Delete a vector by its PointID + rpc DeletePoint(PointID) returns (google.protobuf.Empty) {} + + //Get a vector and its payload by PointID + rpc GetPoint(PointID) returns (Point) {} + + //Search for the k nearest vectors to a target vector given a distance function + rpc SearchPoints(SearchRequest) returns (SearchResponse) {} +} + + +message InsertVectorRequest { + DenseVector vector = 1; + optional google.protobuf.Struct payload = 2; +} + + +message SearchRequest { + DenseVector query_vector = 1; + Similarity similarity = 2; + int64 limit = 3; +} + + +message SearchResponse { + repeated PointID result_point_ids = 1; +} + +message DenseVector { + repeated 
float values = 1;
+}
+
+message Point {
+  PointID id = 1;
+  optional google.protobuf.Struct payload = 2;
+  DenseVector vector = 3;
+}
+
+message PointID {
+  uint64 id = 1;
+}
+
+enum Similarity{
+  Euclidean = 0;
+  Manhattan = 1;
+  Hamming = 2;
+  Cosine = 3;
+}
diff --git a/crates/grpc_server/src/config.rs b/crates/grpc_server/src/config.rs
new file mode 100644
index 0000000..6d9137c
--- /dev/null
+++ b/crates/grpc_server/src/config.rs
@@ -0,0 +1,184 @@
+use crate::constants::{
+    self, DEFAULT_PORT, ENV_DATA_PATH, ENV_DIMENSION, ENV_INDEX_TYPE, ENV_LOGGING, ENV_PORT,
+    ENV_ROOT_PASSWORD, ENV_STORAGE_TYPE,
+};
+use crate::errors;
+use api;
+use dotenv::dotenv;
+use index::IndexType;
+use std::net::{SocketAddr, ToSocketAddrs};
+use std::path::PathBuf;
+use std::{env, fs};
+use storage;
+use tempfile::tempdir;
+use tracing::{Level, event};
+
+/// Runtime configuration of the gRPC server, assembled from environment variables.
+pub struct GRPCServerConfig {
+    pub addr: SocketAddr,
+    pub root_password: String,
+    pub db_config: api::DbConfig,
+    pub logging: bool,
+}
+
+impl GRPCServerConfig {
+    /// Loads the configuration from the process environment (a `.env` file is read first if present).
+    ///
+    /// The root password and the vector dimension are mandatory; every other setting falls
+    /// back to a default and logs a warning.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when a required variable is missing or empty, the dimension is not a
+    /// positive integer, the data directory cannot be created, or the host cannot be resolved.
+    pub fn load_config() -> Result<GRPCServerConfig, Box<dyn std::error::Error>> {
+        dotenv().ok();
+
+        // fetch server host; default to the loopback address if not defined
+        let host = env::var(constants::ENV_HOST).unwrap_or_else(|_| {
+            event!(Level::WARN, "Host not defined, defaulting to 'localhost'");
+            "127.0.0.1".to_string()
+        });
+
+        // fetch server port; a port is a u16, so out-of-range values are rejected here
+        // instead of surfacing later as an address parse failure
+        let default_port: u16 = DEFAULT_PORT
+            .parse()
+            .expect("DEFAULT_PORT is a valid port number");
+        let port = match env::var(ENV_PORT) {
+            Ok(raw) => raw.trim().parse::<u16>().unwrap_or_else(|_| {
+                event!(
+                    Level::WARN,
+                    "Invalid port {:?}, defaulting to {}",
+                    raw,
+                    DEFAULT_PORT
+                );
+                default_port
+            }),
+            Err(_) => {
+                event!(
+                    Level::WARN,
+                    "Port not defined, defaulting to {}",
+                    DEFAULT_PORT
+                );
+                default_port
+            }
+        };
+
+        // fetch server root password; an empty password would make authentication
+        // meaningless, so it is treated the same as a missing one
+        let root_password = env::var(ENV_ROOT_PASSWORD)
+            .ok()
+            .filter(|password| !password.is_empty())
+            .ok_or_else(|| {
+                errors::ConfigError::MissingRequiredEnvVar(ENV_ROOT_PASSWORD.to_string())
+            })?;
+
+        // fetch server storage type (case-insensitive, like the index type)
+        let storage_type = match env::var(ENV_STORAGE_TYPE) {
+            Ok(raw) => match raw.trim().to_lowercase().as_str() {
+                "inmemory" => storage::StorageType::InMemory,
+                "rocksdb" => storage::StorageType::RocksDb,
+                other => {
+                    event!(
+                        Level::WARN,
+                        "Unknown Storage Type {:?}, defaulting to InMemory",
+                        other
+                    );
+                    storage::StorageType::InMemory
+                }
+            },
+            Err(_) => {
+                event!(
+                    Level::WARN,
+                    "Storage Type not defined, defaulting to InMemory"
+                );
+                storage::StorageType::InMemory
+            }
+        };
+
+        // fetch server index type
+        let index_type = match env::var(ENV_INDEX_TYPE) {
+            Ok(raw) => match raw.trim().to_lowercase().as_str() {
+                "flat" => IndexType::Flat,
+                "kdtree" => IndexType::KDTree,
+                "hnsw" => IndexType::HNSW,
+                other => {
+                    event!(
+                        Level::WARN,
+                        "Unknown Index Type {:?}, defaulting to flat",
+                        other
+                    );
+                    IndexType::Flat
+                }
+            },
+            Err(_) => {
+                event!(Level::WARN, "Index Type not defined, defaulting to flat");
+                IndexType::Flat
+            }
+        };
+
+        // fetch dimension size; zero-dimensional vectors are never valid
+        let dimension: usize = env::var(ENV_DIMENSION)
+            .map_err(|_| errors::ConfigError::MissingRequiredEnvVar(ENV_DIMENSION.to_string()))?
+            .trim()
+            .parse()
+            .map_err(|_| errors::ConfigError::InvalidDimension)?;
+        if dimension == 0 {
+            return Err(errors::ConfigError::InvalidDimension.into());
+        }
+
+        // fetch data path; create a temporary directory if not specified
+        let data_path = match env::var(ENV_DATA_PATH) {
+            Ok(raw) => {
+                let path = PathBuf::from(raw);
+                fs::create_dir_all(&path).map_err(|_| errors::ConfigError::InvalidDataPath)?;
+                path
+            }
+            Err(_) => {
+                // `keep` detaches the directory from the `TempDir` guard so it is not
+                // deleted as soon as the guard goes out of scope
+                let path = tempdir()?.keep().join("vectordb");
+                fs::create_dir_all(&path)?;
+                event!(
+                    Level::WARN,
+                    "Data Path not specified, using temporary directory: {:?}",
+                    path
+                );
+                path
+            }
+        };
+
+        // create db config for api
+        let db_config = api::DbConfig {
+            storage_type,
+            index_type,
+            data_path,
+            dimension,
+        };
+
+        // create socket address for grpc server; resolving (rather than parsing) lets
+        // host names such as `localhost` work, and IPv4 is preferred to match the
+        // documented 127.0.0.1 default
+        let resolved: Vec<SocketAddr> = (host.as_str(), port).to_socket_addrs()?.collect();
+        let addr = resolved
+            .iter()
+            .find(|candidate| candidate.is_ipv4())
+            .or_else(|| resolved.first())
+            .copied()
+            .ok_or_else(|| format!("could not resolve host {:?}", host))?;
+
+        // check if logging is enabled; default to enabled when unset or unparsable
+        let logging = env::var(ENV_LOGGING)
+            .ok()
+            .and_then(|raw| raw.trim().parse::<bool>().ok())
+            .unwrap_or(true);
+
+        Ok(GRPCServerConfig {
+            addr,
+            root_password,
+            db_config,
+            logging,
+        })
+    }
+}
diff --git
a/crates/grpc_server/src/constants.rs b/crates/grpc_server/src/constants.rs
new file mode 100644
index 0000000..4286a32
--- /dev/null
+++ b/crates/grpc_server/src/constants.rs
@@ -0,0 +1,14 @@
+use defs::Similarity;
+use defs::Similarity::{Cosine, Euclidean, Hamming, Manhattan};
+pub const ENV_HOST: &str = "GRPC_SERVER_HOST";
+pub const ENV_PORT: &str = "GRPC_SERVER_PORT";
+pub const ENV_ROOT_PASSWORD: &str = "GRPC_SERVER_ROOT_PASSWORD";
+pub const ENV_STORAGE_TYPE: &str = "GRPC_SERVER_STORAGE_TYPE";
+pub const ENV_INDEX_TYPE: &str = "GRPC_SERVER_INDEX_TYPE";
+pub const ENV_DIMENSION: &str = "GRPC_SERVER_DIMENSION";
+pub const ENV_DATA_PATH: &str = "GRPC_SERVER_DATA_PATH";
+pub const ENV_LOGGING: &str = "GRPC_SERVER_LOGGING";
+
+pub const DEFAULT_PORT: &str = "8080";
+
+pub const SIMILARITY_PROTOBUFF_MAP: [Similarity; 4] = [Euclidean, Manhattan, Hamming, Cosine];
diff --git a/crates/grpc_server/src/errors.rs b/crates/grpc_server/src/errors.rs
new file mode 100644
index 0000000..c7342bb
--- /dev/null
+++ b/crates/grpc_server/src/errors.rs
@@ -0,0 +1,14 @@
+#[derive(Debug)]
+pub enum ConfigError {
+    MissingRequiredEnvVar(String),
+    InvalidDimension,
+    InvalidDataPath,
+}
+
+impl std::error::Error for ConfigError {}
+
+impl std::fmt::Display for ConfigError {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
diff --git a/crates/grpc_server/src/interceptors.rs b/crates/grpc_server/src/interceptors.rs
new file mode 100644
index 0000000..d85ab0b
--- /dev/null
+++ b/crates/grpc_server/src/interceptors.rs
@@ -0,0 +1,53 @@
+use tonic::{Status, service::Interceptor};
+use tracing::{Level, event};
+
+/// Interceptor that authenticates every request against the server's root password.
+///
+/// Clients must send the metadata entry `authorization: Bearer <root password>`.
+#[derive(Clone)]
+pub struct AuthInterceptor {
+    root_password: String,
+}
+
+impl Interceptor for AuthInterceptor {
+    /// Lets the request through only if it carries a well-formed bearer token equal to
+    /// the root password; every other case is rejected as unauthenticated.
+    fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+        // A header that is missing, not valid ASCII, or lacking the `Bearer ` prefix
+        // yields `None` and is rejected outright rather than being treated as an empty
+        // token.
+        let authorized = req
+            .metadata()
+            .get("authorization")
+            .and_then(|value| value.to_str().ok())
+            .and_then(|value| value.strip_prefix("Bearer "))
+            .is_some_and(|token| {
+                constant_time_eq(token.as_bytes(), self.root_password.as_bytes())
+            });
+
+        if authorized {
+            Ok(req)
+        } else {
+            event!(Level::WARN, "Unauthorized Request");
+            Err(Status::unauthenticated("Invalid credentials"))
+        }
+    }
+}
+
+impl AuthInterceptor {
+    pub fn new(root_password: String) -> AuthInterceptor {
+        AuthInterceptor { root_password }
+    }
+}
+
+/// Compares two byte strings without stopping at the first differing byte, so the
+/// time taken does not reveal how much of a guessed token was correct.
+fn constant_time_eq(left: &[u8], right: &[u8]) -> bool {
+    if left.len() != right.len() {
+        return false;
+    }
+    left.iter()
+        .zip(right)
+        .fold(0u8, |diff, (a, b)| diff | (a ^ b))
+        == 0
+}
diff --git a/crates/grpc_server/src/lib.rs b/crates/grpc_server/src/lib.rs
new file mode 100644
index 0000000..a14557d
--- /dev/null
+++ b/crates/grpc_server/src/lib.rs
@@ -0,0 +1,9 @@
+pub mod config;
+pub mod constants;
+pub mod errors;
+pub mod interceptors;
+pub mod service;
+pub mod utils;
+
+#[cfg(test)]
+pub mod tests;
diff --git a/crates/grpc_server/src/main.rs b/crates/grpc_server/src/main.rs
new file mode 100644
index 0000000..83db6dd
--- /dev/null
+++ b/crates/grpc_server/src/main.rs
@@ -0,0 +1,31 @@
+use grpc_server::config::GRPCServerConfig;
+use grpc_server::service::{VectorDBService, run_server};
+use grpc_server::utils::ServerEndpoint;
+
+/// Starts the gRPC server: loads the configuration, opens the database, then serves.
+///
+/// Startup failures are returned from `main` (non-zero exit status) rather than raised
+/// as panics.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    tracing_subscriber::fmt::init();
+
+    // load config from environment variables
+    let config = GRPCServerConfig::load_config()
+        .map_err(|err| format!("Failed to load config: {}", err))?;
+
+    let vector_db_api = api::init_api(config.db_config)
+        .map_err(|err| format!("Failed to Init API: {:?}", err))?;
+
+    let vector_db_service = VectorDBService {
+        vector_db: vector_db_api,
+        logging: config.logging,
+    };
+    run_server(
+        vector_db_service,
+        ServerEndpoint::Address(config.addr),
+        config.root_password,
+    )
+    .await?;
+    Ok(())
+}
diff --git a/crates/grpc_server/src/service.rs b/crates/grpc_server/src/service.rs
new file mode 100644
index 0000000..1f0eac9
--- /dev/null
+++ b/crates/grpc_server/src/service.rs
@@ -0,0 +1,162 @@
+use crate::interceptors;
+use crate::utils::log_rpc;
+use
crate::{constants::SIMILARITY_PROTOBUFF_MAP, utils::ServerEndpoint};
+use tonic::{Request, Response, Status, service::InterceptorLayer, transport::Server};
+use tracing::{Level, event};
+use vectordb::{
+    DenseVector, InsertVectorRequest, Point, PointId, SearchRequest, SearchResponse,
+    vector_db_server::{VectorDb, VectorDbServer},
+};
+
+pub mod vectordb {
+    tonic::include_proto!("vectordb");
+}
+
+/// gRPC front end over the vector database API.
+pub struct VectorDBService {
+    pub vector_db: api::VectorDb,
+    pub logging: bool,
+}
+
+/// Translates a database error into a gRPC status: a dimension mismatch is the caller's
+/// mistake (`INVALID_ARGUMENT`), anything else is a server-side failure (`INTERNAL`).
+fn db_error_to_status(action: &str, err: defs::DbError) -> Status {
+    match err {
+        defs::DbError::DimensionMismatch => Status::invalid_argument(format!(
+            "{}: vector dimension does not match the database dimension",
+            action
+        )),
+        other => Status::internal(format!("{}: {:?}", action, other)),
+    }
+}
+
+#[tonic::async_trait]
+impl VectorDb for VectorDBService {
+    /// Inserts a vector and responds with the id assigned to the new point.
+    ///
+    /// Fails with `INVALID_ARGUMENT` when the vector is absent or has the wrong dimension.
+    async fn insert_vector(
+        &self,
+        request: Request<InsertVectorRequest>,
+    ) -> Result<Response<PointId>, Status> {
+        log_rpc("insert_vector", self.logging);
+
+        let dense_vector = request
+            .into_inner()
+            .vector
+            .ok_or_else(|| Status::invalid_argument("dense_vector is empty"))?;
+
+        // TODO: Implement payload handling once its defined; until then every point is
+        // stored with an empty payload
+        let id = self
+            .vector_db
+            .insert(dense_vector.values, defs::Payload {})
+            .map_err(|e| db_error_to_status("failed to insert vector", e))?;
+
+        Ok(Response::new(PointId { id }))
+    }
+
+    /// Fetches a point by id.
+    ///
+    /// Fails with `NOT_FOUND` when no such point exists and `INTERNAL` when the lookup
+    /// itself fails.
+    async fn get_point(&self, request: Request<PointId>) -> Result<Response<Point>, Status> {
+        log_rpc("get_point", self.logging);
+
+        let point_id = request.into_inner().id;
+        let point = self
+            .vector_db
+            .get(point_id)
+            .map_err(|e| db_error_to_status("failed to fetch point", e))?
+            .ok_or_else(|| Status::not_found(format!("point not found: {}", point_id)))?;
+
+        Ok(Response::new(Point {
+            id: Some(PointId { id: point.id }),
+            vector: Some(DenseVector {
+                values: point.vector.unwrap_or_default(),
+            }),
+            payload: None,
+        }))
+    }
+
+    /// Returns the ids of the `limit` points nearest to the query vector.
+    ///
+    /// Fails with `INVALID_ARGUMENT` when the query vector is absent, the similarity is
+    /// not a known enum value, or the limit is negative.
+    async fn search_points(
+        &self,
+        request: Request<SearchRequest>,
+    ) -> Result<Response<SearchResponse>, Status> {
+        log_rpc("search_points", self.logging);
+
+        let search_request = request.into_inner();
+
+        // extract and validate request arguments
+        let query_vect = search_request
+            .query_vector
+            .ok_or_else(|| Status::invalid_argument("Invalid query_vector"))?;
+        // checked conversions: a negative enum value or limit must be rejected, not
+        // wrapped around into a huge index/count by an `as` cast
+        let similarity = usize::try_from(search_request.similarity)
+            .ok()
+            .and_then(|index| SIMILARITY_PROTOBUFF_MAP.get(index).copied())
+            .ok_or_else(|| Status::invalid_argument("Invalid similarity"))?;
+        let limit = usize::try_from(search_request.limit)
+            .map_err(|_| Status::invalid_argument("limit must be non-negative"))?;
+
+        let result_point_ids = self
+            .vector_db
+            .search(query_vect.values, similarity, limit)
+            .map_err(|e| db_error_to_status("failed to search points", e))?;
+
+        // create a mapped vector of PointIds
+        let result = result_point_ids
+            .into_iter()
+            .map(|id| PointId { id })
+            .collect();
+
+        Ok(Response::new(SearchResponse {
+            result_point_ids: result,
+        }))
+    }
+
+    /// Deletes a point by id; fails with `NOT_FOUND` when the id is unknown.
+    async fn delete_point(&self, request: Request<PointId>) -> Result<Response<()>, Status> {
+        log_rpc("delete_point", self.logging);
+
+        let point_id = request.into_inner().id;
+
+        match self.vector_db.delete(point_id) {
+            Ok(true) => Ok(Response::new(())),
+            Ok(false) => Err(Status::not_found("Point not found")),
+            Err(_) => Err(Status::internal("Error deleting point")),
+        }
+    }
+}
+
+/// Serves `vector_db_service` on `endpoint` until the transport stops or fails.
+///
+/// Every request must pass the bearer-token check built from `root_password`.
+///
+/// # Errors
+///
+/// Returns the underlying transport error, unchanged, if the server cannot start or
+/// stops abnormally.
+pub async fn run_server(
+    vector_db_service: VectorDBService,
+    endpoint: ServerEndpoint,
+    root_password: String,
+) -> Result<(), Box<dyn std::error::Error>> {
+    event!(Level::INFO, "Starting gRPC server at: {:?}", endpoint);
+
+    let auth_interceptor = interceptors::AuthInterceptor::new(root_password);
+
+    let router = Server::builder()
+        .layer(InterceptorLayer::new(auth_interceptor))
+        .add_service(VectorDbServer::new(vector_db_service));
+
+    match endpoint {
+        ServerEndpoint::Address(addr) => {
+            router.serve(addr).await.inspect_err(|err| {
+                event!(
+                    Level::ERROR,
+                    "Failed to start gRPC server with address: {:?}",
+                    err
+                );
+            })?;
+        }
+        ServerEndpoint::Listener(listener) => {
+            router
+                .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
+                .await
+                .inspect_err(|err| {
+                    event!(
+                        Level::ERROR,
+                        "Failed to start gRPC server with listener: {:?}",
+                        err
+                    );
+                })?;
+        }
+    }
+    Ok(())
+}
diff --git a/crates/grpc_server/src/tests.rs b/crates/grpc_server/src/tests.rs
new file mode 100644
index 0000000..6a477db
--- /dev/null
+++ b/crates/grpc_server/src/tests.rs
@@ -0,0 +1,253 @@
+use crate::config::GRPCServerConfig;
+use crate::service::vectordb::vector_db_client::VectorDbClient;
+use crate::service::vectordb::{DenseVector, InsertVectorRequest, PointId, SearchRequest};
+use crate::service::{VectorDBService, run_server};
+use crate::utils::ServerEndpoint;
+use api;
+use api::DbConfig;
+use index::IndexType;
+use prost_types::Struct;
+use storage::StorageType;
+use tempfile::tempdir;
+use tokio;
+use tonic::transport::Channel;
+
+// Inspired from https://github.com/hyperium/tonic/discussions/924#discussioncomment-9854088
+
+// TODO: figure out a way to either:
+// - assign different ports for different tests; when cargo test is run with multiple threads -> address in use error
+// - use a shared instance of the server
+// currently tests must be run with --test-threads=1
+
+async fn start_test_server() -> Result<(), Box<dyn std::error::Error>> {
+    // using a temporary directory for db datapath
+    let temp_dir = tempdir().unwrap();
+
+    let db_config = DbConfig {
+        storage_type: StorageType::RocksDb,
+        index_type: IndexType::Flat,
+        data_path: temp_dir.path().to_path_buf(),
+        dimension: 3,
+    };
+
+    let config = GRPCServerConfig {
+        addr: "127.0.0.1:8080".parse()?,
+        root_password: "123".to_string(),
+        logging: false,
+        db_config,
+    };
+
+    let vector_db_api = api::init_api(config.db_config)?;
+
+    let vector_db_service = VectorDBService {
+        vector_db: vector_db_api,
+        logging: config.logging,
+    };
+
+    let listener = tokio::net::TcpListener::bind(config.addr).await?;
+
+    tokio::spawn(async move {
+        let _ = run_server(
+            vector_db_service,
+
ServerEndpoint::Listener(listener), + config.root_password, + ) + .await + .inspect_err(|err| panic!("Could not start test server : {:?}", err)); + }); + + Ok(()) +} + +async fn create_test_client() -> Result, Box> { + let channel = tonic::transport::Channel::from_static("http://127.0.0.1:8080") + .connect() + .await?; + Ok(VectorDbClient::new(channel)) +} + +#[tokio::test] +async fn test_grpc_server_start() { + start_test_server().await.unwrap(); + let mut client = create_test_client().await.unwrap(); + + // insert a test vector + let test_vec = vec![1.0, 2.0, 3.0]; + + let mut request = tonic::Request::new(InsertVectorRequest { + vector: Some(DenseVector { + values: test_vec.clone(), + }), + payload: Some(Struct::default()), + }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let status = client.insert_vector(request).await.is_ok(); + assert!(status); +} + +#[tokio::test] +async fn test_insert_vector_rpc() { + start_test_server().await.unwrap(); + let mut client = create_test_client().await.unwrap(); + + // insert a test vector + let test_vec = vec![1.0, 2.0, 3.0]; + + let mut request = tonic::Request::new(InsertVectorRequest { + vector: Some(DenseVector { + values: test_vec.clone(), + }), + payload: Some(Struct::default()), + }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.insert_vector(request).await; + + // check if request is successful + assert!(resp.is_ok()); + + // check if the vector is actually present in the database + let mut request = tonic::Request::new(PointId { + id: resp.unwrap().into_inner().id, + }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.get_point(request).await; + + // check if request is successful + assert!(resp.is_ok()); + let point = resp.unwrap().into_inner(); + assert_eq!(point.vector.unwrap().values, test_vec); + + // insert a new vector with mismatched dimensions + 
let mut request = tonic::Request::new(InsertVectorRequest { + vector: Some(DenseVector { + values: vec![1.0, 2.0], + }), + payload: Some(Struct::default()), + }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.insert_vector(request).await; + + // request must fail + assert!(resp.is_err()); +} + +#[tokio::test] +async fn test_delete_vector_rpc() { + start_test_server().await.unwrap(); + let mut client = create_test_client().await.unwrap(); + + // insert a test vector + let test_vec = vec![1.0, 2.0, 3.0]; + let mut request = tonic::Request::new(InsertVectorRequest { + vector: Some(DenseVector { + values: test_vec.clone(), + }), + payload: Some(Struct::default()), + }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.insert_vector(request).await; + + // check if request is successful + assert!(resp.is_ok()); + let point = resp.unwrap().into_inner(); + + // delete the vector + let mut request = tonic::Request::new(PointId { id: point.id }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.delete_point(request).await; + + // check if request is successful + assert!(resp.is_ok()); + + // verify that the vector is deleted + let mut request = tonic::Request::new(PointId { id: point.id }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.get_point(request).await; + + // request must fail since the vector is deleted + assert!(resp.is_err()); +} + +#[tokio::test] +async fn test_search_vector_rpc() { + start_test_server().await.unwrap(); + let mut client = create_test_client().await.unwrap(); + + // insert a test vector + let test_vec = vec![1.0, 2.0, 3.0]; + let mut request = tonic::Request::new(InsertVectorRequest { + vector: Some(DenseVector { + values: test_vec.clone(), + }), + payload: Some(Struct::default()), + }); + request + 
.metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.insert_vector(request).await; + + // check if request is successful + assert!(resp.is_ok()); + let point = resp.unwrap().into_inner(); + + let query_vec = vec![2.0, 2.0, 2.0]; + + // search for the vector + let mut request = tonic::Request::new(SearchRequest { + query_vector: Some(DenseVector { + values: query_vec.clone(), + }), + similarity: 0, // euclidean distance + limit: 1, + }); + request + .metadata_mut() + .insert("authorization", "Bearer 123".parse().unwrap()); + let resp = client.search_points(request).await; + + // check if request is successful + assert!(resp.is_ok()); + let result = resp.unwrap().into_inner(); + + // 1 vector has to be returned + assert_eq!(result.result_point_ids.len(), 1); + + // check if the returned point id matches the inserted point id + assert_eq!(result.result_point_ids[0], PointId { id: point.id }); +} + +#[tokio::test] +async fn test_unauthorized_rpc() { + start_test_server().await.unwrap(); + let mut client = create_test_client().await.unwrap(); + + // insert a test vector + let test_vec = vec![1.0, 2.0, 3.0]; + let mut request = tonic::Request::new(InsertVectorRequest { + vector: Some(DenseVector { + values: test_vec.clone(), + }), + payload: Some(Struct::default()), + }); + request + .metadata_mut() + .insert("authorization", "Bearer 43121".parse().unwrap()); // wrong auth token + let resp = client.insert_vector(request).await; + + // request must fail + assert!(resp.is_err()); +} diff --git a/crates/grpc_server/src/utils.rs b/crates/grpc_server/src/utils.rs new file mode 100644 index 0000000..8f7cf7b --- /dev/null +++ b/crates/grpc_server/src/utils.rs @@ -0,0 +1,18 @@ +use std::net::SocketAddr; + +use tokio::net::TcpListener; +use tracing; + +#[inline(always)] // doing function inlining for optimization +pub fn log_rpc(rpc: &str, logging: bool) { + if logging { + tracing::event!(tracing::Level::INFO, "{} RPC called", rpc); + } 
+} + +// Either socket address or a premade listener +#[derive(Debug)] +pub enum ServerEndpoint { + Address(SocketAddr), + Listener(TcpListener), +} diff --git a/crates/index/src/lib.rs b/crates/index/src/lib.rs index ef93755..cd363b0 100644 --- a/crates/index/src/lib.rs +++ b/crates/index/src/lib.rs @@ -2,7 +2,7 @@ use defs::{DbError, DenseVector, IndexedVector, PointId, Similarity}; pub mod flat; -pub trait VectorIndex { +pub trait VectorIndex: Send + Sync { fn insert(&mut self, vector: IndexedVector) -> Result<(), DbError>; // Returns true if point id existed and is deleted, else returns false diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 36068fb..91d7fa3 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::rocks_db::RocksDbStorage; -pub trait StorageEngine { +pub trait StorageEngine: Send + Sync { fn insert_point( &self, id: PointId,