Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions engine/artifacts/errors/actor.kv_key_not_found.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions engine/packages/api-peer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
axum.workspace = true
base64.workspace = true
gas.workspace = true
epoxy.workspace = true
futures-util.workspace = true
Expand All @@ -27,6 +28,7 @@ tokio.workspace = true
tracing.workspace = true
namespace.workspace = true
pegboard.workspace = true
pegboard-actor-kv.workspace = true
universalpubsub.workspace = true
uuid.workspace = true
utoipa.workspace = true
67 changes: 67 additions & 0 deletions engine/packages/api-peer/src/actors/kv_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use anyhow::*;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use pegboard_actor_kv as actor_kv;
use rivet_api_builder::ApiCtx;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetPath {
pub actor_id: Id,
pub key: String,
}

#[derive(Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetQuery {}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvGetResponse)]
pub struct KvGetResponse {
/// Value encoded in base 64.
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
operation_id = "actors_kv_get",
path = "/actors/{actor_id}/kv/keys/{key}",
params(
("actor_id" = Id, Path),
("key" = String, Path),
),
responses(
(status = 200, body = KvGetResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn kv_get(ctx: ApiCtx, path: KvGetPath, _query: KvGetQuery) -> Result<KvGetResponse> {
// Decode base64 key
let key_bytes = BASE64_STANDARD
.decode(&path.key)
.context("failed to decode base64 key")?;

// Get the KV value
let udb = ctx.pools().udb()?;
let (keys, values, metadata) =
actor_kv::get(&*udb, path.actor_id, vec![key_bytes.clone()]).await?;

// Check if key was found
if keys.is_empty() {
return Err(pegboard::errors::Actor::KvKeyNotFound.build());
}

// Encode value as base64
let value_base64 = BASE64_STANDARD.encode(&values[0]);

Ok(KvGetResponse {
value: value_base64,
// NOTE: Intentionally uses different name in public API. `create_ts` is actually
// `update_ts`.
update_ts: metadata[0].create_ts,
})
}
1 change: 1 addition & 0 deletions engine/packages/api-peer/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod create;
pub mod delete;
pub mod kv_get;
pub mod list;
pub mod list_names;
4 changes: 4 additions & 0 deletions engine/packages/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub async fn router(
.route("/actors", post(actors::create::create))
.route("/actors/{actor_id}", delete(actors::delete::delete))
.route("/actors/names", get(actors::list_names::list_names))
.route(
"/actors/{actor_id}/kv/keys/{key}",
get(actors::kv_get::kv_get),
)
// MARK: Runners
.route("/runners", get(runners::list))
.route("/runners/names", get(runners::list_names))
Expand Down
1 change: 1 addition & 0 deletions engine/packages/api-public/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ serde.workspace = true
tokio.workspace = true
tower-http.workspace = true
tracing.workspace = true
urlencoding.workspace = true
utoipa.workspace = true

[build-dependencies]
Expand Down
75 changes: 75 additions & 0 deletions engine/packages/api-public/src/actors/kv_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use anyhow::Result;
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Path},
};
use rivet_api_util::request_remote_datacenter_raw;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::ctx::ApiCtx;

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetPath {
pub actor_id: Id,
pub key: String,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvGetResponse)]
pub struct KvGetResponse {
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
operation_id = "actors_kv_get",
path = "/actors/{actor_id}/kv/keys/{key}",
params(
("actor_id" = Id, Path),
("key" = String, Path),
),
responses(
(status = 200, body = KvGetResponse),
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn kv_get(Extension(ctx): Extension<ApiCtx>, Path(path): Path<KvGetPath>) -> Response {
match kv_get_inner(ctx, path).await {
Ok(response) => response,
Err(err) => ApiError::from(err).into_response(),
}
}

#[tracing::instrument(skip_all)]
async fn kv_get_inner(ctx: ApiCtx, path: KvGetPath) -> Result<Response> {
use axum::Json;

ctx.auth().await?;

if path.actor_id.label() == ctx.config().dc_label() {
let peer_path = rivet_api_peer::actors::kv_get::KvGetPath {
actor_id: path.actor_id,
key: path.key,
};
let peer_query = rivet_api_peer::actors::kv_get::KvGetQuery {};
let res = rivet_api_peer::actors::kv_get::kv_get(ctx.into(), peer_path, peer_query).await?;

Ok(Json(res).into_response())
} else {
request_remote_datacenter_raw(
&ctx,
path.actor_id.label(),
&format!("/actors/{}/kv/keys/{}", path.actor_id, urlencoding::encode(&path.key)),
axum::http::Method::GET,
Option::<&()>::None,
Option::<&()>::None,
)
.await
}
}
1 change: 1 addition & 0 deletions engine/packages/api-public/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod create;
pub mod delete;
pub mod get_or_create;
pub mod kv_get;
pub mod list;
pub mod list_names;
pub mod utils;
5 changes: 5 additions & 0 deletions engine/packages/api-public/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi
actors::delete::delete,
actors::list_names::list_names,
actors::get_or_create::get_or_create,
actors::kv_get::kv_get,
runners::list,
runners::list_names,
namespaces::list,
Expand Down Expand Up @@ -88,6 +89,10 @@ pub async fn router(
"/actors/names",
axum::routing::get(actors::list_names::list_names),
)
.route(
"/actors/{actor_id}/kv/keys/{key}",
axum::routing::get(actors::kv_get::kv_get),
)
// MARK: Runners
.route("/runners", axum::routing::get(runners::list))
.route("/runners/names", axum::routing::get(runners::list_names))
Expand Down
3 changes: 3 additions & 0 deletions engine/packages/pegboard/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub enum Actor {
namespace: String,
runner_name: String,
},

#[error("kv_key_not_found", "The KV key does not exist for this actor.")]
KvKeyNotFound,
}

#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
Expand Down
2 changes: 1 addition & 1 deletion examples/cursors-raw-websocket/src/backend/registry.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import type {
} from "rivetkit";
import { lookupInRegistry } from "rivetkit";
import type { Client } from "rivetkit/client";
import {
type ActorDriver,
type AnyActorInstance,
type ManagerDriver,
import type {
ActorDriver,
AnyActorInstance,
ManagerDriver,
} from "rivetkit/driver-helpers";
import { promiseWithResolvers } from "rivetkit/utils";
import { KEYS } from "./actor-handler-do";
Expand Down Expand Up @@ -239,7 +239,6 @@ export class CloudflareActorsActorDriver implements ActorDriver {
// Persist data key
return Uint8Array.from([1]);
}

}

export function createCloudflareActorsActorDriverBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
type GetOrCreateWithKeyInput,
type GetWithKeyInput,
generateRandomString,
type ListActorsInput,
type ManagerDisplayInformation,
type ManagerDriver,
WS_PROTOCOL_ACTOR,
Expand Down Expand Up @@ -348,6 +349,14 @@ export class CloudflareActorsManagerDriver implements ManagerDriver {
};
}

async listActors({ c, name }: ListActorsInput): Promise<ActorOutput[]> {
logger().warn({
msg: "listActors not fully implemented for Cloudflare Workers",
name,
});
return [];
}

// Helper method to build actor output from an ID
async #buildActorOutput(
c: any,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { actor } from "rivetkit";

export const counterWithParams = actor({
state: { count: 0, initializers: [] as string[] },
createConnState: (c, opts, params: { name?: string }) => {
createConnState: (c, params: { name?: string }) => {
return {
name: params.name || "anonymous",
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ export const connStateActor = actor({
// Define connection state
createConnState: (
c,
opts,
params: { username?: string; role?: string; noCount?: boolean },
): ConnState => {
return {
Expand Down
Loading
Loading