diff --git a/Cargo.lock b/Cargo.lock index 4431806248..c487b5a21e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2499,17 +2499,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "internal" -version = "2.0.24-rc.1" -dependencies = [ - "anyhow", - "gasoline", - "rivet-api-util", - "rivet-types", - "serde", -] - [[package]] name = "io-uring" version = "0.7.9" @@ -2906,8 +2895,6 @@ dependencies = [ "epoxy", "epoxy-protocol", "gasoline", - "internal", - "reqwest", "rivet-api-builder", "rivet-api-types", "rivet-api-util", @@ -3399,6 +3386,8 @@ dependencies = [ "lazy_static", "namespace", "nix 0.30.1", + "reqwest", + "reqwest-eventsource", "rivet-api-types", "rivet-api-util", "rivet-data", @@ -3413,6 +3402,7 @@ dependencies = [ "tracing", "universaldb", "universalpubsub", + "url", "utoipa", "vbare", ] @@ -3498,28 +3488,6 @@ dependencies = [ "vbare", ] -[[package]] -name = "pegboard-serverless" -version = "2.0.24-rc.1" -dependencies = [ - "anyhow", - "base64 0.22.1", - "epoxy", - "gasoline", - "namespace", - "pegboard", - "reqwest", - "reqwest-eventsource", - "rivet-config", - "rivet-runner-protocol", - "rivet-types", - "rivet-util", - "tracing", - "universaldb", - "universalpubsub", - "vbare", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -4457,7 +4425,6 @@ dependencies = [ "namespace", "pegboard", "pegboard-runner", - "pegboard-serverless", "portpicker", "rand 0.8.5", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 7630afc160..127e7ee494 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = 
["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/internal","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pegboard-serverless","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"] +members = 
["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"] [workspace.package] version = "2.0.24-rc.1" @@ -334,9 +334,6 @@ path = "engine/packages/guard" [workspace.dependencies.rivet-guard-core] path = "engine/packages/guard-core" -[workspace.dependencies.internal] -path = "engine/packages/internal" - [workspace.dependencies.rivet-logs] path = "engine/packages/logs" @@ -355,9 +352,6 @@ path = "engine/packages/pegboard-gateway" [workspace.dependencies.pegboard-runner] path = "engine/packages/pegboard-runner" -[workspace.dependencies.pegboard-serverless] -path = "engine/packages/pegboard-serverless" - [workspace.dependencies.rivet-pools] path = "engine/packages/pools" diff --git 
a/engine/artifacts/errors/api.bad_request.json b/engine/artifacts/errors/api.bad_request.json deleted file mode 100644 index 54871df310..0000000000 --- a/engine/artifacts/errors/api.bad_request.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "bad_request", - "group": "api", - "message": "Request is invalid" -} \ No newline at end of file diff --git a/engine/artifacts/errors/api.forbidden.json b/engine/artifacts/errors/api.forbidden.json deleted file mode 100644 index d86125b508..0000000000 --- a/engine/artifacts/errors/api.forbidden.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "forbidden", - "group": "api", - "message": "Access denied" -} \ No newline at end of file diff --git a/engine/artifacts/errors/api.internal_error.json b/engine/artifacts/errors/api.internal_error.json deleted file mode 100644 index 2bb3d5c81f..0000000000 --- a/engine/artifacts/errors/api.internal_error.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "internal_error", - "group": "api", - "message": "An internal server error occurred" -} \ No newline at end of file diff --git a/engine/artifacts/errors/api.not_found.json b/engine/artifacts/errors/api.not_found.json deleted file mode 100644 index 97d7cace0e..0000000000 --- a/engine/artifacts/errors/api.not_found.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "not_found", - "group": "api", - "message": "The requested resource was not found" -} \ No newline at end of file diff --git a/engine/artifacts/errors/api.rate_limited.json b/engine/artifacts/errors/api.rate_limited.json deleted file mode 100644 index 4f42c1974d..0000000000 --- a/engine/artifacts/errors/api.rate_limited.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "rate_limited", - "group": "api", - "message": "\n\tRate limit exceeded.\n\t\n\tThe API rate limit has been exceeded for this endpoint.\n\tPlease wait before making additional requests.\n\t" -} \ No newline at end of file diff --git a/engine/artifacts/errors/api.unauthorized.json 
b/engine/artifacts/errors/api.unauthorized.json deleted file mode 100644 index d7e369b02d..0000000000 --- a/engine/artifacts/errors/api.unauthorized.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "unauthorized", - "group": "api", - "message": "Authentication required" -} \ No newline at end of file diff --git a/engine/artifacts/errors/kv.leader_forwarding_failed.json b/engine/artifacts/errors/kv.leader_forwarding_failed.json deleted file mode 100644 index 1c72e2127f..0000000000 --- a/engine/artifacts/errors/kv.leader_forwarding_failed.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "leader_forwarding_failed", - "group": "kv", - "message": "Failed to forward request to leader." -} \ No newline at end of file diff --git a/engine/artifacts/errors/kv.no_leader_elected.json b/engine/artifacts/errors/kv.no_leader_elected.json deleted file mode 100644 index ed4e5ad8b8..0000000000 --- a/engine/artifacts/errors/kv.no_leader_elected.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "no_leader_elected", - "group": "kv", - "message": "No leader has been elected yet." -} \ No newline at end of file diff --git a/engine/artifacts/errors/kv.not_leader.json b/engine/artifacts/errors/kv.not_leader.json deleted file mode 100644 index ea061498b4..0000000000 --- a/engine/artifacts/errors/kv.not_leader.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "not_leader", - "group": "kv", - "message": "Current node is not the leader." -} \ No newline at end of file diff --git a/engine/artifacts/errors/kv.response_channel_closed.json b/engine/artifacts/errors/kv.response_channel_closed.json deleted file mode 100644 index 85184515f2..0000000000 --- a/engine/artifacts/errors/kv.response_channel_closed.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "response_channel_closed", - "group": "kv", - "message": "Failed to receive KV response, channel closed." 
-} \ No newline at end of file diff --git a/engine/artifacts/errors/namespace.invalid_name.json b/engine/artifacts/errors/namespace.invalid_name.json deleted file mode 100644 index 537dba4c8f..0000000000 --- a/engine/artifacts/errors/namespace.invalid_name.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "invalid_name", - "group": "namespace", - "message": "\n\tInvalid namespace name.\n\t\n\tNamespace names must be valid DNS subdomains.\n\t" -} \ No newline at end of file diff --git a/engine/artifacts/errors/serverless_runner_pool.not_found.json b/engine/artifacts/errors/serverless_runner_pool.not_found.json new file mode 100644 index 0000000000..8695b814ec --- /dev/null +++ b/engine/artifacts/errors/serverless_runner_pool.not_found.json @@ -0,0 +1,5 @@ +{ + "code": "not_found", + "group": "serverless_runner_pool", + "message": "No serverless pool for this runner exists." +} \ No newline at end of file diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index e13da9b43f..0e59ac1fc0 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -11,7 +11,7 @@ "name": "Apache-2.0", "identifier": "Apache-2.0" }, - "version": "2.0.23" + "version": "2.0.24-rc.1" }, "paths": { "/actors": { diff --git a/engine/packages/api-peer/Cargo.toml b/engine/packages/api-peer/Cargo.toml index 8b95d7e98a..2bcc915ad4 100644 --- a/engine/packages/api-peer/Cargo.toml +++ b/engine/packages/api-peer/Cargo.toml @@ -23,10 +23,10 @@ serde.workspace = true serde_json.workspace = true indexmap.workspace = true -tokio.workspace = true -tracing.workspace = true namespace.workspace = true pegboard.workspace = true +tokio.workspace = true +tracing.workspace = true universalpubsub.workspace = true uuid.workspace = true utoipa.workspace = true diff --git a/engine/packages/api-peer/src/internal.rs b/engine/packages/api-peer/src/internal.rs index e9359c764d..27068f5d86 100644 --- a/engine/packages/api-peer/src/internal.rs +++ 
b/engine/packages/api-peer/src/internal.rs @@ -29,6 +29,12 @@ pub async fn cache_purge( Ok(CachePurgeResponse {}) } +#[derive(Serialize, Deserialize)] +pub struct BumpServerlessAutoscalerRequest { + pub namespace_id: Id, + pub runner_name: String, +} + #[derive(Serialize)] #[serde(deny_unknown_fields)] pub struct BumpServerlessAutoscalerResponse {} @@ -37,11 +43,25 @@ pub async fn bump_serverless_autoscaler( ctx: ApiCtx, _path: (), _query: (), - _body: (), + body: BumpServerlessAutoscalerRequest, ) -> Result { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + let res = ctx + .signal(pegboard::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", body.namespace_id) + .tag("runner_name", body.runner_name.clone()) .send() - .await?; + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + return Err(pegboard::errors::ServerlessRunnerPool::NotFound.build()); + } else { + res?; + } Ok(BumpServerlessAutoscalerResponse {}) } diff --git a/engine/packages/api-peer/src/runner_configs.rs b/engine/packages/api-peer/src/runner_configs.rs index 8324984794..045f65155c 100644 --- a/engine/packages/api-peer/src/runner_configs.rs +++ b/engine/packages/api-peer/src/runner_configs.rs @@ -17,7 +17,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result Result Result .await? 
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - ctx.op(namespace::ops::runner_config::delete::Input { + ctx.op(pegboard::ops::runner_config::delete::Input { namespace_id: namespace.namespace_id, name: path.runner_name, }) diff --git a/engine/packages/api-public/src/runner_configs/refresh_metadata.rs b/engine/packages/api-public/src/runner_configs/refresh_metadata.rs index 8537e87cea..5880cf3610 100644 --- a/engine/packages/api-public/src/runner_configs/refresh_metadata.rs +++ b/engine/packages/api-public/src/runner_configs/refresh_metadata.rs @@ -88,7 +88,7 @@ async fn refresh_metadata_inner( .collect(); let runner_configs = ctx - .op(namespace::ops::runner_config::get::Input { + .op(pegboard::ops::runner_config::get::Input { runners, bypass_cache: true, }) diff --git a/engine/packages/engine/Cargo.toml b/engine/packages/engine/Cargo.toml index efda95749f..8e06b715f1 100644 --- a/engine/packages/engine/Cargo.toml +++ b/engine/packages/engine/Cargo.toml @@ -21,7 +21,6 @@ include_dir.workspace = true indoc.workspace = true lz4_flex.workspace = true pegboard-runner.workspace = true -pegboard-serverless.workspace = true reqwest.workspace = true rivet-api-peer.workspace = true rivet-bootstrap.workspace = true diff --git a/engine/packages/engine/src/run_config.rs b/engine/packages/engine/src/run_config.rs index 07d23d8c48..eb8bfbc237 100644 --- a/engine/packages/engine/src/run_config.rs +++ b/engine/packages/engine/src/run_config.rs @@ -17,12 +17,6 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { Service::new("bootstrap", ServiceKind::Oneshot, |config, pools| { Box::pin(rivet_bootstrap::start(config, pools)) }), - Service::new( - "pegboard_serverless", - // There should only be one of these, since it's auto-scaling requests - ServiceKind::Singleton, - |config, pools| Box::pin(pegboard_serverless::start(config, pools)), - ), // Core services Service::new("tracing_reconfigure", ServiceKind::Core, |config, pools| { 
Box::pin(rivet_tracing_reconfigure::start(config, pools)) diff --git a/engine/packages/gasoline/src/ctx/activity.rs b/engine/packages/gasoline/src/ctx/activity.rs index ce1de28c55..f1bb667e1b 100644 --- a/engine/packages/gasoline/src/ctx/activity.rs +++ b/engine/packages/gasoline/src/ctx/activity.rs @@ -7,16 +7,18 @@ use tokio::sync::Mutex; use tracing::Instrument; use crate::{ + builder::common as builder, ctx::{ common, message::{MessageCtx, SubscriptionHandle}, }, - db::DatabaseHandle, + db::{DatabaseHandle, WorkflowData}, error::{WorkflowError, WorkflowResult}, message::Message, operation::{Operation, OperationInput}, + signal::Signal, utils::tags::AsTags, - workflow::StateGuard, + workflow::{StateGuard, Workflow}, }; pub struct ActivityCtx { @@ -77,6 +79,33 @@ impl ActivityCtx { } impl ActivityCtx { + /// Finds the first incomplete workflow with the given tags. + #[tracing::instrument(skip_all, ret(Debug), fields(workflow_name=W::NAME))] + pub async fn find_workflow(&self, tags: impl AsTags) -> Result> { + common::find_workflow::(&self.db, tags) + .in_current_span() + .await + } + + /// Finds the first incomplete workflow with the given tags. + #[tracing::instrument(skip_all)] + pub async fn get_workflows(&self, workflow_ids: Vec) -> Result> { + common::get_workflows(&self.db, workflow_ids) + .in_current_span() + .await + } + + /// Creates a signal builder. + pub fn signal(&self, body: T) -> builder::signal::SignalBuilder { + builder::signal::SignalBuilder::new( + self.db.clone(), + self.config.clone(), + self.ray_id, + body, + true, + ) + } + #[tracing::instrument(skip_all)] pub fn state(&self) -> Result> { if self.parallelized { diff --git a/engine/packages/gasoline/src/ctx/operation.rs b/engine/packages/gasoline/src/ctx/operation.rs index 167b0f9af4..7e2030205e 100644 --- a/engine/packages/gasoline/src/ctx/operation.rs +++ b/engine/packages/gasoline/src/ctx/operation.rs @@ -101,7 +101,6 @@ impl OperationCtx { /// Creates a signal builder. 
pub fn signal(&self, body: T) -> builder::signal::SignalBuilder { - // TODO: Add check for from_workflow so you cant dispatch a signal builder::signal::SignalBuilder::new( self.db.clone(), self.config.clone(), diff --git a/engine/packages/gasoline/src/db/mod.rs b/engine/packages/gasoline/src/db/mod.rs index 81af4f6b24..a5c24da006 100644 --- a/engine/packages/gasoline/src/db/mod.rs +++ b/engine/packages/gasoline/src/db/mod.rs @@ -309,6 +309,10 @@ impl WorkflowData { .transpose() .map_err(WorkflowError::DeserializeWorkflowOutput) } + + pub fn has_output(&self) -> bool { + self.output.is_some() + } } #[derive(Debug)] diff --git a/engine/packages/internal/Cargo.toml b/engine/packages/internal/Cargo.toml deleted file mode 100644 index 8c85c065b7..0000000000 --- a/engine/packages/internal/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "internal" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -anyhow.workspace = true -gas.workspace = true -rivet-api-util.workspace = true -rivet-types.workspace = true -serde.workspace = true diff --git a/engine/packages/internal/README.md b/engine/packages/internal/README.md deleted file mode 100644 index 0c60920680..0000000000 --- a/engine/packages/internal/README.md +++ /dev/null @@ -1 +0,0 @@ -TODO: move somewhere else diff --git a/engine/packages/internal/src/lib.rs b/engine/packages/internal/src/lib.rs deleted file mode 100644 index 01eafd2ecc..0000000000 --- a/engine/packages/internal/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod ops; diff --git a/engine/packages/internal/src/ops/bump_serverless_autoscaler_global.rs b/engine/packages/internal/src/ops/bump_serverless_autoscaler_global.rs deleted file mode 100644 index 402f7829ed..0000000000 --- a/engine/packages/internal/src/ops/bump_serverless_autoscaler_global.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::fmt::Debug; - -use futures_util::StreamExt; -use gas::prelude::*; -use 
rivet_api_util::{Method, request_remote_datacenter}; - -#[derive(Clone, Debug, Default)] -pub struct Input {} - -#[operation] -pub async fn bump_serverless_autoscaler_global(ctx: &OperationCtx, input: &Input) -> Result<()> { - let dcs = &ctx.config().topology().datacenters; - - let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { - let ctx = ctx.clone(); - - async move { - if dc.datacenter_label == ctx.config().dc_label() { - // Local datacenter - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await - } else { - // Remote datacenter - HTTP request - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - "/bump-serverless-autoscaler", - Method::POST, - Option::<&()>::None, - Option::<&()>::None, - ) - .await - .map(|_| ()) - } - } - })) - .buffer_unordered(16) - .collect::>() - .await; - - // Aggregate results - let result_count = results.len(); - let mut errors = Vec::new(); - for res in results { - if let Err(err) = res { - tracing::error!(?err, "failed to request edge dc"); - errors.push(err); - } - } - - // Error only if all requests failed - if result_count == errors.len() { - if let Some(res) = errors.into_iter().next() { - return Err(res).context("all datacenter requests failed"); - } - } - - Ok(()) -} - -// TODO: This is cloned from api-peer because of a cyclical dependency -#[derive(Deserialize)] -pub struct BumpServerlessAutoscalerResponse {} diff --git a/engine/packages/internal/src/ops/cache/mod.rs b/engine/packages/internal/src/ops/cache/mod.rs deleted file mode 100644 index 65ba4904bd..0000000000 --- a/engine/packages/internal/src/ops/cache/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod purge_global; diff --git a/engine/packages/internal/src/ops/cache/purge_global.rs b/engine/packages/internal/src/ops/cache/purge_global.rs deleted file mode 100644 index 982457d89e..0000000000 --- a/engine/packages/internal/src/ops/cache/purge_global.rs +++ /dev/null @@ -1,81 +0,0 @@ -use 
std::fmt::Debug; - -use futures_util::StreamExt; -use gas::prelude::*; -use rivet_api_util::{Method, request_remote_datacenter}; -use rivet_cache::RawCacheKey; -use serde::Serialize; - -#[derive(Clone, Debug, Default)] -pub struct Input { - pub base_key: String, - pub keys: Vec, -} - -#[operation] -pub async fn cache_purge_global(ctx: &OperationCtx, input: &Input) -> Result<()> { - let dcs = &ctx.config().topology().datacenters; - - let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { - let ctx = ctx.clone(); - let input = input.clone(); - - async move { - if dc.datacenter_label == ctx.config().dc_label() { - // Local datacenter - ctx.cache() - .clone() - .request() - .purge(input.base_key, input.keys) - .await - } else { - // Remote datacenter - HTTP request - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - "/cache/purge", - Method::POST, - Option::<&()>::None, - Some(&CachePurgeRequest { - base_key: input.base_key, - keys: input.keys, - }), - ) - .await - .map(|_| ()) - } - } - })) - .buffer_unordered(16) - .collect::>() - .await; - - // Aggregate results - let result_count = results.len(); - let mut errors = Vec::new(); - for res in results { - if let Err(err) = res { - tracing::error!(?err, "failed to request edge dc"); - errors.push(err); - } - } - - // Error only if all requests failed - if result_count == errors.len() { - if let Some(res) = errors.into_iter().next() { - return Err(res).context("all datacenter requests failed"); - } - } - - Ok(()) -} - -// TODO: This is cloned from api-peer because of a cyclical dependency -#[derive(Serialize)] -pub struct CachePurgeRequest { - pub base_key: String, - pub keys: Vec, -} - -#[derive(Deserialize)] -pub struct CachePurgeResponse {} diff --git a/engine/packages/internal/src/ops/mod.rs b/engine/packages/internal/src/ops/mod.rs deleted file mode 100644 index ab866e20d5..0000000000 --- a/engine/packages/internal/src/ops/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod 
bump_serverless_autoscaler_global; -pub mod cache; diff --git a/engine/packages/namespace/Cargo.toml b/engine/packages/namespace/Cargo.toml index c9d6e953e4..e9e627e46b 100644 --- a/engine/packages/namespace/Cargo.toml +++ b/engine/packages/namespace/Cargo.toml @@ -10,8 +10,6 @@ anyhow.workspace = true epoxy-protocol.workspace = true epoxy.workspace = true gas.workspace = true -internal.workspace = true -reqwest.workspace = true rivet-api-builder.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true diff --git a/engine/packages/namespace/src/keys/mod.rs b/engine/packages/namespace/src/keys/mod.rs index b04883c981..020c834f8d 100644 --- a/engine/packages/namespace/src/keys/mod.rs +++ b/engine/packages/namespace/src/keys/mod.rs @@ -2,8 +2,6 @@ use anyhow::Result; use gas::prelude::*; use universaldb::prelude::*; -pub mod runner_config; - pub fn subspace() -> universaldb::utils::Subspace { universaldb::utils::Subspace::new(&(RIVET, NAMESPACE)) } diff --git a/engine/packages/namespace/src/ops/mod.rs b/engine/packages/namespace/src/ops/mod.rs index f08fd1f5b5..74fc79b4a9 100644 --- a/engine/packages/namespace/src/ops/mod.rs +++ b/engine/packages/namespace/src/ops/mod.rs @@ -3,4 +3,3 @@ pub mod get_local; pub mod list; pub mod resolve_for_name_global; pub mod resolve_for_name_local; -pub mod runner_config; diff --git a/engine/packages/pegboard-serverless/Cargo.toml b/engine/packages/pegboard-serverless/Cargo.toml deleted file mode 100644 index 6112a75754..0000000000 --- a/engine/packages/pegboard-serverless/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "pegboard-serverless" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -anyhow.workspace = true -base64.workspace = true -epoxy.workspace = true -gas.workspace = true -reqwest-eventsource.workspace = true -reqwest.workspace = true -rivet-config.workspace = true -rivet-runner-protocol.workspace = true 
-rivet-types.workspace = true -rivet-util.workspace = true -tracing.workspace = true -universaldb.workspace = true -universalpubsub.workspace = true -vbare.workspace = true - -namespace.workspace = true -pegboard.workspace = true diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs deleted file mode 100644 index 19d9c9386c..0000000000 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ /dev/null @@ -1,524 +0,0 @@ -use std::{ - collections::HashMap, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, -}; - -use anyhow::{Context, Result}; -use base64::Engine; -use base64::engine::general_purpose::STANDARD as BASE64; -use futures_util::{StreamExt, TryStreamExt}; -use gas::prelude::*; -use pegboard::keys; -use reqwest::header::{HeaderName, HeaderValue}; -use reqwest_eventsource as sse; -use rivet_runner_protocol as protocol; -use rivet_types::runner_configs::RunnerConfigKind; -use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; -use universaldb::options::StreamingMode; -use universaldb::utils::IsolationLevel::*; -use universalpubsub::PublishOpts; -use vbare::OwnedVersionedData; - -const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint"); -const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token"); -const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots"); -const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); -const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name"); - -const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5); - -struct OutboundConnection { - handle: JoinHandle<()>, - shutdown_tx: oneshot::Sender<()>, - draining: Arc, -} - -#[tracing::instrument(skip_all)] -pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { - let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; - let ctx = 
StandaloneCtx::new( - db::DatabaseKv::from_pools(pools.clone()).await?, - config.clone(), - pools, - cache, - "pegboard-serverless", - Id::new_v1(config.dc_label()), - Id::new_v1(config.dc_label()), - )?; - - let mut sub = ctx - .subscribe::(()) - .await?; - let mut outbound_connections = HashMap::new(); - - loop { - tick(&ctx, &mut outbound_connections).await?; - - sub.next().await?; - } -} - -async fn tick( - ctx: &StandaloneCtx, - outbound_connections: &mut HashMap<(Id, String), Vec>, -) -> Result<()> { - let serverless_data = ctx - .udb()? - .run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); - - let serverless_desired_subspace = keys::subspace().subspace( - &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::entire_subspace(), - ); - - tx.get_ranges_keyvalues( - universaldb::RangeOption { - mode: StreamingMode::WantAll, - ..(&serverless_desired_subspace).into() - }, - // NOTE: This is a snapshot to prevent conflict with updates to this subspace - Snapshot, - ) - .map(|res| match res { - Ok(entry) => { - let (key, desired_slots) = - tx.read_entry::(&entry)?; - - Ok((key.namespace_id, key.runner_name, desired_slots)) - } - Err(err) => Err(err.into()), - }) - .try_collect::>() - .await - }) - .custom_instrument(tracing::info_span!("tick_tx")) - .await?; - - let runner_configs = ctx - .op(namespace::ops::runner_config::get::Input { - runners: serverless_data - .iter() - .map(|(ns_id, runner_name, _)| (*ns_id, runner_name.clone())) - .collect(), - bypass_cache: true, - }) - .await?; - - // Process each runner config with error handling - for (ns_id, runner_name, desired_slots) in &serverless_data { - let runner_config = runner_configs - .iter() - .find(|rc| rc.namespace_id == *ns_id && &rc.name == runner_name); - - let Some(runner_config) = runner_config else { - tracing::debug!( - ?ns_id, - ?runner_name, - "runner config not found, likely deleted" - ); - continue; - }; - - if let Err(err) = tick_runner_config( - ctx, - *ns_id, - 
runner_name.clone(), - *desired_slots, - runner_config, - outbound_connections, - ) - .await - { - tracing::error!( - ?ns_id, - ?runner_name, - ?err, - "failed to process runner config, continuing with others" - ); - // Continue processing other runner configs even if this one failed - continue; - } - } - - // Remove entries that aren't returned from udb - outbound_connections.retain(|(ns_id, runner_name), _| { - serverless_data - .iter() - .any(|(ns_id2, runner_name2, _)| ns_id == ns_id2 && runner_name == runner_name2) - }); - - tracing::debug!( - connection_counts=?outbound_connections.iter().map(|(k, v)| (k, v.len())).collect::>(), - ); - - Ok(()) -} - -async fn tick_runner_config( - ctx: &StandaloneCtx, - ns_id: Id, - runner_name: String, - desired_slots: i64, - runner_config: &namespace::ops::runner_config::get::RunnerConfig, - outbound_connections: &mut HashMap<(Id, String), Vec>, -) -> Result<()> { - let namespace = ctx - .op(namespace::ops::get_global::Input { - namespace_ids: vec![ns_id.clone()], - }) - .await - .context("runner namespace not found")?; - let namespace = namespace.first().context("runner namespace not found")?; - let namespace_name = &namespace.name; - - let RunnerConfigKind::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } = &runner_config.config.kind - else { - tracing::debug!("not serverless config"); - return Ok(()); - }; - - let curr = outbound_connections - .entry((ns_id, runner_name.clone())) - .or_insert_with(Vec::new); - - // Remove finished and draining connections from list - curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); - - // Log warning and reset to 0 if negative - let adjusted_desired_slots = if desired_slots < 0 { - tracing::error!( - ?ns_id, - ?runner_name, - ?desired_slots, - "negative desired slots, scaling to 0" - ); - 0 - } else { - desired_slots - }; - - let desired_count = - 
(rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64) - .max(*min_runners as i64) - + *runners_margin as i64) - .min(*max_runners as i64) - .try_into()?; - - // Calculate diff - let drain_count = curr.len().saturating_sub(desired_count); - let start_count = desired_count.saturating_sub(curr.len()); - - tracing::debug!(%namespace_name, %runner_name, %desired_count, %drain_count, %start_count, "scaling"); - - if drain_count != 0 { - // TODO: Implement smart logic of draining runners with the lowest allocated actors - let draining_connections = curr.split_off(desired_count); - - for conn in draining_connections { - if conn.shutdown_tx.send(()).is_err() { - tracing::debug!( - "serverless connection shutdown channel dropped, likely already stopped" - ); - } - } - } - - let starting_connections = std::iter::repeat_with(|| { - spawn_connection( - ctx.clone(), - url.clone(), - headers.clone(), - Duration::from_secs(*request_lifespan as u64), - *slots_per_runner, - runner_name.clone(), - namespace_name.clone(), - ) - }) - .take(start_count); - curr.extend(starting_connections); - - Ok(()) -} - -fn spawn_connection( - ctx: StandaloneCtx, - url: String, - headers: HashMap, - request_lifespan: Duration, - slots_per_runner: u32, - runner_name: String, - namespace_name: String, -) -> OutboundConnection { - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - let draining = Arc::new(AtomicBool::new(false)); - - let draining2 = draining.clone(); - let handle = tokio::spawn(async move { - if let Err(err) = outbound_handler( - &ctx, - url, - headers, - request_lifespan, - slots_per_runner, - runner_name, - namespace_name, - shutdown_rx, - draining2, - ) - .await - { - tracing::warn!(?err, "outbound req failed"); - - // TODO: Add backoff - tokio::time::sleep(Duration::from_secs(1)).await; - - // On error, bump the autoscaler loop again - let _ = ctx - .msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await; - } - }); - - 
OutboundConnection { - handle, - shutdown_tx, - draining, - } -} - -async fn outbound_handler( - ctx: &StandaloneCtx, - url: String, - headers: HashMap, - request_lifespan: Duration, - slots_per_runner: u32, - runner_name: String, - namespace_name: String, - shutdown_rx: oneshot::Receiver<()>, - draining: Arc, -) -> Result<()> { - let current_dc = ctx.config().topology().current_dc()?; - - let client = rivet_pools::reqwest::client_no_timeout().await?; - - let token = if let Some(auth) = &ctx.config().auth { - Some(( - X_RIVET_TOKEN, - HeaderValue::try_from(auth.admin_token.read())?, - )) - } else { - None - }; - - let headers = headers - .into_iter() - .flat_map(|(k, v)| { - // NOTE: This will filter out invalid headers without warning - Some(( - k.parse::().ok()?, - v.parse::().ok()?, - )) - }) - .chain([ - ( - X_RIVET_ENDPOINT, - HeaderValue::try_from(current_dc.public_url.to_string())?, - ), - ( - X_RIVET_TOTAL_SLOTS, - HeaderValue::try_from(slots_per_runner)?, - ), - (X_RIVET_RUNNER_NAME, HeaderValue::try_from(runner_name)?), - ( - X_RIVET_NAMESPACE_NAME, - HeaderValue::try_from(namespace_name.clone())?, - ), - // Deprecated - ( - HeaderName::from_static("x-rivet-namespace-id"), - HeaderValue::try_from(namespace_name)?, - ), - ]) - .chain(token) - .collect(); - - let endpoint_url = format!("{}/start", url.trim_end_matches('/')); - tracing::debug!(%endpoint_url, "sending outbound req"); - let req = client.get(endpoint_url).headers(headers); - - let mut source = sse::EventSource::new(req).context("failed creating event source")?; - let mut runner_id = None; - - let stream_handler = async { - while let Some(event) = source.next().await { - match event { - Ok(sse::Event::Open) => {} - Ok(sse::Event::Message(msg)) => { - tracing::debug!(%msg.data, "received outbound req message"); - - if runner_id.is_none() { - let data = BASE64.decode(msg.data).context("invalid base64 message")?; - let payload = - 
protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data) - .context("invalid payload")?; - - match payload { - protocol::ToServerlessServer::ToServerlessServerInit(init) => { - runner_id = - Some(Id::parse(&init.runner_id).context("invalid runner id")?); - } - } - } - } - Err(sse::Error::StreamEnded) => { - tracing::debug!(?runner_id, "outbound req stopped early"); - - return Ok(()); - } - Err(sse::Error::InvalidStatusCode(code, res)) => { - let body = res - .text() - .await - .unwrap_or_else(|_| "".to_string()); - bail!( - "invalid status code ({code}):\n{}", - util::safe_slice(&body, 0, 512) - ); - } - Err(err) => return Err(err.into()), - } - } - - anyhow::Ok(()) - }; - - let sleep_until_drop = request_lifespan.saturating_sub(DRAIN_GRACE_PERIOD); - tokio::select! { - res = stream_handler => return res.map_err(Into::into), - _ = tokio::time::sleep(sleep_until_drop) => {} - _ = shutdown_rx => {} - } - - draining.store(true, Ordering::SeqCst); - - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await?; - - if let Some(runner_id) = runner_id { - drain_runner(ctx, runner_id).await?; - } - - // Continue waiting on req while draining - let wait_for_shutdown_fut = async move { - while let Some(event) = source.next().await { - match event { - Ok(sse::Event::Open) => {} - Ok(sse::Event::Message(msg)) => { - tracing::debug!(%msg.data, ?runner_id, "received outbound req message"); - - // If runner_id is none at this point it means we did not send the stopping signal yet, so - // send it now - if runner_id.is_none() { - let data = BASE64.decode(msg.data).context("invalid base64 message")?; - let payload = - protocol::versioned::ToServerlessServer::deserialize_with_embedded_version( - &data, - ) - .context("invalid payload")?; - - match payload { - protocol::ToServerlessServer::ToServerlessServerInit(init) => { - let runner_id_local = - Id::parse(&init.runner_id).context("invalid runner id")?; - runner_id = 
Some(runner_id_local); - drain_runner(ctx, runner_id_local).await?; - } - } - } - } - Err(sse::Error::StreamEnded) => break, - Err(err) => return Err(err.into()), - } - } - - Result::<()>::Ok(()) - }; - - // Wait for runner to shut down - tokio::select! { - res = wait_for_shutdown_fut => return res.map_err(Into::into), - _ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => { - tracing::debug!(?runner_id, "reached drain grace period before runner shut down") - } - } - - // Close connection - // - // This will force the runner to stop the request in order to avoid hitting the serverless - // timeout threshold - if let Some(runner_id) = runner_id { - publish_to_client_stop(ctx, runner_id).await?; - } - - tracing::debug!(?runner_id, "outbound req stopped"); - - Ok(()) -} - -async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { - let res = ctx - .signal(pegboard::workflows::runner::Stop { - reset_actor_rescheduling: true, - }) - .to_workflow::() - .tag("runner_id", runner_id) - .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { - tracing::warn!( - ?runner_id, - "runner workflow not found, likely already stopped" - ); - } else { - res?; - } - - Ok(()) -} - -/// Send a stop message to the client. -/// -/// This will close the runner's WebSocket. -async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { - let receiver_subject = - pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string(); - - let message_serialized = rivet_runner_protocol::versioned::ToClient::wrap_latest( - rivet_runner_protocol::ToClient::ToClientClose, - ) - .serialize_with_embedded_version(rivet_runner_protocol::PROTOCOL_VERSION)?; - - ctx.ups()? 
- .publish(&receiver_subject, &message_serialized, PublishOpts::one()) - .await?; - - Ok(()) -} diff --git a/engine/packages/pegboard/Cargo.toml b/engine/packages/pegboard/Cargo.toml index e9a661c822..de6d837a95 100644 --- a/engine/packages/pegboard/Cargo.toml +++ b/engine/packages/pegboard/Cargo.toml @@ -13,6 +13,8 @@ gas.workspace = true lazy_static.workspace = true namespace.workspace = true nix.workspace = true +reqwest-eventsource.workspace = true +reqwest.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true rivet-data.workspace = true @@ -27,5 +29,6 @@ strum.workspace = true tracing.workspace = true universaldb.workspace = true universalpubsub.workspace = true +url.workspace = true utoipa.workspace = true vbare.workspace = true diff --git a/engine/packages/pegboard/src/errors.rs b/engine/packages/pegboard/src/errors.rs index dbd3173e63..b6a64ef6f0 100644 --- a/engine/packages/pegboard/src/errors.rs +++ b/engine/packages/pegboard/src/errors.rs @@ -71,3 +71,20 @@ pub enum Runner { #[error("not_found", "The runner does not exist.")] NotFound, } + +#[derive(RivetError, Debug, Deserialize, Serialize)] +#[error("runner_config")] +pub enum RunnerConfig { + #[error("invalid", "Invalid runner config.", "Invalid runner config: {reason}")] + Invalid { reason: String }, + + #[error("not_found", "No config for this runner exists.")] + NotFound, +} + +#[derive(RivetError, Debug, Deserialize, Serialize)] +#[error("serverless_runner_pool")] +pub enum ServerlessRunnerPool { + #[error("not_found", "No serverless pool for this runner exists.")] + NotFound, +} diff --git a/engine/packages/pegboard/src/keys/mod.rs b/engine/packages/pegboard/src/keys/mod.rs index 253fdcb409..4a6150c068 100644 --- a/engine/packages/pegboard/src/keys/mod.rs +++ b/engine/packages/pegboard/src/keys/mod.rs @@ -4,6 +4,7 @@ pub mod actor; pub mod epoxy; pub mod ns; pub mod runner; +pub mod runner_config; pub fn subspace() -> universaldb::utils::Subspace { 
rivet_types::keys::pegboard::subspace() diff --git a/engine/packages/namespace/src/keys/runner_config.rs b/engine/packages/pegboard/src/keys/runner_config.rs similarity index 100% rename from engine/packages/namespace/src/keys/runner_config.rs rename to engine/packages/pegboard/src/keys/runner_config.rs diff --git a/engine/packages/pegboard/src/lib.rs b/engine/packages/pegboard/src/lib.rs index a776a3d227..de4121239a 100644 --- a/engine/packages/pegboard/src/lib.rs +++ b/engine/packages/pegboard/src/lib.rs @@ -14,6 +14,9 @@ pub fn registry() -> WorkflowResult { let mut registry = Registry::new(); registry.register_workflow::()?; registry.register_workflow::()?; + registry.register_workflow::()?; + registry.register_workflow::()?; + registry.register_workflow::()?; Ok(registry) } diff --git a/engine/packages/pegboard/src/ops/mod.rs b/engine/packages/pegboard/src/ops/mod.rs index f8878061f9..f8263b10dd 100644 --- a/engine/packages/pegboard/src/ops/mod.rs +++ b/engine/packages/pegboard/src/ops/mod.rs @@ -1,2 +1,3 @@ pub mod actor; pub mod runner; +pub mod runner_config; diff --git a/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs b/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs index bb8f3a2efc..60a77057ca 100644 --- a/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs +++ b/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs @@ -73,7 +73,7 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result< // Check if a serverless runner config with a max runners > 0 exists let res = ctx - .op(namespace::ops::runner_config::get::Input { + .op(crate::ops::runner_config::get::Input { runners: vec![(input.namespace_id, input.runner_name.clone())], bypass_cache: false, }) diff --git a/engine/packages/namespace/src/ops/runner_config/delete.rs b/engine/packages/pegboard/src/ops/runner_config/delete.rs similarity index 67% rename from engine/packages/namespace/src/ops/runner_config/delete.rs rename to 
engine/packages/pegboard/src/ops/runner_config/delete.rs index b5f7e4597d..ae78039ffd 100644 --- a/engine/packages/namespace/src/ops/runner_config/delete.rs +++ b/engine/packages/pegboard/src/ops/runner_config/delete.rs @@ -10,17 +10,17 @@ pub struct Input { } #[operation] -pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> { - let bump_autoscaler = ctx +pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> { + let delete_workflow = ctx .udb()? .run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); // Read existing config to determine variant let runner_config_key = keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); - let bump_autoscaler = + let delete_workflow = if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? { tx.delete(&runner_config_key); @@ -37,14 +37,17 @@ pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) - false }; - Ok(bump_autoscaler) + Ok(delete_workflow) }) .custom_instrument(tracing::info_span!("runner_config_delete_tx")) .await?; // Bump autoscaler when a serverless config is modified - if bump_autoscaler { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + if delete_workflow { + ctx.signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.name.clone()) .send() .await?; } diff --git a/engine/packages/namespace/src/ops/runner_config/get.rs b/engine/packages/pegboard/src/ops/runner_config/get.rs similarity index 95% rename from engine/packages/namespace/src/ops/runner_config/get.rs rename to engine/packages/pegboard/src/ops/runner_config/get.rs index 75751c8769..469506e74e 100644 --- a/engine/packages/namespace/src/ops/runner_config/get.rs +++ b/engine/packages/pegboard/src/ops/runner_config/get.rs @@ -19,7 +19,7 @@ pub 
struct RunnerConfig { } #[operation] -pub async fn namespace_runner_config_get( +pub async fn pegboard_runner_config_get( ctx: &OperationCtx, input: &Input, ) -> Result> { @@ -62,7 +62,7 @@ async fn runner_config_get_inner( let tx = tx.clone(); async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); let runner_config_key = keys::runner_config::DataKey::new( namespace_id, diff --git a/engine/packages/namespace/src/ops/runner_config/list.rs b/engine/packages/pegboard/src/ops/runner_config/list.rs similarity index 91% rename from engine/packages/namespace/src/ops/runner_config/list.rs rename to engine/packages/pegboard/src/ops/runner_config/list.rs index c0fcf8de95..7b6f152b1d 100644 --- a/engine/packages/namespace/src/ops/runner_config/list.rs +++ b/engine/packages/pegboard/src/ops/runner_config/list.rs @@ -17,17 +17,17 @@ pub struct Input { // TODO: Needs to return default configs if they exist (currently no way to list from epoxy) #[operation] -pub async fn namespace_runner_config_list( +pub async fn pegboard_runner_config_list( ctx: &OperationCtx, input: &Input, ) -> Result> { let runner_configs = ctx .udb()? 
.run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); let (start, end) = if let Some(variant) = input.variant { - let (start, end) = keys::subspace() + let (start, end) = namespace::keys::subspace() .subspace(&keys::runner_config::ByVariantKey::subspace_with_variant( input.namespace_id, variant, @@ -46,7 +46,7 @@ pub async fn namespace_runner_config_list( (start, end) } else { - let (start, end) = keys::subspace() + let (start, end) = namespace::keys::subspace() .subspace(&keys::runner_config::DataKey::subspace(input.namespace_id)) .range(); diff --git a/engine/packages/namespace/src/ops/runner_config/mod.rs b/engine/packages/pegboard/src/ops/runner_config/mod.rs similarity index 100% rename from engine/packages/namespace/src/ops/runner_config/mod.rs rename to engine/packages/pegboard/src/ops/runner_config/mod.rs diff --git a/engine/packages/namespace/src/ops/runner_config/upsert.rs b/engine/packages/pegboard/src/ops/runner_config/upsert.rs similarity index 67% rename from engine/packages/namespace/src/ops/runner_config/upsert.rs rename to engine/packages/pegboard/src/ops/runner_config/upsert.rs index 3b745d901a..dc5235cf56 100644 --- a/engine/packages/namespace/src/ops/runner_config/upsert.rs +++ b/engine/packages/pegboard/src/ops/runner_config/upsert.rs @@ -11,18 +11,23 @@ pub struct Input { pub config: RunnerConfig, } +struct UpsertOutput { + endpoint_config_changed: bool, + serverless_runner_created: bool, +} + #[operation] -pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result { - let endpoint_config_changed = ctx +pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result { + let res = ctx .udb()? 
.run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); let runner_config_key = keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); // Check if config changed (for serverless, compare URL and headers) - let endpoint_config_changed = if let Some(existing_config) = + let output = if let Some(existing_config) = tx.read_opt(&runner_config_key, Serializable).await? { // Delete previous index @@ -45,15 +50,34 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - headers: new_headers, .. }, - ) => old_url != new_url || old_headers != new_headers, + ) => UpsertOutput { + endpoint_config_changed: old_url != new_url || old_headers != new_headers, + serverless_runner_created: false, + }, + (RunnerConfigKind::Normal { .. }, RunnerConfigKind::Serverless { .. }) => { + // Config type changed to serverless + UpsertOutput { + endpoint_config_changed: true, + serverless_runner_created: true, + } + } _ => { - // Config type changed or is not serverless - true + // Not serverless + UpsertOutput { + endpoint_config_changed: true, + serverless_runner_created: false, + } } } } else { // New config - true + UpsertOutput { + endpoint_config_changed: true, + serverless_runner_created: matches!( + input.config.kind, + RunnerConfigKind::Serverless { .. } + ), + } }; // Write new config @@ -131,18 +155,32 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - } } - Ok(Ok(endpoint_config_changed)) + Ok(Ok(output)) }) .custom_instrument(tracing::info_span!("runner_config_upsert_tx")) .await? 
.map_err(|err| err.build())?; // Bump autoscaler - if input.config.affects_autoscaler() { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + if res.serverless_runner_created { + ctx.workflow(crate::workflows::serverless::pool::Input { + namespace_id: input.namespace_id, + runner_name: input.name.clone(), + }) + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.name.clone()) + .unique() + .dispatch() + .await?; + } else if input.config.affects_autoscaler() { + // Maybe scale it + ctx.signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.name.clone()) .send() .await?; } - Ok(endpoint_config_changed) + Ok(res.endpoint_config_changed) } diff --git a/engine/packages/pegboard/src/utils.rs b/engine/packages/pegboard/src/utils.rs index 61bba809ae..d86dbaf6aa 100644 --- a/engine/packages/pegboard/src/utils.rs +++ b/engine/packages/pegboard/src/utils.rs @@ -1,4 +1,8 @@ use rivet_runner_protocol as protocol; +use rivet_types::{ + keys::namespace::runner_config::RunnerConfigVariant, + runner_configs::{RunnerConfig, RunnerConfigKind}, +}; pub fn event_actor_id(event: &protocol::Event) -> &str { match event { @@ -27,3 +31,10 @@ pub fn event_generation(event: &protocol::Event) -> u32 { }) => *generation, } } + +pub fn runner_config_variant(runner_config: &RunnerConfig) -> RunnerConfigVariant { + match runner_config.kind { + RunnerConfigKind::Normal { .. } => RunnerConfigVariant::Normal, + RunnerConfigKind::Serverless { .. 
} => RunnerConfigVariant::Serverless, + } +} diff --git a/engine/packages/pegboard/src/workflows/actor/destroy.rs b/engine/packages/pegboard/src/workflows/actor/destroy.rs index a04126f4dd..70a1ddc008 100644 --- a/engine/packages/pegboard/src/workflows/actor/destroy.rs +++ b/engine/packages/pegboard/src/workflows/actor/destroy.rs @@ -49,9 +49,31 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input) // If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down // if needed if res.allocated_serverless_slot { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + + let bump_res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", res.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = bump_res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%res.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + bump_res?; + } } // Clear KV @@ -77,6 +99,7 @@ struct UpdateStateAndDbInput { struct UpdateStateAndDbOutput { runner_workflow_id: Option, allocated_serverless_slot: bool, + runner_name_selector: String, } #[activity(UpdateStateAndDb)] @@ -159,6 +182,7 @@ async fn update_state_and_db( Ok(UpdateStateAndDbOutput { runner_workflow_id, allocated_serverless_slot: old_allocated_serverless_slot, + runner_name_selector: state.runner_name_selector.clone(), }) } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index aa5ded6ceb..f0f53969b9 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ 
-526,9 +526,31 @@ async fn handle_stopped( if allocate_pending_res.allocations.is_empty() { // Bump autoscaler so it can scale down if needed if deallocate_res.for_serverless { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + + let res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + res?; + } } } else { // Dispatch pending allocs (if any) @@ -722,3 +744,6 @@ join_signal!(Main { Lost, Destroy, }); + +#[message("pegboard_bump_serverless_autoscaler")] +pub(crate) struct BumpServerlessAutoscalerStub {} diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 751c39adb4..fb0fefbe34 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -1,3 +1,4 @@ +// runner wf see how signal fail handling use base64::Engine; use base64::prelude::BASE64_STANDARD; use futures_util::StreamExt; @@ -101,7 +102,7 @@ async fn update_runner(ctx: &ActivityCtx, input: &UpdateRunnerInput) -> Result<( } #[derive(Debug, Serialize, Deserialize, Hash)] -struct AllocateActorInput { +struct AllocateActorInputV1 { actor_id: Id, generation: u32, force_allocate: bool, @@ -109,7 +110,7 @@ struct AllocateActorInput { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -pub enum AllocateActorOutput { +pub enum AllocateActorOutputV1 { Allocated { runner_id: Id, runner_workflow_id: Id, @@ 
-120,12 +121,69 @@ pub enum AllocateActorOutput { Sleep, } -// If no availability, returns the timestamp of the actor's queue key #[activity(AllocateActor)] async fn allocate_actor( ctx: &ActivityCtx, - input: &AllocateActorInput, -) -> Result { + input: &AllocateActorInputV1, +) -> Result { + bail!("allocate actor v1 should never be called again") +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct AllocateActorInputV2 { + actor_id: Id, + generation: u32, + force_allocate: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +struct AllocateActorOutputV2 { + status: AllocateActorStatus, + serverless: bool, +} + +impl From for AllocateActorOutputV2 { + fn from(value: AllocateActorOutputV1) -> Self { + Self { + serverless: false, + status: match value { + AllocateActorOutputV1::Allocated { + runner_id, + runner_workflow_id, + } => AllocateActorStatus::Allocated { + runner_id, + runner_workflow_id, + }, + AllocateActorOutputV1::Pending { + pending_allocation_ts, + } => AllocateActorStatus::Pending { + pending_allocation_ts, + }, + AllocateActorOutputV1::Sleep => AllocateActorStatus::Sleep, + }, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum AllocateActorStatus { + Allocated { + runner_id: Id, + runner_workflow_id: Id, + }, + Pending { + pending_allocation_ts: i64, + }, + Sleep, +} + +// If no availability, returns the timestamp of the actor's queue key +#[activity(AllocateActorV2)] +async fn allocate_actor_v2( + ctx: &ActivityCtx, + input: &AllocateActorInputV2, +) -> Result { let start_instant = Instant::now(); let mut state = ctx.state::()?; @@ -135,7 +193,7 @@ async fn allocate_actor( // Check if valid serverless config exists for the current ns + runner name let runner_config_res = ctx - .op(namespace::ops::runner_config::get::Input { + .op(crate::ops::runner_config::get::Input { runners: vec![(namespace_id, runner_name_selector.clone())], bypass_cache: false, }) @@ -152,7 +210,7 @@ async fn 
allocate_actor( // NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the // client wf - let (for_serverless, res) = ctx + let res = ctx .udb()? .run(|tx| async move { let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold; @@ -161,7 +219,7 @@ async fn allocate_actor( let for_serverless = tx .with_subspace(namespace::keys::subspace()) .exists( - &namespace::keys::runner_config::ByVariantKey::new( + &keys::runner_config::ByVariantKey::new( namespace_id, RunnerConfigVariant::Serverless, runner_name_selector.clone(), @@ -304,22 +362,23 @@ async fn allocate_actor( // Set actor as not sleeping tx.delete(&keys::actor::SleepTsKey::new(input.actor_id)); - return Ok(( - for_serverless, - AllocateActorOutput::Allocated { + return Ok(AllocateActorOutputV2 { + serverless: for_serverless, + status: AllocateActorStatus::Allocated { runner_id: old_runner_alloc_key.runner_id, runner_workflow_id: old_runner_alloc_key_data.workflow_id, }, - )); + }); } } // At this point in the txn there is no availability match (crash_policy, input.force_allocate, has_valid_serverless) { - (CrashPolicy::Sleep, false, false) => { - Ok((for_serverless, AllocateActorOutput::Sleep)) - } + (CrashPolicy::Sleep, false, false) => Ok(AllocateActorOutputV2 { + serverless: for_serverless, + status: AllocateActorStatus::Sleep, + }), // Write the actor to the alloc queue to wait _ => { let pending_allocation_ts = util::timestamp::now(); @@ -337,12 +396,12 @@ async fn allocate_actor( input.generation, )?; - Ok(( - for_serverless, - AllocateActorOutput::Pending { + Ok(AllocateActorOutputV2 { + serverless: for_serverless, + status: AllocateActorStatus::Pending { pending_allocation_ts, }, - )) + }) } } }) @@ -354,25 +413,27 @@ async fn allocate_actor( dt, &[KeyValue::new( "did_reserve", - matches!(res, AllocateActorOutput::Allocated { .. }).to_string(), + matches!(res.status, AllocateActorStatus::Allocated { .. 
}).to_string(), )], ); - state.for_serverless = for_serverless; - state.allocated_serverless_slot = for_serverless; + state.for_serverless = res.serverless; + state.allocated_serverless_slot = res.serverless; - match &res { - AllocateActorOutput::Allocated { + match &res.status { + AllocateActorStatus::Allocated { runner_id, runner_workflow_id, + .. } => { state.sleep_ts = None; state.pending_allocation_ts = None; state.runner_id = Some(*runner_id); state.runner_workflow_id = Some(*runner_workflow_id); } - AllocateActorOutput::Pending { + AllocateActorStatus::Pending { pending_allocation_ts, + .. } => { tracing::warn!( actor_id=?input.actor_id, @@ -381,7 +442,7 @@ async fn allocate_actor( state.pending_allocation_ts = Some(*pending_allocation_ts); } - AllocateActorOutput::Sleep => {} + AllocateActorStatus::Sleep => {} } Ok(res) @@ -481,25 +542,62 @@ pub async fn spawn_actor( generation: u32, force_allocate: bool, ) -> Result { - // Attempt allocation - let allocate_res = ctx - .activity(AllocateActorInput { - actor_id: input.actor_id, - generation, - force_allocate, - }) - .await?; + let allocate_res: AllocateActorOutputV2 = match ctx.check_version(2).await? { + 1 => { + // Attempt allocation + ctx.activity(AllocateActorInputV1 { + actor_id: input.actor_id, + generation, + force_allocate, + }) + .await? + .into() + } + _latest => { + ctx.v(2) + .activity(AllocateActorInputV2 { + actor_id: input.actor_id, + generation, + force_allocate, + }) + .await? 
+ } + }; - match allocate_res { - AllocateActorOutput::Allocated { + match allocate_res.status { + AllocateActorStatus::Allocated { runner_id, runner_workflow_id, } => { // Bump the autoscaler so it can scale up - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + if allocate_res.serverless { + let res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + res?; + } + } + ctx.signal(crate::workflows::runner::Command { inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { actor_id: input.actor_id.to_string(), @@ -527,14 +625,38 @@ pub async fn spawn_actor( runner_workflow_id, }) } - AllocateActorOutput::Pending { + AllocateActorStatus::Pending { pending_allocation_ts, } => { - // Bump the autoscaler so it can scale up - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + // Bump the autoscaler so it can scale up + if allocate_res.serverless { + let res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely 
deleted" + ); + } else { + res?; + } + } + // If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for // an `Allocate` signal match ctx.listen::().await? { @@ -602,7 +724,7 @@ pub async fn spawn_actor( } } } - AllocateActorOutput::Sleep => Ok(SpawnActorOutput::Sleep), + AllocateActorStatus::Sleep => Ok(SpawnActorOutput::Sleep), } } diff --git a/engine/packages/pegboard/src/workflows/mod.rs b/engine/packages/pegboard/src/workflows/mod.rs index f8878061f9..d55cc7acfb 100644 --- a/engine/packages/pegboard/src/workflows/mod.rs +++ b/engine/packages/pegboard/src/workflows/mod.rs @@ -1,2 +1,3 @@ pub mod actor; pub mod runner; +pub mod serverless; diff --git a/engine/packages/pegboard/src/workflows/serverless/connection.rs b/engine/packages/pegboard/src/workflows/serverless/connection.rs new file mode 100644 index 0000000000..f290b2ba2c --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/connection.rs @@ -0,0 +1,400 @@ +use anyhow::Context; +use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; +use futures_util::StreamExt; +use gas::prelude::*; +use reqwest::header::{HeaderName, HeaderValue}; +use reqwest_eventsource as sse; +use rivet_runner_protocol as protocol; +use rivet_types::runner_configs::RunnerConfigKind; +use tokio::time::Duration; +use universalpubsub::PublishOpts; +use vbare::OwnedVersionedData; + +use super::{pool, runner}; +use crate::pubsub_subjects::RunnerReceiverSubject; + +const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint"); +const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token"); +const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots"); +const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); +const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name"); + +const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5); + +#[derive(Debug, 
Serialize, Deserialize, Clone)] +pub struct Input { + pub pool_wf_id: Id, + pub runner_wf_id: Id, + pub namespace_id: Id, + pub runner_name: String, +} + +#[workflow] +pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { + // Run the connection activity, which will handle the full lifecycle + let res = ctx + .activity(OutboundReqInput { + pool_wf_id: input.pool_wf_id, + runner_wf_id: input.runner_wf_id, + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .await?; + + // If we failed to send inline during the activity, durably ensure the + // signal is dispatched here + if res.send_drain_started { + ctx.signal(pool::RunnerDrainStarted { + runner_wf_id: input.runner_wf_id, + }) + .to_workflow_id(input.pool_wf_id) + .send() + .await?; + } + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct OutboundReqInput { + pool_wf_id: Id, + runner_wf_id: Id, + namespace_id: Id, + runner_name: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct OutboundReqOutput { + send_drain_started: bool, +} + +#[activity(OutboundReq)] +#[timeout = u64::MAX] +#[max_retries = usize::MAX] +async fn outbound_req(ctx: &ActivityCtx, input: &OutboundReqInput) -> Result { + if is_runner_draining(ctx, input.runner_wf_id).await? 
{ + return Ok(OutboundReqOutput { + send_drain_started: true, + }); + } + + let mut drain_sub = ctx + .subscribe::(("workflow_id", ctx.workflow_id())) + .await?; + + let (runner_config_res, namespace_res) = tokio::try_join!( + ctx.op(crate::ops::runner_config::get::Input { + runners: vec![(input.namespace_id, input.runner_name.clone())], + bypass_cache: false, + }), + ctx.op(namespace::ops::get_global::Input { + namespace_ids: vec![input.namespace_id], + }) + )?; + let Some(runner_config) = runner_config_res.into_iter().next() else { + tracing::debug!("runner config does not exist, ending outbound req"); + return Ok(OutboundReqOutput { + send_drain_started: true, + }); + }; + + let RunnerConfigKind::Serverless { + url, + headers, + slots_per_runner, + request_lifespan, + .. + } = runner_config.config.kind + else { + tracing::debug!("runner config is not serverless, ending outbound req"); + return Ok(OutboundReqOutput { + send_drain_started: true, + }); + }; + + let namespace = namespace_res + .into_iter() + .next() + .context("runner namespace not found")?; + + let current_dc = ctx.config().topology().current_dc()?; + + let token = if let Some(auth) = &ctx.config().auth { + Some(( + X_RIVET_TOKEN, + HeaderValue::try_from(auth.admin_token.read())?, + )) + } else { + None + }; + + let headers = headers + .into_iter() + .flat_map(|(k, v)| { + // NOTE: This will filter out invalid headers without warning + Some(( + k.parse::().ok()?, + v.parse::().ok()?, + )) + }) + .chain([ + ( + X_RIVET_ENDPOINT, + HeaderValue::try_from(current_dc.public_url.to_string())?, + ), + ( + X_RIVET_TOTAL_SLOTS, + HeaderValue::try_from(slots_per_runner)?, + ), + ( + X_RIVET_RUNNER_NAME, + HeaderValue::try_from(input.runner_name.clone())?, + ), + ( + X_RIVET_NAMESPACE_NAME, + HeaderValue::try_from(namespace.name.clone())?, + ), + // Deprecated + ( + HeaderName::from_static("x-rivet-namespace-id"), + HeaderValue::try_from(namespace.name)?, + ), + ]) + .chain(token) + .collect(); + + let 
endpoint_url = format!("{}/start", url.trim_end_matches('/')); + + tracing::debug!(%endpoint_url, "sending outbound req"); + + let client = rivet_pools::reqwest::client_no_timeout().await?; + let req = client.get(endpoint_url).headers(headers); + + let mut source = sse::EventSource::new(req).context("failed creating event source")?; + let mut runner_id = None; + + let stream_handler = async { + while let Some(event) = source.next().await { + match event { + Ok(sse::Event::Open) => {} + Ok(sse::Event::Message(msg)) => { + tracing::debug!(%msg.data, "received outbound req message"); + + if runner_id.is_none() { + let data = BASE64.decode(msg.data).context("invalid base64 message")?; + let payload = + protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data) + .context("invalid payload")?; + + match payload { + protocol::ToServerlessServer::ToServerlessServerInit(init) => { + runner_id = + Some(Id::parse(&init.runner_id).context("invalid runner id")?); + } + } + } + } + Err(sse::Error::StreamEnded) => { + tracing::debug!("outbound req stopped early"); + + return Ok(()); + } + Err(sse::Error::InvalidStatusCode(code, res)) => { + let body = res + .text() + .await + .unwrap_or_else(|_| "".to_string()); + bail!( + "invalid status code ({code}):\n{}", + util::safe_slice(&body, 0, 512) + ); + } + Err(err) => return Err(err.into()), + } + } + + anyhow::Ok(()) + }; + + let sleep_until_drain = + Duration::from_secs(request_lifespan as u64).saturating_sub(DRAIN_GRACE_PERIOD); + tokio::select! { + res = stream_handler => { + return match res { + Err(e) => Err(e.into()), + // TODO: + // For unexpected closes, we don’t know if the runner connected + // or not bc we can’t correlate the runner id. 
+ // + // Lifecycle state falls apart + Ok(_) => Ok(OutboundReqOutput { + send_drain_started: false + }) + }; + }, + _ = tokio::time::sleep(sleep_until_drain) => {} + _ = drain_sub.next() => {} + }; + + tracing::debug!(?runner_id, "connection reached lifespan, needs draining"); + + if let Err(e) = ctx + .signal(pool::RunnerDrainStarted { + runner_wf_id: input.runner_wf_id, + }) + // This is ok, because we only send DrainStarted once + .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() + .to_workflow_id(input.pool_wf_id) + .send() + .await + { + tracing::warn!( + runner_name=%input.runner_name.clone(), + namespace_id=%input.namespace_id, + workflow_id=%ctx.workflow_id(), + "failed to send signal: {}", e + ); + + // If we failed to send, have the workflow send it durably + return Ok(OutboundReqOutput { + send_drain_started: true, + }); + } + + // After we tell the pool we're draining, any remaining failures + // don't matter as the pool already stopped caring about us. + if let Err(err) = finish_non_critical_draining(ctx, source, runner_id).await { + tracing::debug!(?err, "failed non critical draining phase"); + } + + Ok(OutboundReqOutput { + send_drain_started: false, + }) +} + +async fn is_runner_draining(ctx: &ActivityCtx, runner_wf_id: Id) -> Result { + let runner_wf = ctx + .get_workflows(vec![runner_wf_id]) + .await? 
+ .into_iter() + .next() + .context("cannot find own runner wf")?; + let state = runner_wf.parse_state::()?; + + Ok(state.is_draining) +} + +async fn finish_non_critical_draining( + ctx: &ActivityCtx, + mut source: sse::EventSource, + mut runner_id: Option, +) -> Result<()> { + if let Some(runner_id) = runner_id { + drain_runner(ctx, runner_id).await?; + } + + // Continue waiting on req while draining + let wait_for_shutdown_fut = async move { + while let Some(event) = source.next().await { + match event { + Ok(sse::Event::Open) => {} + Ok(sse::Event::Message(msg)) => { + tracing::debug!(%msg.data, ?runner_id, "received outbound req message"); + + // If runner_id is none at this point it means we did not send the stopping signal yet, so + // send it now + if runner_id.is_none() { + let data = BASE64.decode(msg.data).context("invalid base64 message")?; + let payload = + protocol::versioned::ToServerlessServer::deserialize_with_embedded_version( + &data, + ) + .context("invalid payload")?; + + match payload { + protocol::ToServerlessServer::ToServerlessServerInit(init) => { + let runner_id_local = + Id::parse(&init.runner_id).context("invalid runner id")?; + runner_id = Some(runner_id_local); + drain_runner(ctx, runner_id_local).await?; + } + } + } + } + Err(sse::Error::StreamEnded) => break, + Err(err) => return Err(err.into()), + } + } + + Result::<()>::Ok(()) + }; + + // Wait for runner to shut down + tokio::select! 
{ + res = wait_for_shutdown_fut => return res.map_err(Into::into), + _ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => { + tracing::debug!(?runner_id, "reached drain grace period before runner shut down") + } + } + + // Close connection + // + // This will force the runner to stop the request in order to avoid hitting the serverless + // timeout threshold + if let Some(runner_id) = runner_id { + publish_to_client_stop(ctx, runner_id).await?; + } + + tracing::debug!(?runner_id, "outbound req stopped"); + + Ok(()) +} + +async fn drain_runner(ctx: &ActivityCtx, runner_id: Id) -> Result<()> { + let res = ctx + .signal(crate::workflows::runner::Stop { + reset_actor_rescheduling: true, + }) + // This is ok, because runner_id changes every retry of outbound_req + .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() + .to_workflow::() + .tag("runner_id", runner_id) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + ?runner_id, + "runner workflow not found, likely already stopped" + ); + } else { + res?; + } + + Ok(()) +} + +/// Send a stop message to the client. +/// +/// This will close the runner's WebSocket. +async fn publish_to_client_stop(ctx: &ActivityCtx, runner_id: Id) -> Result<()> { + let receiver_subject = RunnerReceiverSubject::new(runner_id).to_string(); + + let message_serialized = rivet_runner_protocol::versioned::ToClient::wrap_latest( + rivet_runner_protocol::ToClient::ToClientClose, + ) + .serialize_with_embedded_version(rivet_runner_protocol::PROTOCOL_VERSION)?; + + ctx.ups()? 
+ .publish(&receiver_subject, &message_serialized, PublishOpts::one()) + .await?; + + Ok(()) +} + +#[message("pegboard_serverless_connection_drain")] +pub struct Drain {} diff --git a/engine/packages/pegboard/src/workflows/serverless/mod.rs b/engine/packages/pegboard/src/workflows/serverless/mod.rs new file mode 100644 index 0000000000..6e37f560c3 --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/mod.rs @@ -0,0 +1,3 @@ +pub mod connection; +pub mod pool; +pub mod runner; diff --git a/engine/packages/pegboard/src/workflows/serverless/pool.rs b/engine/packages/pegboard/src/workflows/serverless/pool.rs new file mode 100644 index 0000000000..1b3cb422ed --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/pool.rs @@ -0,0 +1,195 @@ +use futures_util::FutureExt; +use gas::{db::WorkflowData, prelude::*}; +use rivet_types::{keys, runner_configs::RunnerConfigKind}; + +use super::runner; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Input { + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Debug, Serialize, Deserialize, Default)] +struct LifecycleState { + runners: Vec, +} + +#[workflow] +pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { + ctx.loope(LifecycleState::default(), |ctx, state| { + let input = input.clone(); + async move { + // 1. Remove completed connections + let completed_runners = ctx + .activity(GetCompletedInput { + runners: state.runners.clone(), + }) + .await?; + + state.runners.retain(|r| !completed_runners.contains(r)); + + // 2. Get desired count -> drain and start counts + let ReadDesiredOutput::Desired(desired_count) = ctx + .activity(ReadDesiredInput { + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .await? + else { + return Ok(Loop::Break(())); + }; + + let drain_count = state.runners.len().saturating_sub(desired_count); + let start_count = desired_count.saturating_sub(state.runners.len()); + + // 3. 
Drain old runners + if drain_count != 0 { + // TODO: Implement smart logic of draining runners with the lowest allocated actors + let draining_runners = state.runners.iter().take(drain_count).collect::>(); + + for wf_id in draining_runners { + ctx.signal(runner::Drain {}) + .to_workflow_id(*wf_id) + .send() + .await?; + } + } + + // 4. Dispatch new runner workflows + if start_count != 0 { + for _ in 0..start_count { + let wf_id = ctx + .workflow(runner::Input { + pool_wf_id: ctx.workflow_id(), + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name.clone()) + .dispatch() + .await?; + + state.runners.push(wf_id); + } + } + + // Wait for Bump or runner update signals until we tick again + match ctx.listen::
().await? { + Main::RunnerDrainStarted(sig) => { + state.runners.retain(|wf_id| *wf_id != sig.runner_wf_id); + } + Main::Bump(_) => {} + } + + Ok(Loop::Continue) + } + .boxed() + }) + .await?; + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct GetCompletedInput { + runners: Vec, +} + +#[activity(GetCompleted)] +async fn get_completed(ctx: &ActivityCtx, input: &GetCompletedInput) -> Result> { + Ok(ctx + .get_workflows(input.runners.clone()) + .await? + .into_iter() + .filter(WorkflowData::has_output) + .map(|wf| wf.workflow_id) + .collect()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct ReadDesiredInput { + namespace_id: Id, + runner_name: String, +} + +#[derive(Debug, Serialize, Deserialize)] +enum ReadDesiredOutput { + Desired(usize), + Stop, +} + +#[activity(ReadDesired)] +async fn read_desired(ctx: &ActivityCtx, input: &ReadDesiredInput) -> Result { + let runner_config_res = ctx + .op(crate::ops::runner_config::get::Input { + runners: vec![(input.namespace_id, input.runner_name.clone())], + bypass_cache: false, + }) + .await?; + let Some(runner_config) = runner_config_res.into_iter().next() else { + return Ok(ReadDesiredOutput::Stop); + }; + + let RunnerConfigKind::Serverless { + slots_per_runner, + min_runners, + max_runners, + runners_margin, + .. + } = runner_config.config.kind + else { + return Ok(ReadDesiredOutput::Stop); + }; + + let desired_slots = ctx + .udb()? 
+ .run(|tx| async move { + let tx = tx.with_subspace(keys::pegboard::subspace()); + + tx.read( + &keys::pegboard::ns::ServerlessDesiredSlotsKey { + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }, + universaldb::utils::IsolationLevel::Serializable, + ) + .await + }) + .await?; + + let adjusted_desired_slots = if desired_slots < 0 { + tracing::error!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name, + ?desired_slots, + "negative desired slots, scaling to 0" + ); + 0 + } else { + desired_slots + }; + + // Won't overflow as these values are all in u32 range + let desired_count = (runners_margin + + (adjusted_desired_slots as u32).div_ceil(slots_per_runner)) + .max(min_runners) + .min(max_runners) + .try_into()?; + + Ok(ReadDesiredOutput::Desired(desired_count)) +} + +#[signal("pegboard_serverless_bump")] +#[derive(Debug)] +pub struct Bump {} + +#[signal("pegboard_serverless_runner_drain_started")] +pub struct RunnerDrainStarted { + pub runner_wf_id: Id, +} + +join_signal!(Main { + Bump, + RunnerDrainStarted, +}); diff --git a/engine/packages/pegboard/src/workflows/serverless/runner.rs b/engine/packages/pegboard/src/workflows/serverless/runner.rs new file mode 100644 index 0000000000..a139df3bf6 --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/runner.rs @@ -0,0 +1,78 @@ +use gas::prelude::*; + +use super::connection; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Input { + pub pool_wf_id: Id, + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct State { + pub is_draining: bool, +} + +impl State { + fn new() -> Self { + Self { is_draining: false } + } +} + +#[workflow] +pub async fn pegboard_serverless_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { + ctx.activity(InitStateInput {}).await?; + + let conn_wf_id = ctx + .workflow(connection::Input { + pool_wf_id: input.pool_wf_id, + runner_wf_id: 
ctx.workflow_id(), + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .dispatch() + .await?; + + ctx.listen::().await?; + + ctx.activity(MarkAsDrainingInput {}).await?; + + ctx.msg(connection::Drain {}) + .tag("workflow_id", conn_wf_id) + .send() + .await?; + + // Wait for connection wf to complete so this wf's state remains readable + ctx.workflow::(conn_wf_id) + .output() + .await?; + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct InitStateInput {} + +#[activity(InitState)] +async fn init_state(ctx: &ActivityCtx, input: &InitStateInput) -> Result<()> { + let mut state = ctx.state::>()?; + + *state = Some(State::new()); + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct MarkAsDrainingInput {} + +#[activity(MarkAsDraining)] +async fn mark_as_draining(ctx: &ActivityCtx, input: &MarkAsDrainingInput) -> Result<()> { + let mut state = ctx.state::()?; + state.is_draining = true; + + Ok(()) +} + +#[signal("pegboard_serverless_runner_drain")] +pub struct Drain {} diff --git a/engine/packages/types/src/lib.rs b/engine/packages/types/src/lib.rs index ed3c57a928..c4de39bea6 100644 --- a/engine/packages/types/src/lib.rs +++ b/engine/packages/types/src/lib.rs @@ -1,7 +1,6 @@ pub mod actors; pub mod datacenters; pub mod keys; -pub mod msgs; pub mod namespaces; pub mod runner_configs; pub mod runners; diff --git a/engine/packages/types/src/msgs/mod.rs b/engine/packages/types/src/msgs/mod.rs deleted file mode 100644 index 38311a6f72..0000000000 --- a/engine/packages/types/src/msgs/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod pegboard; diff --git a/engine/packages/types/src/msgs/pegboard.rs b/engine/packages/types/src/msgs/pegboard.rs deleted file mode 100644 index 0ea2d1154c..0000000000 --- a/engine/packages/types/src/msgs/pegboard.rs +++ /dev/null @@ -1,5 +0,0 @@ -use gas::prelude::*; - -// TODO: Add namespace + runner name to this struct so bumps can be more targeted 
-#[message("pegboard_bump_serverless_autoscaler")] -pub struct BumpServerlessAutoscaler {} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 55e198e52a..c3425efa3a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1034,6 +1034,28 @@ importers: specifier: ^3.1.1 version: 3.2.4(@types/debug@4.1.12)(@types/node@22.18.1)(@vitest/ui@3.1.1)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.0) + examples/hono-serverless: + dependencies: + '@hono/node-server': + specifier: ^1.14.0 + version: 1.19.1(hono@4.9.8) + hono: + specifier: ^4.7.0 + version: 4.9.8 + devDependencies: + '@types/node': + specifier: ^22.13.9 + version: 22.18.1 + rivetkit: + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/rivetkit + tsx: + specifier: ^3.12.7 + version: 3.14.0 + typescript: + specifier: ^5.5.2 + version: 5.9.2 + examples/kitchen-sink: dependencies: '@rivetkit/react': diff --git a/rivetkit-openapi/openapi.json b/rivetkit-openapi/openapi.json index 55f4224dad..611d6443e2 100644 --- a/rivetkit-openapi/openapi.json +++ b/rivetkit-openapi/openapi.json @@ -1,7 +1,7 @@ { "openapi": "3.0.0", "info": { - "version": "2.0.22", + "version": "2.0.23", "title": "RivetKit API" }, "components": { diff --git a/scripts/run/engine-postgres-shell.sh b/scripts/run/docker/engine-postgres-shell.sh similarity index 92% rename from scripts/run/engine-postgres-shell.sh rename to scripts/run/docker/engine-postgres-shell.sh index 20ce44d64f..3900677bcc 100755 --- a/scripts/run/engine-postgres-shell.sh +++ b/scripts/run/docker/engine-postgres-shell.sh @@ -2,7 +2,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." 
&& pwd)" cd "${REPO_ROOT}" diff --git a/scripts/run/docker/engine-postgres.sh b/scripts/run/docker/engine-postgres.sh index 054cd02841..569e59ca79 100755 --- a/scripts/run/docker/engine-postgres.sh +++ b/scripts/run/docker/engine-postgres.sh @@ -2,7 +2,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" if ! command -v nc >/dev/null 2>&1; then echo "error: required command 'nc' not found." diff --git a/scripts/run/docker/engine-rocksdb.sh b/scripts/run/docker/engine-rocksdb.sh index b3896cdb82..c81d75568b 100755 --- a/scripts/run/docker/engine-rocksdb.sh +++ b/scripts/run/docker/engine-rocksdb.sh @@ -2,7 +2,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" cd "${REPO_ROOT}" @@ -13,4 +13,3 @@ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \ RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \ RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \ cargo run --bin rivet-engine -- start "$@" | tee /tmp/rivet-engine.log - diff --git a/scripts/run/nuke-rocksdb.sh b/scripts/run/docker/nuke-rocksdb.sh similarity index 66% rename from scripts/run/nuke-rocksdb.sh rename to scripts/run/docker/nuke-rocksdb.sh index 3423a19ab3..9a863c3c82 100755 --- a/scripts/run/nuke-rocksdb.sh +++ b/scripts/run/docker/nuke-rocksdb.sh @@ -2,8 +2,9 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." 
&& pwd)" cd "${REPO_ROOT}" rm -rf "~/Library/Application Support/rivet-engine/" +rm -rf ~/.local/share/rivet-engine diff --git a/website/public/llms-full.txt b/website/public/llms-full.txt index 26bdf66b13..a3af55421b 100644 --- a/website/public/llms-full.txt +++ b/website/public/llms-full.txt @@ -6341,6 +6341,11 @@ kubectl -n rivet-engine wait --for=condition=ready pod -l app=postgres --timeout ### 3. Deploy Rivet Engine +The Rivet Engine deployment consists of two components: + +- **Main Engine Deployment**: Runs all services except singleton services. Configured with Horizontal Pod Autoscaling (HPA) to automatically scale between 2-10 replicas based on CPU and memory utilization. +- **Singleton Engine Deployment**: Runs singleton services that must have exactly 1 replica (e.g., schedulers, coordinators). + Save as `rivet-engine.yaml`: ```yaml @@ -6382,7 +6387,7 @@ metadata: name: rivet-engine namespace: rivet-engine spec: - replicas: 1 + replicas: 2 selector: matchLabels: app: rivet-engine @@ -6396,6 +6401,8 @@ spec: image: rivetkit/engine:latest args: - start + - --except-services + - singleton env: - name: RIVET_CONFIG_PATH value: /etc/rivet/config.jsonc @@ -6410,16 +6417,107 @@ spec: readOnly: true resources: requests: - cpu: 500m - memory: 1Gi + cpu: 2000m + memory: 4Gi limits: + cpu: 4000m + memory: 8Gi + startupProbe: + httpGet: + path: /health + port: 6421 + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /health + port: 6421 + periodSeconds: 5 + failureThreshold: 2 + livenessProbe: + httpGet: + path: /health + port: 6421 + periodSeconds: 10 + failureThreshold: 3 + volumes: + - name: config + configMap: + name: engine-config +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: rivet-engine + namespace: rivet-engine +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: rivet-engine + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + 
resource: + name: cpu + target: + type: Utilization + averageUtilization: 60 + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 80 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rivet-engine-singleton + namespace: rivet-engine +spec: + replicas: 1 + selector: + matchLabels: + app: rivet-engine-singleton + template: + metadata: + labels: + app: rivet-engine-singleton + spec: + containers: + - name: rivet-engine + image: rivetkit/engine:latest + args: + - start + - --services + - singleton + - --services + - api-peer + env: + - name: RIVET_CONFIG_PATH + value: /etc/rivet/config.jsonc + ports: + - containerPort: 6421 + name: api-peer + volumeMounts: + - name: config + mountPath: /etc/rivet + readOnly: true + resources: + requests: cpu: 2000m memory: 4Gi + limits: + cpu: 4000m + memory: 8Gi startupProbe: httpGet: path: /health port: 6421 - initialDelaySeconds: 10 + initialDelaySeconds: 30 periodSeconds: 10 failureThreshold: 30 readinessProbe: @@ -6427,11 +6525,13 @@ spec: path: /health port: 6421 periodSeconds: 5 + failureThreshold: 2 livenessProbe: httpGet: path: /health port: 6421 periodSeconds: 10 + failureThreshold: 3 volumes: - name: config configMap: @@ -6443,9 +6543,21 @@ Apply and wait for the engine to be ready: ```bash kubectl apply -f rivet-engine.yaml kubectl -n rivet-engine wait --for=condition=ready pod -l app=rivet-engine --timeout=300s +kubectl -n rivet-engine wait --for=condition=ready pod -l app=rivet-engine-singleton --timeout=300s ``` -### 4. Access the Engine +**Note**: The HPA requires a metrics server to be running in your cluster. Most Kubernetes distributions (including k3d, GKE, EKS, AKS) include this by default. + +### 4. Verify Deployment + +Check that all pods are running (you should see 2+ engine pods and 1 singleton pod): + +```bash +kubectl -n rivet-engine get pods +kubectl -n rivet-engine get hpa +``` + +### 5. 
Access the Engine Get the service URL: @@ -6528,14 +6640,44 @@ k3d cluster delete rivet ### Scaling -For horizontal scaling, update the deployment: +The engine is configured with Horizontal Pod Autoscaling (HPA) by default, automatically scaling between 2-10 replicas based on CPU (60%) and memory (80%) utilization. + +To adjust the scaling parameters, modify the HPA configuration: ```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: rivet-engine + namespace: rivet-engine spec: - replicas: 3 # Multiple replicas -``` + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: rivet-engine + minReplicas: 2 # Adjust minimum replicas + maxReplicas: 20 # Adjust maximum replicas + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 # Adjust CPU threshold + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 80 # Adjust memory threshold +``` + +Monitor HPA status: -See our [HPA set up on github](https://github.com/rivet-gg/rivet/tree/main/k8s/engines/05-rivet-engine-hpa.yaml) for info on configuring automatic horizontal scaling. 
+```bash +kubectl -n rivet-engine get hpa +kubectl -n rivet-engine describe hpa rivet-engine +``` ## Next Steps diff --git a/website/public/llms.txt b/website/public/llms.txt index b2d6fa1cf4..6ab0fea412 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -24,8 +24,10 @@ https://rivet.dev/blog/2025-10-01-railway-selfhost https://rivet.dev/blog/2025-10-05-weekly-updates https://rivet.dev/blog/2025-10-09-rivet-cloud-launch https://rivet.dev/blog/2025-10-17-rivet-actors-vercel +https://rivet.dev/blog/2025-10-19-weekly-updates https://rivet.dev/blog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/blog/2025-10-20-weekly-updates +https://rivet.dev/blog/2025-10-24-weekly-updates https://rivet.dev/blog/godot-multiplayer-compared-to-unity https://rivet.dev/changelog https://rivet.dev/changelog.json @@ -50,8 +52,10 @@ https://rivet.dev/changelog/2025-10-01-railway-selfhost https://rivet.dev/changelog/2025-10-05-weekly-updates https://rivet.dev/changelog/2025-10-09-rivet-cloud-launch https://rivet.dev/changelog/2025-10-17-rivet-actors-vercel +https://rivet.dev/changelog/2025-10-19-weekly-updates https://rivet.dev/changelog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/changelog/2025-10-20-weekly-updates +https://rivet.dev/changelog/2025-10-24-weekly-updates https://rivet.dev/changelog/godot-multiplayer-compared-to-unity https://rivet.dev/cloud https://rivet.dev/docs/actors