diff --git a/CLAUDE.md b/CLAUDE.md index ad057c6673..27f523d2a7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -125,7 +125,7 @@ Key points: - Add `Serialize, Deserialize` derives for errors with metadata fields - Always return anyhow errors from failable functions - For example: `fn foo() -> Result { /* ... */ }` -- Import anyhow using `use anyhow::*` instead of importing individual types +- Do not glob import (`::*`) from anyhow. Instead, import individual types and traits **Dependency Management** - When adding a dependency, check for a workspace dependency in Cargo.toml diff --git a/engine/artifacts/errors/guard.websocket_service_hibernate.json b/engine/artifacts/errors/guard.websocket_service_hibernate.json new file mode 100644 index 0000000000..a9647dfd97 --- /dev/null +++ b/engine/artifacts/errors/guard.websocket_service_hibernate.json @@ -0,0 +1,5 @@ +{ + "code": "websocket_service_hibernate", + "group": "guard", + "message": "Initiate WebSocket service hibernation." +} \ No newline at end of file diff --git a/engine/artifacts/errors/guard.websocket_service_retry.json b/engine/artifacts/errors/guard.websocket_service_retry.json deleted file mode 100644 index e73bbbc507..0000000000 --- a/engine/artifacts/errors/guard.websocket_service_retry.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "code": "websocket_service_retry", - "group": "guard", - "message": "WebSocket service retry." -} \ No newline at end of file diff --git a/engine/packages/guard-core/src/custom_serve.rs b/engine/packages/guard-core/src/custom_serve.rs index 351747e96d..7a7f4a2e2c 100644 --- a/engine/packages/guard-core/src/custom_serve.rs +++ b/engine/packages/guard-core/src/custom_serve.rs @@ -1,4 +1,4 @@ -use anyhow::*; +use anyhow::{Result, bail}; use async_trait::async_trait; use bytes::Bytes; use http_body_util::Full; @@ -10,6 +10,11 @@ use crate::WebSocketHandle; use crate::proxy_service::ResponseBody; use crate::request_context::RequestContext; +pub enum HibernationResult { + Continue, + Close, +} + /// Trait for custom request serving logic that can handle both HTTP and WebSocket requests #[async_trait] pub trait CustomServeTrait: Send + Sync { @@ -23,11 +28,21 @@ pub trait CustomServeTrait: Send + Sync { /// Handle a WebSocket connection after upgrade. Supports connection retries. async fn handle_websocket( &self, - websocket: WebSocketHandle, - headers: &hyper::HeaderMap, - path: &str, - request_context: &mut RequestContext, + _websocket: WebSocketHandle, + _headers: &hyper::HeaderMap, + _path: &str, + _request_context: &mut RequestContext, // Identifies the websocket across retries. - unique_request_id: Uuid, - ) -> Result>; + _unique_request_id: Uuid, + ) -> Result> { + bail!("service does not support websockets"); + } + + /// Returns true if the websocket should close. + async fn handle_websocket_hibernation( + &self, + _websocket: WebSocketHandle, + ) -> Result { + bail!("service does not support websocket hibernation"); + } } diff --git a/engine/packages/guard-core/src/errors.rs b/engine/packages/guard-core/src/errors.rs index ac88f0a439..ebfa809da5 100644 --- a/engine/packages/guard-core/src/errors.rs +++ b/engine/packages/guard-core/src/errors.rs @@ -93,8 +93,12 @@ pub struct ServiceUnavailable; pub struct WebSocketServiceUnavailable; #[derive(RivetError, Serialize, Deserialize)] -#[error("guard", "websocket_service_retry", "WebSocket service retry.")] -pub struct WebSocketServiceRetry; +#[error( + "guard", + "websocket_service_hibernate", + "Initiate WebSocket service hibernation." +)] +pub struct WebSocketServiceHibernate; #[derive(RivetError, Serialize, Deserialize)] #[error("guard", "websocket_service_timeout", "WebSocket service timed out.")] diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index eac786680c..6a03749033 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result, bail}; +use anyhow::{Context, Result, bail, ensure}; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; use http_body_util::{BodyExt, Full}; @@ -31,7 +31,9 @@ use url::Url; use uuid::Uuid; use crate::{ - WebSocketHandle, custom_serve::CustomServeTrait, errors, metrics, + WebSocketHandle, + custom_serve::{CustomServeTrait, HibernationResult}, + errors, metrics, request_context::RequestContext, }; @@ -1828,7 +1830,7 @@ impl ProxyService { ); } ResolveRouteOutput::Response(_) => unreachable!(), - ResolveRouteOutput::CustomServe(mut handlers) => { + ResolveRouteOutput::CustomServe(mut handler) => { tracing::debug!(%req_path, "Spawning task to handle WebSocket communication"); let mut request_context = request_context.clone(); let req_headers = req_headers.clone(); @@ -1838,6 +1840,7 @@ impl ProxyService { tokio::spawn( async move { let request_id = Uuid::new_v4(); + let mut ws_hibernation_close = false; let mut attempts = 0u32; let ws_handle = WebSocketHandle::new(client_ws) @@ -1845,7 +1848,7 @@ impl ProxyService { .context("failed initiating websocket handle")?; loop { - match handlers + match handler .handle_websocket( ws_handle.clone(), &req_headers, @@ -1895,18 +1898,43 @@ impl ProxyService { Err(err) => { tracing::debug!(?err, "websocket handler error"); - // Denotes that the connection did not fail, but needs to be retried to - // resole a new target - let ws_retry = is_ws_retry(&err); + // Denotes that the connection did not fail, but the downstream has closed + let ws_hibernate = is_ws_hibernate(&err); - if ws_retry { + if ws_hibernate { attempts = 0; } else { attempts += 1; } - if attempts > max_attempts - || (!is_retryable_ws_error(&err) && !ws_retry) + if ws_hibernate { + // This should be unreachable because as soon as the actor is + // reconnected to after hibernation the gateway will consume the close + // frame from the client ws stream + ensure!( + !ws_hibernation_close, + "should not be hibernating again after receiving a close frame during hibernation" + ); + + // After this function returns: + // - the route will be resolved again + // - the websocket will connect to the new downstream target + // - the gateway will continue reading messages from the client ws + // (starting with the message that caused the hibernation to end) + let res = handler + .handle_websocket_hibernation(ws_handle.clone()) + .await?; + + // Despite receiving a close frame from the client during hibernation + // we are going to reconnect to the actor so that it knows the + // connection has closed + if let HibernationResult::Close = res { + tracing::debug!("starting hibernating websocket close"); + + ws_hibernation_close = true; + } + } else if attempts > max_attempts + || !is_retryable_ws_error(&err) { tracing::debug!( ?attempts, @@ -1929,79 +1957,79 @@ impl ProxyService { break; } else { - if !ws_retry { - let backoff = ProxyService::calculate_backoff( - attempts, - initial_interval, - ); + let backoff = ProxyService::calculate_backoff( + attempts, + initial_interval, + ); - tracing::debug!( - ?backoff, - "WebSocket attempt {attempts} failed (service unavailable)" - ); + tracing::debug!( + ?backoff, + "WebSocket attempt {attempts} failed (service unavailable)" + ); - tokio::time::sleep(backoff).await; - } + // Apply backoff for retryable error + tokio::time::sleep(backoff).await; + } - match state - .resolve_route( - &req_host, - &req_path, - &req_method, - state.port_type.clone(), - &req_headers, - true, - ) - .await - { - Ok(ResolveRouteOutput::CustomServe(new_handlers)) => { - handlers = new_handlers; - continue; - } - Ok(ResolveRouteOutput::Response(response)) => { - ws_handle - .send(to_hyper_close(Some(str_to_close_frame( - response.message.as_ref(), - )))) - .await?; - - // Flush to ensure close frame is sent - ws_handle.flush().await?; - - // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; - } - Ok(ResolveRouteOutput::Target(_)) => { - ws_handle - .send(to_hyper_close(Some(err_to_close_frame( - errors::WebSocketTargetChanged.build(), - ray_id, - )))) - .await?; + // Retry route resolution + match state + .resolve_route( + &req_host, + &req_path, + &req_method, + state.port_type.clone(), + &req_headers, + true, + ) + .await + { + Ok(ResolveRouteOutput::CustomServe(new_handler)) => { + handler = new_handler; + continue; + } + Ok(ResolveRouteOutput::Response(response)) => { + ws_handle + .send(to_hyper_close(Some(str_to_close_frame( + response.message.as_ref(), + )))) + .await?; + + // Flush to ensure close frame is sent + ws_handle.flush().await?; + + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + } + Ok(ResolveRouteOutput::Target(_)) => { + ws_handle + .send(to_hyper_close(Some(err_to_close_frame( + errors::WebSocketTargetChanged.build(), + ray_id, + )))) + .await?; - // Flush to ensure close frame is sent - ws_handle.flush().await?; + // Flush to ensure close frame is sent + ws_handle.flush().await?; - // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; - break; - } - Err(err) => { - ws_handle - .send(to_hyper_close(Some(err_to_close_frame( - err, ray_id, - )))) - .await?; + break; + } + Err(err) => { + ws_handle + .send(to_hyper_close(Some(err_to_close_frame( + err, ray_id, + )))) + .await?; - // Flush to ensure close frame is sent - ws_handle.flush().await?; + // Flush to ensure close frame is sent + ws_handle.flush().await?; - // Keep TCP connection open briefly to allow client to process close - tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; - break; - } + break; } } } @@ -2509,9 +2537,9 @@ fn is_retryable_ws_error(err: &anyhow::Error) -> bool { } } -fn is_ws_retry(err: &anyhow::Error) -> bool { +fn is_ws_hibernate(err: &anyhow::Error) -> bool { if let Some(rivet_err) = err.chain().find_map(|x| x.downcast_ref::()) { - rivet_err.group() == "guard" && rivet_err.code() == "websocket_service_retry" + rivet_err.group() == "guard" && rivet_err.code() == "websocket_service_hibernate" } else { false } diff --git a/engine/packages/guard-core/src/websocket_handle.rs b/engine/packages/guard-core/src/websocket_handle.rs index 763f337b20..2a3c50a4b3 100644 --- a/engine/packages/guard-core/src/websocket_handle.rs +++ b/engine/packages/guard-core/src/websocket_handle.rs @@ -1,5 +1,5 @@ use anyhow::*; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{SinkExt, StreamExt, stream::Peekable}; use hyper::upgrade::Upgraded; use hyper_tungstenite::HyperWebsocket; use hyper_tungstenite::tungstenite::Message as WsMessage; @@ -8,7 +8,8 @@ use std::sync::Arc; use tokio::sync::Mutex; use tokio_tungstenite::WebSocketStream; -pub type WebSocketReceiver = futures_util::stream::SplitStream>>; +pub type WebSocketReceiver = + Peekable>>>; pub type WebSocketSender = futures_util::stream::SplitSink>, WsMessage>; @@ -26,7 +27,7 @@ impl WebSocketHandle { Ok(Self { ws_tx: Arc::new(Mutex::new(ws_tx)), - ws_rx: Arc::new(Mutex::new(ws_rx)), + ws_rx: Arc::new(Mutex::new(ws_rx.peekable())), }) } diff --git a/engine/packages/guard-core/tests/custom_serve.rs b/engine/packages/guard-core/tests/custom_serve.rs index a6ce98a49c..0d8196373d 100644 --- a/engine/packages/guard-core/tests/custom_serve.rs +++ b/engine/packages/guard-core/tests/custom_serve.rs @@ -12,17 +12,21 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use common::{create_test_config, init_tracing, start_guard}; use rivet_guard_core::WebSocketHandle; -use rivet_guard_core::custom_serve::CustomServeTrait; +use rivet_guard_core::custom_serve::{CustomServeTrait, HibernationResult}; +use rivet_guard_core::errors::WebSocketServiceHibernate; use rivet_guard_core::proxy_service::{ResponseBody, RoutingFn, RoutingOutput}; use rivet_guard_core::request_context::RequestContext; use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use uuid::Uuid; +const HIBERNATION_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); + // Track what was called for testing #[derive(Clone, Debug, Default)] struct CallTracker { http_calls: Arc>>, websocket_calls: Arc>>, + websocket_hibernation_calls: Arc>>, } // Test implementation of CustomServeTrait @@ -83,6 +87,11 @@ impl CustomServeTrait for TestCustomServe { match msg_result { std::result::Result::Ok(msg) if msg.is_text() => { let text = msg.to_text().unwrap_or(""); + + if text == "hibernate" { + return Err(WebSocketServiceHibernate.build()); + } + let response = format!("Custom: {}", text); if let std::result::Result::Err(e) = websocket.send(response.into()).await { eprintln!("Failed to send WebSocket message: {}", e); @@ -102,6 +111,22 @@ impl CustomServeTrait for TestCustomServe { Ok(None) } + + async fn handle_websocket_hibernation( + &self, + _websocket: WebSocketHandle, + ) -> Result { + // Track this WebSocket call + self.tracker + .websocket_hibernation_calls + .lock() + .unwrap() + .push("hibernation".to_string()); + + tokio::time::sleep(HIBERNATION_TIMEOUT).await; + + Ok(HibernationResult::Continue) + } } // Create routing function that returns CustomServe @@ -232,6 +257,68 @@ async fn test_custom_serve_websocket() { assert_eq!(http_calls.len(), 0); } +#[tokio::test] +async fn test_custom_serve_websocket_hibernation() { + init_tracing(); + + // Create tracker to verify calls + let tracker = CallTracker::default(); + + // Create routing function that returns CustomServe + let routing_fn = create_custom_serve_routing_fn(tracker.clone()); + + // Start guard with custom routing + let config = create_test_config(|_| {}); + let (guard_addr, _shutdown) = start_guard(config, routing_fn).await; + + // Connect to WebSocket through guard + let ws_url = format!("ws://{}/ws/custom", guard_addr); + let (mut ws_stream, response) = connect_async(&ws_url) + .await + .expect("Failed to connect to WebSocket"); + + // Verify upgrade was successful + assert_eq!(response.status(), StatusCode::SWITCHING_PROTOCOLS); + + // Send hibernation + ws_stream + .send(Message::Text("hibernate".to_string().into())) + .await + .expect("Failed to send WebSocket message"); + + // Send a test message + let test_message = "Hello Custom Hibernating WebSocket"; + ws_stream + .send(Message::Text(test_message.to_string().into())) + .await + .expect("Failed to send WebSocket message"); + + // Give some time for async operations to complete + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Verify the WebSocket handler hibernated + let ws_hibernation_calls = tracker.websocket_hibernation_calls.lock().unwrap(); + assert_eq!(ws_hibernation_calls.len(), 1); + assert_eq!(ws_hibernation_calls[0], "hibernation"); + + // Receive the echoed message with custom prefix + let response = tokio::time::timeout(HIBERNATION_TIMEOUT * 2, ws_stream.next()) + .await + .expect("timed out waiting for message from hibernating WebSocket"); + match response { + Some(Result::Ok(Message::Text(text))) => { + assert_eq!(text, format!("Custom: {}", test_message)); + } + other => panic!("Expected text message, got: {:?}", other), + } + + // Close the connection + ws_stream + .close(None) + .await + .expect("Failed to close WebSocket"); +} + #[tokio::test] async fn test_custom_serve_multiple_requests() { init_tracing(); diff --git a/engine/packages/guard/src/routing/api_public.rs b/engine/packages/guard/src/routing/api_public.rs index 143db070da..f6fee1840c 100644 --- a/engine/packages/guard/src/routing/api_public.rs +++ b/engine/packages/guard/src/routing/api_public.rs @@ -1,15 +1,13 @@ use std::sync::Arc; -use anyhow::*; +use anyhow::{Context, Result}; use async_trait::async_trait; use bytes::Bytes; use gas::prelude::*; use http_body_util::{BodyExt, Full}; use hyper::{Request, Response}; -use rivet_guard_core::WebSocketHandle; use rivet_guard_core::proxy_service::{ResponseBody, RoutingOutput}; use rivet_guard_core::{CustomServeTrait, request_context::RequestContext}; -use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use tower::Service; struct ApiPublicService { @@ -30,31 +28,20 @@ impl CustomServeTrait for ApiPublicService { let response = service .call(req) .await - .map_err(|e| anyhow::anyhow!("Failed to call api-public service: {}", e))?; + .context("failed to call api-public service")?; // Collect the body and convert to ResponseBody let (parts, body) = response.into_parts(); let collected = body .collect() .await - .map_err(|e| anyhow::anyhow!("Failed to collect response body: {}", e))?; + .context("failed to collect response body")?; let bytes = collected.to_bytes(); let response_body = ResponseBody::Full(Full::new(bytes)); let response = Response::from_parts(parts, response_body); Ok(response) } - - async fn handle_websocket( - &self, - _client_ws: WebSocketHandle, - _headers: &hyper::HeaderMap, - _path: &str, - _request_context: &mut RequestContext, - _unique_request_id: Uuid, - ) -> Result> { - bail!("api-public does not support WebSocket connections") - } } /// Route requests to the api-public service diff --git a/engine/packages/guard/src/routing/pegboard_gateway.rs b/engine/packages/guard/src/routing/pegboard_gateway.rs index f8c961f34c..4d460fa0c1 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway.rs @@ -235,6 +235,7 @@ async fn route_request_inner( // Return pegboard-gateway instance with path let gateway = pegboard_gateway::PegboardGateway::new( + ctx.clone(), shared_state.pegboard_gateway.clone(), runner_id, actor_id, diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index 7ea4685fb1..81b7b9dd09 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -8,18 +8,19 @@ use hyper::{Request, Response, StatusCode}; use rivet_error::*; use rivet_guard_core::{ WebSocketHandle, - custom_serve::CustomServeTrait, + custom_serve::{CustomServeTrait, HibernationResult}, errors::{ - ServiceUnavailable, WebSocketServiceRetry, WebSocketServiceTimeout, + ServiceUnavailable, WebSocketServiceHibernate, WebSocketServiceTimeout, WebSocketServiceUnavailable, }, proxy_service::ResponseBody, request_context::RequestContext, + websocket_handle::WebSocketReceiver, }; use rivet_runner_protocol as protocol; use rivet_util::serde::HashableMap; -use std::time::Duration; -use tokio::sync::watch; +use std::{sync::Arc, time::Duration}; +use tokio::sync::{Mutex, watch}; use tokio_tungstenite::tungstenite::{ Message, protocol::frame::{CloseFrame, coding::CloseCode}, @@ -47,6 +48,7 @@ enum LifecycleResult { } pub struct PegboardGateway { + ctx: StandaloneCtx, shared_state: SharedState, runner_id: Id, actor_id: Id, @@ -55,8 +57,15 @@ pub struct PegboardGateway { impl PegboardGateway { #[tracing::instrument(skip_all, fields(?actor_id, ?runner_id, ?path))] - pub fn new(shared_state: SharedState, runner_id: Id, actor_id: Id, path: String) -> Self { + pub fn new( + ctx: StandaloneCtx, + shared_state: SharedState, + runner_id: Id, + actor_id: Id, + path: String, + ) -> Self { Self { + ctx, shared_state, runner_id, actor_id, @@ -372,7 +381,7 @@ impl CustomServeTrait for PegboardGateway { if open_msg.can_hibernate && close.retry { // Successful closure - return Err(WebSocketServiceRetry.build()); + return Err(WebSocketServiceHibernate.build()); } else { return Ok(LifecycleResult::ServerClose(close)); } @@ -385,7 +394,7 @@ impl CustomServeTrait for PegboardGateway { } } else { tracing::debug!("tunnel sub closed"); - return Err(WebSocketServiceRetry.build()); + return Err(WebSocketServiceHibernate.build()); } } _ = tunnel_to_ws_abort_rx.changed() => { @@ -541,4 +550,63 @@ impl CustomServeTrait for PegboardGateway { Err(err) => Err(err), } } + + #[tracing::instrument(skip_all, fields(actor_id=?self.actor_id))] + async fn handle_websocket_hibernation( + &self, + client_ws: WebSocketHandle, + ) -> Result { + let mut ready_sub = self + .ctx + .subscribe::(("actor_id", self.actor_id)) + .await?; + + let close = tokio::select! { + _ = ready_sub.next() => { + tracing::debug!("actor became ready during hibernation"); + + HibernationResult::Continue + } + hibernation_res = hibernate_ws(client_ws.recv()) => { + let res = hibernation_res?; + + match &res { + HibernationResult::Continue => { + tracing::debug!("received message during hibernation"); + } + HibernationResult::Close => { + tracing::debug!("websocket stream closed during hibernation"); + } + } + + res + } + }; + + Ok(close) + } +} + +async fn hibernate_ws(ws_rx: Arc>) -> Result { + let mut guard = ws_rx.lock().await; + let mut pinned = std::pin::Pin::new(&mut *guard); + + loop { + if let Some(msg) = pinned.as_mut().peek().await { + match msg { + Ok(Message::Binary(_)) | Ok(Message::Text(_)) => { + return Ok(HibernationResult::Continue); + } + // We don't care about the close frame because we're currently hibernating; there is no + // downstream to send the close frame to. + Ok(Message::Close(_)) => return Ok(HibernationResult::Close), + // Ignore rest + _ => { + pinned.try_next().await?; + } + } + } else { + return Ok(HibernationResult::Close); + } + } } diff --git a/rivetkit-openapi/openapi.json b/rivetkit-openapi/openapi.json index 55f4224dad..611d6443e2 100644 --- a/rivetkit-openapi/openapi.json +++ b/rivetkit-openapi/openapi.json @@ -1,7 +1,7 @@ { "openapi": "3.0.0", "info": { - "version": "2.0.22", + "version": "2.0.23", "title": "RivetKit API" }, "components": { diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 38a40b9262..c308dcfd2c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -71,7 +71,7 @@ export const ActorConfigSchema = z noSleep: z.boolean().default(false), sleepTimeout: z.number().positive().default(30_000), /** @experimental */ - canHibernatWebSocket: z + canHibernateWebSocket: z .union([ z.boolean(), z diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts index e84786de7f..845cc9dffb 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts @@ -2069,11 +2069,8 @@ export class ActorInstance { // Check for active conns. This will also cover active actions, since all actions have a connection. for (const conn of this.#connections.values()) { - // TODO: Enable this when hibernation is implemented. We're waiting on support for Guard to not auto-wake the actor if it sleeps. - // if (conn.status === "connected" && !conn.isHibernatable) - // return false; - - if (conn.status === "connected") return CanSleep.ActiveConns; + if (conn.status === "connected" && !conn.isHibernatable) + return CanSleep.ActiveConns; } return CanSleep.Yes; diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index c577b58915..0f97ffe7ac 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -213,14 +213,16 @@ export class EngineActorDriver implements ActorDriver { ); // Check if can hibernate - const canHibernatWebSocket = - definition.config.options?.canHibernatWebSocket; - if (canHibernatWebSocket === true) { + const canHibernateWebSocket = + definition.config.options?.canHibernateWebSocket; + if (canHibernateWebSocket === true) { hibernationConfig = { enabled: true, lastMsgIndex: undefined, }; - } else if (typeof canHibernatWebSocket === "function") { + } else if ( + typeof canHibernateWebSocket === "function" + ) { try { // Truncate the path to match the behavior on onRawWebSocket const newPath = truncateRawWebSocketPathPrefix( @@ -232,14 +234,14 @@ export class EngineActorDriver implements ActorDriver { ); const canHibernate = - canHibernatWebSocket(truncatedRequest); + canHibernateWebSocket(truncatedRequest); hibernationConfig = { enabled: canHibernate, lastMsgIndex: undefined, }; } catch (error) { logger().error({ - msg: "error calling canHibernatWebSocket", + msg: "error calling canHibernateWebSocket", error, }); hibernationConfig = { diff --git a/website/public/llms-full.txt b/website/public/llms-full.txt index 26bdf66b13..a3af55421b 100644 --- a/website/public/llms-full.txt +++ b/website/public/llms-full.txt @@ -6341,6 +6341,11 @@ kubectl -n rivet-engine wait --for=condition=ready pod -l app=postgres --timeout ### 3. Deploy Rivet Engine +The Rivet Engine deployment consists of two components: + +- **Main Engine Deployment**: Runs all services except singleton services. Configured with Horizontal Pod Autoscaling (HPA) to automatically scale between 2-10 replicas based on CPU and memory utilization. +- **Singleton Engine Deployment**: Runs singleton services that must have exactly 1 replica (e.g., schedulers, coordinators). + Save as `rivet-engine.yaml`: ```yaml @@ -6382,7 +6387,7 @@ metadata: name: rivet-engine namespace: rivet-engine spec: - replicas: 1 + replicas: 2 selector: matchLabels: app: rivet-engine @@ -6396,6 +6401,8 @@ spec: image: rivetkit/engine:latest args: - start + - --except-services + - singleton env: - name: RIVET_CONFIG_PATH value: /etc/rivet/config.jsonc @@ -6410,16 +6417,107 @@ spec: readOnly: true resources: requests: - cpu: 500m - memory: 1Gi + cpu: 2000m + memory: 4Gi limits: + cpu: 4000m + memory: 8Gi + startupProbe: + httpGet: + path: /health + port: 6421 + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /health + port: 6421 + periodSeconds: 5 + failureThreshold: 2 + livenessProbe: + httpGet: + path: /health + port: 6421 + periodSeconds: 10 + failureThreshold: 3 + volumes: + - name: config + configMap: + name: engine-config +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: rivet-engine + namespace: rivet-engine +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: rivet-engine + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 60 + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 80 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rivet-engine-singleton + namespace: rivet-engine +spec: + replicas: 1 + selector: + matchLabels: + app: rivet-engine-singleton + template: + metadata: + labels: + app: rivet-engine-singleton + spec: + containers: + - name: rivet-engine + image: rivetkit/engine:latest + args: + - start + - --services + - singleton + - --services + - api-peer + env: + - name: RIVET_CONFIG_PATH + value: /etc/rivet/config.jsonc + ports: + - containerPort: 6421 + name: api-peer + volumeMounts: + - name: config + mountPath: /etc/rivet + readOnly: true + resources: + requests: cpu: 2000m memory: 4Gi + limits: + cpu: 4000m + memory: 8Gi startupProbe: httpGet: path: /health port: 6421 - initialDelaySeconds: 10 + initialDelaySeconds: 30 periodSeconds: 10 failureThreshold: 30 readinessProbe: @@ -6427,11 +6525,13 @@ spec: path: /health port: 6421 periodSeconds: 5 + failureThreshold: 2 livenessProbe: httpGet: path: /health port: 6421 periodSeconds: 10 + failureThreshold: 3 volumes: - name: config configMap: @@ -6443,9 +6543,21 @@ Apply and wait for the engine to be ready: ```bash kubectl apply -f rivet-engine.yaml kubectl -n rivet-engine wait --for=condition=ready pod -l app=rivet-engine --timeout=300s +kubectl -n rivet-engine wait --for=condition=ready pod -l app=rivet-engine-singleton --timeout=300s ``` -### 4. Access the Engine +**Note**: The HPA requires a metrics server to be running in your cluster. Most Kubernetes distributions (including k3d, GKE, EKS, AKS) include this by default. + +### 4. Verify Deployment + +Check that all pods are running (you should see 2+ engine pods and 1 singleton pod): + +```bash +kubectl -n rivet-engine get pods +kubectl -n rivet-engine get hpa +``` + +### 5. Access the Engine Get the service URL: @@ -6528,14 +6640,44 @@ k3d cluster delete rivet ### Scaling -For horizontal scaling, update the deployment: +The engine is configured with Horizontal Pod Autoscaling (HPA) by default, automatically scaling between 2-10 replicas based on CPU (60%) and memory (80%) utilization. + +To adjust the scaling parameters, modify the HPA configuration: ```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: rivet-engine + namespace: rivet-engine spec: - replicas: 3 # Multiple replicas -``` + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: rivet-engine + minReplicas: 2 # Adjust minimum replicas + maxReplicas: 20 # Adjust maximum replicas + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 # Adjust CPU threshold + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 80 # Adjust memory threshold +``` + +Monitor HPA status: -See our [HPA set up on github](https://github.com/rivet-gg/rivet/tree/main/k8s/engines/05-rivet-engine-hpa.yaml) for info on configuring automatic horizontal scaling. +```bash +kubectl -n rivet-engine get hpa +kubectl -n rivet-engine describe hpa rivet-engine +``` ## Next Steps diff --git a/website/public/llms.txt b/website/public/llms.txt index b2d6fa1cf4..6ab0fea412 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -24,8 +24,10 @@ https://rivet.dev/blog/2025-10-01-railway-selfhost https://rivet.dev/blog/2025-10-05-weekly-updates https://rivet.dev/blog/2025-10-09-rivet-cloud-launch https://rivet.dev/blog/2025-10-17-rivet-actors-vercel +https://rivet.dev/blog/2025-10-19-weekly-updates https://rivet.dev/blog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/blog/2025-10-20-weekly-updates +https://rivet.dev/blog/2025-10-24-weekly-updates https://rivet.dev/blog/godot-multiplayer-compared-to-unity https://rivet.dev/changelog https://rivet.dev/changelog.json @@ -50,8 +52,10 @@ https://rivet.dev/changelog/2025-10-01-railway-selfhost https://rivet.dev/changelog/2025-10-05-weekly-updates https://rivet.dev/changelog/2025-10-09-rivet-cloud-launch https://rivet.dev/changelog/2025-10-17-rivet-actors-vercel +https://rivet.dev/changelog/2025-10-19-weekly-updates https://rivet.dev/changelog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/changelog/2025-10-20-weekly-updates +https://rivet.dev/changelog/2025-10-24-weekly-updates https://rivet.dev/changelog/godot-multiplayer-compared-to-unity https://rivet.dev/cloud https://rivet.dev/docs/actors