diff --git a/Cargo.lock b/Cargo.lock index 4431806248..99d145db88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4240,12 +4240,14 @@ version = "2.0.24-rc.1" dependencies = [ "anyhow", "axum 0.8.4", + "base64 0.22.1", "epoxy", "futures-util", "gasoline", "indexmap 2.10.0", "namespace", "pegboard", + "pegboard-actor-kv", "rivet-api-builder", "rivet-api-types", "rivet-api-util", diff --git a/engine/artifacts/errors/actor.kv_key_not_found.json b/engine/artifacts/errors/actor.kv_key_not_found.json new file mode 100644 index 0000000000..31e465083b --- /dev/null +++ b/engine/artifacts/errors/actor.kv_key_not_found.json @@ -0,0 +1,5 @@ +{ + "code": "kv_key_not_found", + "group": "actor", + "message": "The KV key does not exist for this actor." +} \ No newline at end of file diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index e13da9b43f..ff5cbd4f6b 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -274,6 +274,49 @@ ] } }, + "/actors/{actor_id}/kv/keys/{key}": { + "get": { + "tags": [ + "actors::kv_get" + ], + "operationId": "actors_kv_get", + "parameters": [ + { + "name": "actor_id", + "in": "path", + "required": true, + "schema": { + "$ref": "#/components/schemas/RivetId" + } + }, + { + "name": "key", + "in": "path", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ActorsKvGetResponse" + } + } + } + } + }, + "security": [ + { + "bearer_auth": [] + } + ] + } + }, "/datacenters": { "get": { "tags": [ @@ -1020,6 +1063,22 @@ } } }, + "ActorsKvGetResponse": { + "type": "object", + "required": [ + "value", + "update_ts" + ], + "properties": { + "update_ts": { + "type": "integer", + "format": "int64" + }, + "value": { + "type": "string" + } + } + }, "ActorsListNamesResponse": { "type": "object", "required": [ diff --git a/engine/packages/api-peer/Cargo.toml b/engine/packages/api-peer/Cargo.toml index 8b95d7e98a..060e9d6a3a 100644 --- a/engine/packages/api-peer/Cargo.toml +++ b/engine/packages/api-peer/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] anyhow.workspace = true axum.workspace = true +base64.workspace = true gas.workspace = true epoxy.workspace = true futures-util.workspace = true @@ -27,6 +28,7 @@ tokio.workspace = true tracing.workspace = true namespace.workspace = true pegboard.workspace = true +pegboard-actor-kv.workspace = true universalpubsub.workspace = true uuid.workspace = true utoipa.workspace = true diff --git a/engine/packages/api-peer/src/actors/kv_get.rs b/engine/packages/api-peer/src/actors/kv_get.rs new file mode 100644 index 0000000000..624c3d632f --- /dev/null +++ b/engine/packages/api-peer/src/actors/kv_get.rs @@ -0,0 +1,65 @@ +use anyhow::*; +use base64::Engine; +use base64::prelude::BASE64_STANDARD; +use pegboard_actor_kv as actor_kv; +use rivet_api_builder::ApiCtx; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct KvGetPath { + pub actor_id: Id, + pub key: String, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct KvGetQuery {} + +#[derive(Serialize, ToSchema)] +#[schema(as = ActorsKvGetResponse)] +pub struct KvGetResponse { + /// Value encoded in base 64. + pub value: String, + pub update_ts: i64, +} + +#[utoipa::path( + get, + operation_id = "actors_kv_get", + path = "/actors/{actor_id}/kv/keys/{key}", + params( + ("actor_id" = Id, Path), + ("key" = String, Path), + ), + responses( + (status = 200, body = KvGetResponse), + ), +)] +#[tracing::instrument(skip_all)] +pub async fn kv_get(ctx: ApiCtx, path: KvGetPath, _query: KvGetQuery) -> Result { + // Decode base64 key + let key_bytes = BASE64_STANDARD + .decode(&path.key) + .context("failed to decode base64 key")?; + + // Get the KV value + let udb = ctx.pools().udb()?; + let (keys, values, metadata) = + actor_kv::get(&*udb, path.actor_id, vec![key_bytes.clone()]).await?; + + // Check if key was found + if keys.is_empty() { + return Err(pegboard::errors::Actor::KvKeyNotFound.build()); + } + + // Encode value as base64 + let value_base64 = BASE64_STANDARD.encode(&values[0]); + + Ok(KvGetResponse { + value: value_base64, + update_ts: metadata[0].create_ts, + }) +} diff --git a/engine/packages/api-peer/src/actors/mod.rs b/engine/packages/api-peer/src/actors/mod.rs index ce36036d8f..9451cd59eb 100644 --- a/engine/packages/api-peer/src/actors/mod.rs +++ b/engine/packages/api-peer/src/actors/mod.rs @@ -1,4 +1,5 @@ pub mod create; pub mod delete; +pub mod kv_get; pub mod list; pub mod list_names; diff --git a/engine/packages/api-peer/src/router.rs b/engine/packages/api-peer/src/router.rs index a4f567bb1d..05fdc31fa8 100644 --- a/engine/packages/api-peer/src/router.rs +++ b/engine/packages/api-peer/src/router.rs @@ -26,6 +26,10 @@ pub async fn router( .route("/actors", post(actors::create::create)) .route("/actors/{actor_id}", delete(actors::delete::delete)) .route("/actors/names", get(actors::list_names::list_names)) + .route( + "/actors/{actor_id}/kv/keys/{key}", + get(actors::kv_get::kv_get), + ) // MARK: Runners .route("/runners", get(runners::list)) .route("/runners/names", get(runners::list_names)) diff --git a/engine/packages/api-public/src/actors/kv_get.rs b/engine/packages/api-public/src/actors/kv_get.rs new file mode 100644 index 0000000000..8a9bd4860c --- /dev/null +++ b/engine/packages/api-public/src/actors/kv_get.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use axum::response::{IntoResponse, Response}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Path}, +}; +use rivet_api_util::request_remote_datacenter_raw; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use crate::ctx::ApiCtx; + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct KvGetPath { + pub actor_id: Id, + pub key: String, +} + +#[derive(Serialize, ToSchema)] +#[schema(as = ActorsKvGetResponse)] +pub struct KvGetResponse { + pub value: String, + pub update_ts: i64, +} + +#[utoipa::path( + get, + operation_id = "actors_kv_get", + path = "/actors/{actor_id}/kv/keys/{key}", + params( + ("actor_id" = Id, Path), + ("key" = String, Path), + ), + responses( + (status = 200, body = KvGetResponse), + ), + security(("bearer_auth" = [])), +)] +#[tracing::instrument(skip_all)] +pub async fn kv_get(Extension(ctx): Extension, Path(path): Path) -> Response { + match kv_get_inner(ctx, path).await { + Ok(response) => response, + Err(err) => ApiError::from(err).into_response(), + } +} + +#[tracing::instrument(skip_all)] +async fn kv_get_inner(ctx: ApiCtx, path: KvGetPath) -> Result { + use axum::Json; + + ctx.auth().await?; + + if path.actor_id.label() == ctx.config().dc_label() { + let peer_path = rivet_api_peer::actors::kv_get::KvGetPath { + actor_id: path.actor_id, + key: path.key, + }; + let peer_query = rivet_api_peer::actors::kv_get::KvGetQuery {}; + let res = rivet_api_peer::actors::kv_get::kv_get(ctx.into(), peer_path, peer_query).await?; + + Ok(Json(res).into_response()) + } else { + request_remote_datacenter_raw( + &ctx, + path.actor_id.label(), + &format!("/actors/{}/kv/keys/{}", path.actor_id, path.key), + axum::http::Method::GET, + Option::<&()>::None, + Option::<&()>::None, + ) + .await + } +} diff --git a/engine/packages/api-public/src/actors/mod.rs b/engine/packages/api-public/src/actors/mod.rs index 9a563baae5..d1adaf1d36 100644 --- a/engine/packages/api-public/src/actors/mod.rs +++ b/engine/packages/api-public/src/actors/mod.rs @@ -1,6 +1,7 @@ pub mod create; pub mod delete; pub mod get_or_create; +pub mod kv_get; pub mod list; pub mod list_names; pub mod utils; diff --git a/engine/packages/api-public/src/router.rs b/engine/packages/api-public/src/router.rs index 06ef91061c..2b71b35c9b 100644 --- a/engine/packages/api-public/src/router.rs +++ b/engine/packages/api-public/src/router.rs @@ -18,6 +18,7 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi actors::delete::delete, actors::list_names::list_names, actors::get_or_create::get_or_create, + actors::kv_get::kv_get, runners::list, runners::list_names, namespaces::list, @@ -88,6 +89,10 @@ pub async fn router( "/actors/names", axum::routing::get(actors::list_names::list_names), ) + .route( + "/actors/{actor_id}/kv/keys/{key}", + axum::routing::get(actors::kv_get::kv_get), + ) // MARK: Runners .route("/runners", axum::routing::get(runners::list)) .route("/runners/names", axum::routing::get(runners::list_names)) diff --git a/engine/packages/pegboard/src/errors.rs b/engine/packages/pegboard/src/errors.rs index dbd3173e63..62ab7138a3 100644 --- a/engine/packages/pegboard/src/errors.rs +++ b/engine/packages/pegboard/src/errors.rs @@ -63,6 +63,9 @@ pub enum Actor { namespace: String, runner_name: String, }, + + #[error("kv_key_not_found", "The KV key does not exist for this actor.")] + KvKeyNotFound, } #[derive(RivetError, Debug, Clone, Deserialize, Serialize)]