Skip to content

Commit 42bf4e8

Browse files
committed
feat(api-public): expose actor kv api
1 parent 3c5d900 commit 42bf4e8

File tree

11 files changed

+222
-0
lines changed

11 files changed

+222
-0
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/errors/actor.kv_key_not_found.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/openapi.json

Lines changed: 59 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-peer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ license.workspace = true
88
[dependencies]
99
anyhow.workspace = true
1010
axum.workspace = true
11+
base64.workspace = true
1112
gas.workspace = true
1213
epoxy.workspace = true
1314
futures-util.workspace = true
@@ -27,6 +28,7 @@ tokio.workspace = true
2728
tracing.workspace = true
2829
namespace.workspace = true
2930
pegboard.workspace = true
31+
pegboard-actor-kv.workspace = true
3032
universalpubsub.workspace = true
3133
uuid.workspace = true
3234
utoipa.workspace = true
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use anyhow::*;
2+
use base64::Engine;
3+
use base64::prelude::BASE64_STANDARD;
4+
use pegboard_actor_kv as actor_kv;
5+
use rivet_api_builder::ApiCtx;
6+
use rivet_util::Id;
7+
use serde::{Deserialize, Serialize};
8+
use utoipa::ToSchema;
9+
10+
#[derive(Deserialize)]
11+
#[serde(deny_unknown_fields)]
12+
pub struct KvGetPath {
13+
pub actor_id: Id,
14+
pub key: String,
15+
}
16+
17+
#[derive(Debug, Deserialize, Serialize)]
18+
#[serde(deny_unknown_fields)]
19+
pub struct KvGetQuery {}
20+
21+
#[derive(Serialize, ToSchema)]
22+
#[schema(as = ActorsKvGetResponse)]
23+
pub struct KvGetResponse {
24+
/// Value encoded in base 64.
25+
pub value: String,
26+
pub update_ts: i64,
27+
}
28+
29+
#[utoipa::path(
30+
get,
31+
operation_id = "actors_kv_get",
32+
path = "/actors/{actor_id}/kv/keys/{key}",
33+
params(
34+
("actor_id" = Id, Path),
35+
("key" = String, Path),
36+
),
37+
responses(
38+
(status = 200, body = KvGetResponse),
39+
),
40+
)]
41+
#[tracing::instrument(skip_all)]
42+
pub async fn kv_get(ctx: ApiCtx, path: KvGetPath, _query: KvGetQuery) -> Result<KvGetResponse> {
43+
// Decode base64 key
44+
let key_bytes = BASE64_STANDARD
45+
.decode(&path.key)
46+
.context("failed to decode base64 key")?;
47+
48+
// Get the KV value
49+
let udb = ctx.pools().udb()?;
50+
let (keys, values, metadata) =
51+
actor_kv::get(&*udb, path.actor_id, vec![key_bytes.clone()]).await?;
52+
53+
// Check if key was found
54+
if keys.is_empty() {
55+
return Err(pegboard::errors::Actor::KvKeyNotFound.build());
56+
}
57+
58+
// Encode value as base64
59+
let value_base64 = BASE64_STANDARD.encode(&values[0]);
60+
61+
Ok(KvGetResponse {
62+
value: value_base64,
63+
update_ts: metadata[0].create_ts,
64+
})
65+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod create;
22
pub mod delete;
3+
pub mod kv_get;
34
pub mod list;
45
pub mod list_names;

engine/packages/api-peer/src/router.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ pub async fn router(
2626
.route("/actors", post(actors::create::create))
2727
.route("/actors/{actor_id}", delete(actors::delete::delete))
2828
.route("/actors/names", get(actors::list_names::list_names))
29+
.route(
30+
"/actors/{actor_id}/kv/keys/{key}",
31+
get(actors::kv_get::kv_get),
32+
)
2933
// MARK: Runners
3034
.route("/runners", get(runners::list))
3135
.route("/runners/names", get(runners::list_names))
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use anyhow::Result;
2+
use axum::response::{IntoResponse, Response};
3+
use rivet_api_builder::{
4+
ApiError,
5+
extract::{Extension, Path},
6+
};
7+
use rivet_api_util::request_remote_datacenter_raw;
8+
use rivet_util::Id;
9+
use serde::{Deserialize, Serialize};
10+
use utoipa::ToSchema;
11+
12+
use crate::ctx::ApiCtx;
13+
14+
#[derive(Deserialize)]
15+
#[serde(deny_unknown_fields)]
16+
pub struct KvGetPath {
17+
pub actor_id: Id,
18+
pub key: String,
19+
}
20+
21+
#[derive(Serialize, ToSchema)]
22+
#[schema(as = ActorsKvGetResponse)]
23+
pub struct KvGetResponse {
24+
pub value: String,
25+
pub update_ts: i64,
26+
}
27+
28+
#[utoipa::path(
29+
get,
30+
operation_id = "actors_kv_get",
31+
path = "/actors/{actor_id}/kv/keys/{key}",
32+
params(
33+
("actor_id" = Id, Path),
34+
("key" = String, Path),
35+
),
36+
responses(
37+
(status = 200, body = KvGetResponse),
38+
),
39+
security(("bearer_auth" = [])),
40+
)]
41+
#[tracing::instrument(skip_all)]
42+
pub async fn kv_get(Extension(ctx): Extension<ApiCtx>, Path(path): Path<KvGetPath>) -> Response {
43+
match kv_get_inner(ctx, path).await {
44+
Ok(response) => response,
45+
Err(err) => ApiError::from(err).into_response(),
46+
}
47+
}
48+
49+
#[tracing::instrument(skip_all)]
50+
async fn kv_get_inner(ctx: ApiCtx, path: KvGetPath) -> Result<Response> {
51+
use axum::Json;
52+
53+
ctx.auth().await?;
54+
55+
if path.actor_id.label() == ctx.config().dc_label() {
56+
let peer_path = rivet_api_peer::actors::kv_get::KvGetPath {
57+
actor_id: path.actor_id,
58+
key: path.key,
59+
};
60+
let peer_query = rivet_api_peer::actors::kv_get::KvGetQuery {};
61+
let res = rivet_api_peer::actors::kv_get::kv_get(ctx.into(), peer_path, peer_query).await?;
62+
63+
Ok(Json(res).into_response())
64+
} else {
65+
request_remote_datacenter_raw(
66+
&ctx,
67+
path.actor_id.label(),
68+
&format!("/actors/{}/kv/keys/{}", path.actor_id, path.key),
69+
axum::http::Method::GET,
70+
Option::<&()>::None,
71+
Option::<&()>::None,
72+
)
73+
.await
74+
}
75+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod create;
22
pub mod delete;
33
pub mod get_or_create;
4+
pub mod kv_get;
45
pub mod list;
56
pub mod list_names;
67
pub mod utils;

engine/packages/api-public/src/router.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi
1818
actors::delete::delete,
1919
actors::list_names::list_names,
2020
actors::get_or_create::get_or_create,
21+
actors::kv_get::kv_get,
2122
runners::list,
2223
runners::list_names,
2324
namespaces::list,
@@ -88,6 +89,10 @@ pub async fn router(
8889
"/actors/names",
8990
axum::routing::get(actors::list_names::list_names),
9091
)
92+
.route(
93+
"/actors/{actor_id}/kv/keys/{key}",
94+
axum::routing::get(actors::kv_get::kv_get),
95+
)
9196
// MARK: Runners
9297
.route("/runners", axum::routing::get(runners::list))
9398
.route("/runners/names", axum::routing::get(runners::list_names))

0 commit comments

Comments
 (0)