Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions engine/artifacts/errors/actor.kv_key_not_found.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions engine/packages/api-peer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
axum.workspace = true
base64.workspace = true
gas.workspace = true
epoxy.workspace = true
futures-util.workspace = true
Expand All @@ -27,6 +28,7 @@ tokio.workspace = true
tracing.workspace = true
namespace.workspace = true
pegboard.workspace = true
pegboard-actor-kv.workspace = true
universalpubsub.workspace = true
uuid.workspace = true
utoipa.workspace = true
65 changes: 65 additions & 0 deletions engine/packages/api-peer/src/actors/kv_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use anyhow::*;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use pegboard_actor_kv as actor_kv;
use rivet_api_builder::ApiCtx;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetPath {
pub actor_id: Id,
pub key: String,
}

#[derive(Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetQuery {}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvGetResponse)]
pub struct KvGetResponse {
/// Value encoded in base 64.
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
operation_id = "actors_kv_get",
path = "/actors/{actor_id}/kv/keys/{key}",
params(
("actor_id" = Id, Path),
("key" = String, Path),
),
responses(
(status = 200, body = KvGetResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn kv_get(ctx: ApiCtx, path: KvGetPath, _query: KvGetQuery) -> Result<KvGetResponse> {
// Decode base64 key
let key_bytes = BASE64_STANDARD
.decode(&path.key)
.context("failed to decode base64 key")?;

// Get the KV value
let udb = ctx.pools().udb()?;
let (keys, values, metadata) =
actor_kv::get(&*udb, path.actor_id, vec![key_bytes.clone()]).await?;

// Check if key was found
if keys.is_empty() {
return Err(pegboard::errors::Actor::KvKeyNotFound.build());
}

// Encode value as base64
let value_base64 = BASE64_STANDARD.encode(&values[0]);

Ok(KvGetResponse {
value: value_base64,
update_ts: metadata[0].create_ts,
})
}
1 change: 1 addition & 0 deletions engine/packages/api-peer/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod create;
pub mod delete;
pub mod kv_get;
pub mod list;
pub mod list_names;
4 changes: 4 additions & 0 deletions engine/packages/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub async fn router(
.route("/actors", post(actors::create::create))
.route("/actors/{actor_id}", delete(actors::delete::delete))
.route("/actors/names", get(actors::list_names::list_names))
.route(
"/actors/{actor_id}/kv/keys/{key}",
get(actors::kv_get::kv_get),
)
// MARK: Runners
.route("/runners", get(runners::list))
.route("/runners/names", get(runners::list_names))
Expand Down
75 changes: 75 additions & 0 deletions engine/packages/api-public/src/actors/kv_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use anyhow::Result;
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Path},
};
use rivet_api_util::request_remote_datacenter_raw;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::ctx::ApiCtx;

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetPath {
pub actor_id: Id,
pub key: String,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvGetResponse)]
pub struct KvGetResponse {
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
operation_id = "actors_kv_get",
path = "/actors/{actor_id}/kv/keys/{key}",
params(
("actor_id" = Id, Path),
("key" = String, Path),
),
responses(
(status = 200, body = KvGetResponse),
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn kv_get(Extension(ctx): Extension<ApiCtx>, Path(path): Path<KvGetPath>) -> Response {
match kv_get_inner(ctx, path).await {
Ok(response) => response,
Err(err) => ApiError::from(err).into_response(),
}
}

#[tracing::instrument(skip_all)]
async fn kv_get_inner(ctx: ApiCtx, path: KvGetPath) -> Result<Response> {
use axum::Json;

ctx.auth().await?;

if path.actor_id.label() == ctx.config().dc_label() {
let peer_path = rivet_api_peer::actors::kv_get::KvGetPath {
actor_id: path.actor_id,
key: path.key,
};
let peer_query = rivet_api_peer::actors::kv_get::KvGetQuery {};
let res = rivet_api_peer::actors::kv_get::kv_get(ctx.into(), peer_path, peer_query).await?;

Ok(Json(res).into_response())
} else {
request_remote_datacenter_raw(
&ctx,
path.actor_id.label(),
&format!("/actors/{}/kv/keys/{}", path.actor_id, path.key),
axum::http::Method::GET,
Option::<&()>::None,
Option::<&()>::None,
)
.await
}
}
1 change: 1 addition & 0 deletions engine/packages/api-public/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod create;
pub mod delete;
pub mod get_or_create;
pub mod kv_get;
pub mod list;
pub mod list_names;
pub mod utils;
5 changes: 5 additions & 0 deletions engine/packages/api-public/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi
actors::delete::delete,
actors::list_names::list_names,
actors::get_or_create::get_or_create,
actors::kv_get::kv_get,
runners::list,
runners::list_names,
namespaces::list,
Expand Down Expand Up @@ -88,6 +89,10 @@ pub async fn router(
"/actors/names",
axum::routing::get(actors::list_names::list_names),
)
.route(
"/actors/{actor_id}/kv/keys/{key}",
axum::routing::get(actors::kv_get::kv_get),
)
// MARK: Runners
.route("/runners", axum::routing::get(runners::list))
.route("/runners/names", axum::routing::get(runners::list_names))
Expand Down
3 changes: 3 additions & 0 deletions engine/packages/pegboard/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub enum Actor {
namespace: String,
runner_name: String,
},

#[error("kv_key_not_found", "The KV key does not exist for this actor.")]
KvKeyNotFound,
}

#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
Expand Down
Loading