diff --git a/Cargo.lock b/Cargo.lock index b4abc629a1..0090de3921 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3443,6 +3443,7 @@ dependencies = [ "universaldb", "universalpubsub", "utoipa", + "uuid", "vbare", ] diff --git a/engine/packages/cache-purge/src/lib.rs b/engine/packages/cache-purge/src/lib.rs index a08b0a9c78..4bcde3f530 100644 --- a/engine/packages/cache-purge/src/lib.rs +++ b/engine/packages/cache-purge/src/lib.rs @@ -9,7 +9,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R let ups = pools.ups()?; let mut sub = ups.subscribe(CACHE_PURGE_TOPIC).await?; - tracing::info!(subject=?CACHE_PURGE_TOPIC, "subscribed to cache purge updates"); + tracing::debug!(subject=?CACHE_PURGE_TOPIC, "subscribed to cache purge updates"); // Get cache instance let cache = rivet_cache::CacheInner::from_env(&config, pools)?; diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index 98f2df6228..57220fe7db 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -50,6 +50,12 @@ pub struct Pegboard { /// /// **Experimental** pub runner_lost_threshold: Option, + /// How long after last ping before considering a hibernating request disconnected. + /// + /// Unit is in milliseconds. + /// + /// **Experimental** + pub hibernating_request_eligible_threshold: Option, } impl Pegboard { @@ -80,4 +86,9 @@ impl Pegboard { pub fn runner_lost_threshold(&self) -> i64 { self.runner_lost_threshold.unwrap_or(15_000) } + + pub fn hibernating_request_eligible_threshold(&self) -> i64 { + self.hibernating_request_eligible_threshold + .unwrap_or(90_000) + } } diff --git a/engine/packages/gasoline/src/db/kv/keys/worker.rs b/engine/packages/gasoline/src/db/kv/keys/worker.rs index 376e6b1d27..901bcfd8ee 100644 --- a/engine/packages/gasoline/src/db/kv/keys/worker.rs +++ b/engine/packages/gasoline/src/db/kv/keys/worker.rs @@ -1,6 +1,4 @@ -use std::result::Result::Ok; - -use anyhow::*; +use anyhow::Result; use rivet_util::Id; use universaldb::prelude::*; diff --git a/engine/packages/guard-core/src/custom_serve.rs b/engine/packages/guard-core/src/custom_serve.rs index 7a7f4a2e2c..1f57fbeac5 100644 --- a/engine/packages/guard-core/src/custom_serve.rs +++ b/engine/packages/guard-core/src/custom_serve.rs @@ -34,6 +34,8 @@ pub trait CustomServeTrait: Send + Sync { _request_context: &mut RequestContext, // Identifies the websocket across retries. _unique_request_id: Uuid, + // True if this websocket is reconnecting after hibernation. + _after_hibernation: bool, ) -> Result> { bail!("service does not support websockets"); } @@ -42,6 +44,7 @@ pub trait CustomServeTrait: Send + Sync { async fn handle_websocket_hibernation( &self, _websocket: WebSocketHandle, + _unique_request_id: Uuid, ) -> Result { bail!("service does not support websocket hibernation"); } diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index d025c0e0a4..25831626ec 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -1845,6 +1845,7 @@ impl ProxyService { async move { let request_id = Uuid::new_v4(); let mut ws_hibernation_close = false; + let mut after_hibernation = false; let mut attempts = 0u32; let ws_handle = WebSocketHandle::new(client_ws) @@ -1859,6 +1860,7 @@ impl ProxyService { &req_path, &mut request_context, request_id, + after_hibernation, ) .await { @@ -1926,9 +1928,14 @@ impl ProxyService { // - the gateway will continue reading messages from the client ws // (starting with the message that caused the hibernation to end) let res = handler - .handle_websocket_hibernation(ws_handle.clone()) + .handle_websocket_hibernation( + ws_handle.clone(), + request_id, + ) .await?; + after_hibernation = true; + // Despite receiving a close frame from the client during hibernation // we are going to reconnect to the actor so that it knows the // connection has closed diff --git a/engine/packages/guard-core/tests/custom_serve.rs b/engine/packages/guard-core/tests/custom_serve.rs index 0d8196373d..af87a82097 100644 --- a/engine/packages/guard-core/tests/custom_serve.rs +++ b/engine/packages/guard-core/tests/custom_serve.rs @@ -72,6 +72,7 @@ impl CustomServeTrait for TestCustomServe { _path: &str, _request_context: &mut RequestContext, _unique_request_id: Uuid, + _after_hibernation: bool, ) -> Result> { // Track this WebSocket call self.tracker @@ -115,6 +116,7 @@ impl CustomServeTrait for TestCustomServe { async fn handle_websocket_hibernation( &self, _websocket: WebSocketHandle, + _unique_request_id: Uuid, ) -> Result { // Track this WebSocket call self.tracker diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index 81b7b9dd09..a486607a8f 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -5,6 +5,7 @@ use futures_util::TryStreamExt; use gas::prelude::*; use http_body_util::{BodyExt, Full}; use hyper::{Request, Response, StatusCode}; +use rand::Rng; use rivet_error::*; use rivet_guard_core::{ WebSocketHandle, @@ -20,13 +21,16 @@ use rivet_guard_core::{ use rivet_runner_protocol as protocol; use rivet_util::serde::HashableMap; use std::{sync::Arc, time::Duration}; -use tokio::sync::{Mutex, watch}; +use tokio::{ + sync::{Mutex, watch}, + task::JoinHandle, +}; use tokio_tungstenite::tungstenite::{ Message, protocol::frame::{CloseFrame, coding::CloseCode}, }; -use crate::shared_state::{InFlightRequestHandle, SharedState, TunnelMessageData}; +use crate::shared_state::{InFlightRequestHandle, SharedState}; pub mod shared_state; @@ -156,7 +160,7 @@ impl CustomServeTrait for PegboardGateway { let request_id = Uuid::new_v4().into_bytes(); let InFlightRequestHandle { mut msg_rx, - drop_rx: _drop_rx, + mut drop_rx, } = self .shared_state .start_in_flight_request(tunnel_subject, request_id) @@ -182,30 +186,39 @@ impl CustomServeTrait for PegboardGateway { // Wait for response tracing::debug!("gateway waiting for response from tunnel"); let fut = async { - while let Some(msg) = msg_rx.recv().await { - match msg { - TunnelMessageData::Message(msg) => match msg { - protocol::ToServerTunnelMessageKind::ToServerResponseStart( - response_start, - ) => { - return anyhow::Ok(response_start); - } - protocol::ToServerTunnelMessageKind::ToServerResponseAbort => { - tracing::warn!("request aborted"); - return Err(ServiceUnavailable.build()); - } - _ => { - tracing::warn!("received non-response message from pubsub"); + loop { + tokio::select! { + res = msg_rx.recv() => { + if let Some(msg) = res { + match msg { + protocol::ToServerTunnelMessageKind::ToServerResponseStart( + response_start, + ) => { + return anyhow::Ok(response_start); + } + protocol::ToServerTunnelMessageKind::ToServerResponseAbort => { + tracing::warn!("request aborted"); + return Err(ServiceUnavailable.build()); + } + _ => { + tracing::warn!("received non-response message from pubsub"); + } + } + } else { + tracing::warn!( + request_id=?Uuid::from_bytes(request_id), + "received no message response during request init", + ); + break; } - }, - TunnelMessageData::Timeout => { + } + _ = drop_rx.changed() => { tracing::warn!("tunnel message timeout"); return Err(ServiceUnavailable.build()); } } } - tracing::warn!(request_id=?Uuid::from_bytes(request_id), "received no message response during request init"); Err(ServiceUnavailable.build()) }; let response_start = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, fut) @@ -252,6 +265,7 @@ impl CustomServeTrait for PegboardGateway { _path: &str, _request_context: &mut RequestContext, unique_request_id: Uuid, + after_hibernation: bool, ) -> Result> { // Use the actor ID from the gateway instance let actor_id = self.actor_id.to_string(); @@ -272,73 +286,87 @@ impl CustomServeTrait for PegboardGateway { let request_id = unique_request_id.into_bytes(); let InFlightRequestHandle { mut msg_rx, - drop_rx: _drop_rx, + mut drop_rx, } = self .shared_state .start_in_flight_request(tunnel_subject.clone(), request_id) .await; - // Send WebSocket open message - let open_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketOpen( - protocol::ToClientWebSocketOpen { - actor_id: actor_id.clone(), - path: self.path.clone(), - headers: request_headers, - }, - ); + // If we are reconnecting after hibernation, don't send an open message + let can_hibernate = if after_hibernation { + true + } else { + // Send WebSocket open message + let open_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketOpen( + protocol::ToClientWebSocketOpen { + actor_id: actor_id.clone(), + path: self.path.clone(), + headers: request_headers, + }, + ); - self.shared_state - .send_message(request_id, open_message) - .await?; + self.shared_state + .send_message(request_id, open_message) + .await?; - tracing::debug!("gateway waiting for websocket open from tunnel"); + tracing::debug!("gateway waiting for websocket open from tunnel"); - // Wait for WebSocket open acknowledgment - let fut = async { - while let Some(msg) = msg_rx.recv().await { - match msg { - TunnelMessageData::Message( - protocol::ToServerTunnelMessageKind::ToServerWebSocketOpen(msg), - ) => { - return anyhow::Ok(msg); - } - TunnelMessageData::Message( - protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close), - ) => { - tracing::warn!(?close, "websocket closed before opening"); - return Err(WebSocketServiceUnavailable.build()); - } - TunnelMessageData::Timeout => { - tracing::warn!("websocket open timeout"); - return Err(WebSocketServiceUnavailable.build()); - } - _ => { - tracing::warn!( - "received unexpected message while waiting for websocket open" - ); + // Wait for WebSocket open acknowledgment + let fut = async { + loop { + tokio::select! { + res = msg_rx.recv() => { + if let Some(msg) = res { + match msg { + protocol::ToServerTunnelMessageKind::ToServerWebSocketOpen(msg) => { + return anyhow::Ok(msg); + } + protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close) => { + tracing::warn!(?close, "websocket closed before opening"); + return Err(WebSocketServiceUnavailable.build()); + } + _ => { + tracing::warn!( + "received unexpected message while waiting for websocket open" + ); + } + } + } else { + tracing::warn!( + request_id=?Uuid::from_bytes(request_id), + "received no message response during ws init", + ); + break; + } + } + _ = drop_rx.changed() => { + tracing::warn!("websocket open timeout"); + return Err(WebSocketServiceUnavailable.build()); + } } } - } - tracing::warn!(request_id=?Uuid::from_bytes(request_id), "received no message response during ws init"); - Err(WebSocketServiceUnavailable.build()) - }; + Err(WebSocketServiceUnavailable.build()) + }; - let open_msg = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, fut) - .await - .map_err(|_| { - tracing::warn!("timed out waiting for tunnel ack"); + let open_msg = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, fut) + .await + .map_err(|_| { + tracing::warn!("timed out waiting for tunnel ack"); - WebSocketServiceUnavailable.build() - })??; + WebSocketServiceUnavailable.build() + })??; - self.shared_state - .toggle_hibernation(request_id, open_msg.can_hibernate) - .await?; + self.shared_state + .toggle_hibernation(request_id, open_msg.can_hibernate) + .await?; + + open_msg.can_hibernate + }; // Send reclaimed messages self.shared_state - .resend_pending_websocket_messages(request_id, open_msg.last_msg_index) + .resend_pending_websocket_messages(request_id) .await?; let ws_rx = client_ws.recv(); @@ -355,9 +383,7 @@ impl CustomServeTrait for PegboardGateway { res = msg_rx.recv() => { if let Some(msg) = res { match msg { - TunnelMessageData::Message( - protocol::ToServerTunnelMessageKind::ToServerWebSocketMessage(ws_msg), - ) => { + protocol::ToServerTunnelMessageKind::ToServerWebSocketMessage(ws_msg) => { let msg = if ws_msg.binary { Message::Binary(ws_msg.data.into()) } else { @@ -367,29 +393,21 @@ impl CustomServeTrait for PegboardGateway { }; client_ws.send(msg).await?; } - TunnelMessageData::Message( - protocol::ToServerTunnelMessageKind::ToServerWebSocketMessageAck(ack), - ) => { + protocol::ToServerTunnelMessageKind::ToServerWebSocketMessageAck(ack) => { shared_state .ack_pending_websocket_messages(request_id, ack.index) .await?; } - TunnelMessageData::Message( - protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close), - ) => { + protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close) => { tracing::debug!(?close, "server closed websocket"); - if open_msg.can_hibernate && close.retry { + if can_hibernate && close.retry { // Successful closure return Err(WebSocketServiceHibernate.build()); } else { return Ok(LifecycleResult::ServerClose(close)); } } - TunnelMessageData::Timeout => { - tracing::warn!("websocket message timeout"); - return Err(WebSocketServiceTimeout.build()); - } _ => {} } } else { @@ -397,6 +415,10 @@ impl CustomServeTrait for PegboardGateway { return Err(WebSocketServiceHibernate.build()); } } + _ = drop_rx.changed() => { + tracing::warn!("websocket message timeout"); + return Err(WebSocketServiceTimeout.build()); + } _ = tunnel_to_ws_abort_rx.changed() => { tracing::debug!("task aborted"); return Ok(LifecycleResult::Aborted); @@ -555,22 +577,77 @@ impl CustomServeTrait for PegboardGateway { async fn handle_websocket_hibernation( &self, client_ws: WebSocketHandle, + unique_request_id: Uuid, + ) -> Result { + // Start keepalive task + let ctx = self.ctx.clone(); + let actor_id = self.actor_id; + let keepalive_handle: JoinHandle> = tokio::spawn(async move { + let mut ping_interval = tokio::time::interval(Duration::from_millis( + (ctx.config() + .pegboard() + .hibernating_request_eligible_threshold() + / 2) + .try_into()?, + )); + ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + ping_interval.tick().await; + + // Jitter sleep to prevent stampeding herds + let jitter = { rand::thread_rng().gen_range(0..128) }; + tokio::time::sleep(Duration::from_millis(jitter)).await; + + ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input { + actor_id, + request_id: unique_request_id, + }) + .await?; + } + }); + + let res = self.handle_websocket_hibernation_inner(client_ws).await; + + keepalive_handle.abort(); + + match &res { + Ok(HibernationResult::Continue) => {} + Ok(HibernationResult::Close) | Err(_) => { + // No longer an active hibernating request, delete entry + self.ctx + .op(pegboard::ops::actor::hibernating_request::delete::Input { + actor_id: self.actor_id, + request_id: unique_request_id, + }) + .await?; + } + } + + res + } +} + +impl PegboardGateway { + async fn handle_websocket_hibernation_inner( + &self, + client_ws: WebSocketHandle, ) -> Result { let mut ready_sub = self .ctx .subscribe::(("actor_id", self.actor_id)) .await?; - let close = tokio::select! { + let res = tokio::select! { _ = ready_sub.next() => { tracing::debug!("actor became ready during hibernation"); HibernationResult::Continue } hibernation_res = hibernate_ws(client_ws.recv()) => { - let res = hibernation_res?; + let hibernation_res = hibernation_res?; - match &res { + match &hibernation_res { HibernationResult::Continue => { tracing::debug!("received message during hibernation"); } @@ -579,11 +656,11 @@ impl CustomServeTrait for PegboardGateway { } } - res + hibernation_res } }; - Ok(close) + Ok(res) } } diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index 7cef4cc803..8b6aabb8cb 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -17,13 +17,8 @@ const GC_INTERVAL: Duration = Duration::from_secs(15); const MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(30); const MAX_PENDING_MSGS_SIZE_PER_REQ: u64 = util::size::mebibytes(1); -pub enum TunnelMessageData { - Message(protocol::ToServerTunnelMessageKind), - Timeout, -} - pub struct InFlightRequestHandle { - pub msg_rx: mpsc::Receiver, + pub msg_rx: mpsc::Receiver, /// Used to check if the request handler has been dropped. /// /// This is separate from `msg_rx` there may still be messages that need to be sent to the @@ -35,7 +30,7 @@ struct InFlightRequest { /// UPS subject to send messages to for this request. receiver_subject: String, /// Sender for incoming messages to this request. - msg_tx: mpsc::Sender, + msg_tx: mpsc::Sender, /// Used to check if the request handler has been dropped. drop_tx: watch::Sender<()>, /// True once first message for this request has been sent (so runner learned reply_to). @@ -260,10 +255,7 @@ impl SharedState { request_id=?Uuid::from_bytes(msg.request_id), "forwarding message to request handler" ); - let _ = in_flight - .msg_tx - .send(TunnelMessageData::Message(msg.message_kind)) - .await; + let _ = in_flight.msg_tx.send(msg.message_kind).await; // Send ack back to runner let ups_clone = self.ups.clone(); @@ -330,11 +322,7 @@ impl SharedState { Ok(()) } - pub async fn resend_pending_websocket_messages( - &self, - request_id: RequestId, - last_msg_index: i64, - ) -> Result<()> { + pub async fn resend_pending_websocket_messages(&self, request_id: RequestId) -> Result<()> { let Some(mut req) = self.in_flight_requests.get_async(&request_id).await else { bail!("request not in flight"); }; @@ -343,45 +331,12 @@ impl SharedState { if let Some(hs) = &mut req.hibernation_state { if !hs.pending_ws_msgs.is_empty() { - tracing::debug!(request_id=?Uuid::from_bytes(request_id.clone()), len=?hs.pending_ws_msgs.len(), ?last_msg_index, "resending pending messages"); - - let len = hs.pending_ws_msgs.len().try_into()?; - - for (iter_index, pending_msg) in hs.pending_ws_msgs.iter().enumerate() { - let msg_index = hs - .last_ws_msg_index - .wrapping_sub(len) - .wrapping_add(1) - .wrapping_add(iter_index.try_into()?); - - if last_msg_index < 0 || wrapping_gt(msg_index, last_msg_index.try_into()?) { - self.ups - .publish(&receiver_subject, &pending_msg.payload, PublishOpts::one()) - .await?; - } - } - - // Perform ack - if last_msg_index >= 0 { - let last_msg_index = last_msg_index.try_into()?; - let mut iter_index = 0; - - hs.pending_ws_msgs.retain(|_| { - let msg_index = hs - .last_ws_msg_index - .wrapping_sub(len) - .wrapping_add(1) - .wrapping_add(iter_index); - let keep = wrapping_gt(msg_index, last_msg_index); + tracing::debug!(request_id=?Uuid::from_bytes(request_id.clone()), len=?hs.pending_ws_msgs.len(), "resending pending messages"); - iter_index += 1; - - keep - }); - - if hs.pending_ws_msgs.is_empty() { - hs.last_ws_msg_index = last_msg_index; - } + for pending_msg in &hs.pending_ws_msgs { + self.ups + .publish(&receiver_subject, &pending_msg.payload, PublishOpts::one()) + .await?; } } } @@ -517,7 +472,9 @@ impl SharedState { "gc stopping in flight request" ); - let _ = req.msg_tx.send(TunnelMessageData::Timeout); + if req.drop_tx.send(()).is_err() { + tracing::debug!(request_id=?Uuid::from_bytes(*request_id), "failed to send timeout msg to tunnel"); + } // Mark req as stopping to skip this loop next time the gc is run req.stopping = true; diff --git a/engine/packages/pegboard-runner/src/lib.rs b/engine/packages/pegboard-runner/src/lib.rs index 6828114d74..08f24f302e 100644 --- a/engine/packages/pegboard-runner/src/lib.rs +++ b/engine/packages/pegboard-runner/src/lib.rs @@ -69,6 +69,7 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe { path: &str, _request_context: &mut RequestContext, _unique_request_id: Uuid, + _after_hibernation: bool, ) -> Result> { // Get UPS let ups = self.ctx.ups().context("failed to get UPS instance")?; @@ -137,6 +138,7 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe { let (ping_abort_tx, ping_abort_rx) = watch::channel(()); let tunnel_to_ws = tokio::spawn(tunnel_to_ws_task::task( + self.ctx.clone(), conn.clone(), sub, eviction_sub, diff --git a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs index 7c334fc622..bf92171beb 100644 --- a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs @@ -15,6 +15,7 @@ use crate::{ #[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))] pub async fn task( + ctx: StandaloneCtx, conn: Arc, mut sub: Subscriber, mut eviction_sub: Subscriber, @@ -57,6 +58,26 @@ pub async fn task( match &mut msg { protocol::ToClient::ToClientClose => return Err(errors::WsError::Eviction.build()), + // Dynamically populate hibernating request ids + protocol::ToClient::ToClientCommands(command_wrappers) => { + for command_wrapper in command_wrappers { + if let protocol::Command::CommandStartActor(protocol::CommandStartActor { + actor_id, + hibernating_request_ids, + .. + }) = &mut command_wrapper.inner + { + let ids = ctx + .op(pegboard::ops::actor::hibernating_request::list::Input { + actor_id: Id::parse(actor_id)?, + }) + .await?; + + *hibernating_request_ids = + ids.into_iter().map(|x| x.into_bytes().to_vec()).collect(); + } + } + } // Handle tunnel messages protocol::ToClient::ToClientTunnelMessage(tunnel_msg) => { match tunnel_msg.message_kind { diff --git a/engine/packages/pegboard/Cargo.toml b/engine/packages/pegboard/Cargo.toml index e9a661c822..9204944e14 100644 --- a/engine/packages/pegboard/Cargo.toml +++ b/engine/packages/pegboard/Cargo.toml @@ -28,4 +28,5 @@ tracing.workspace = true universaldb.workspace = true universalpubsub.workspace = true utoipa.workspace = true +uuid.workspace = true vbare.workspace = true diff --git a/engine/packages/pegboard/src/keys/actor.rs b/engine/packages/pegboard/src/keys/actor.rs index ef2563e9ed..00a889506e 100644 --- a/engine/packages/pegboard/src/keys/actor.rs +++ b/engine/packages/pegboard/src/keys/actor.rs @@ -1,8 +1,7 @@ -use std::result::Result::Ok; - -use anyhow::*; +use anyhow::Result; use gas::prelude::*; use universaldb::prelude::*; +use uuid::Uuid; #[derive(Debug)] pub struct CreateTsKey { @@ -313,3 +312,113 @@ impl<'de> TupleUnpack<'de> for NamespaceIdKey { Ok((input, v)) } } + +#[derive(Debug)] +pub struct HibernatingRequestKey { + actor_id: Id, + last_ping_ts: i64, + pub request_id: Uuid, +} + +impl HibernatingRequestKey { + pub fn new(actor_id: Id, last_ping_ts: i64, request_id: Uuid) -> Self { + HibernatingRequestKey { + actor_id, + last_ping_ts, + request_id, + } + } + + pub fn subspace_with_ts(actor_id: Id, last_ping_ts: i64) -> HibernatingRequestSubspaceKey { + HibernatingRequestSubspaceKey::new_with_ts(actor_id, last_ping_ts) + } + + pub fn subspace(actor_id: Id) -> HibernatingRequestSubspaceKey { + HibernatingRequestSubspaceKey::new(actor_id) + } +} + +impl FormalKey for HibernatingRequestKey { + type Value = (); + + fn deserialize(&self, _raw: &[u8]) -> Result { + Ok(()) + } + + fn serialize(&self, _value: Self::Value) -> Result> { + Ok(Vec::new()) + } +} + +impl TuplePack for HibernatingRequestKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + ACTOR, + HIBERNATING_REQUEST, + self.actor_id, + self.last_ping_ts, + self.request_id, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for HibernatingRequestKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, actor_id, last_ping_ts, request_id)) = + <(usize, usize, Id, i64, Uuid)>::unpack(input, tuple_depth)?; + + let v = HibernatingRequestKey { + actor_id, + last_ping_ts, + request_id, + }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct HibernatingRequestSubspaceKey { + actor_id: Id, + last_ping_ts: Option, +} + +impl HibernatingRequestSubspaceKey { + pub fn new(actor_id: Id) -> Self { + HibernatingRequestSubspaceKey { + actor_id, + last_ping_ts: None, + } + } + + pub fn new_with_ts(actor_id: Id, last_ping_ts: i64) -> Self { + HibernatingRequestSubspaceKey { + actor_id, + last_ping_ts: Some(last_ping_ts), + } + } +} + +impl TuplePack for HibernatingRequestSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (ACTOR, HIBERNATING_REQUEST, self.actor_id); + offset += t.pack(w, tuple_depth)?; + + if let Some(last_ping_ts) = self.last_ping_ts { + offset += last_ping_ts.pack(w, tuple_depth)?; + } + + Ok(offset) + } +} diff --git a/engine/packages/pegboard/src/keys/hibernating_request.rs b/engine/packages/pegboard/src/keys/hibernating_request.rs new file mode 100644 index 0000000000..49e47b3069 --- /dev/null +++ b/engine/packages/pegboard/src/keys/hibernating_request.rs @@ -0,0 +1,49 @@ +use anyhow::Result; +use universaldb::prelude::*; +use uuid::Uuid; + +#[derive(Debug)] +pub struct LastPingTsKey { + request_id: Uuid, +} + +impl LastPingTsKey { + pub fn new(request_id: Uuid) -> Self { + LastPingTsKey { request_id } + } +} + +impl FormalKey for LastPingTsKey { + /// Timestamp. + type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for LastPingTsKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (HIBERNATING_REQUEST, DATA, self.request_id, LAST_PING_TS); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for LastPingTsKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, request_id, _)) = + <(usize, usize, Uuid, usize)>::unpack(input, tuple_depth)?; + + let v = LastPingTsKey { request_id }; + + Ok((input, v)) + } +} diff --git a/engine/packages/pegboard/src/keys/mod.rs b/engine/packages/pegboard/src/keys/mod.rs index 253fdcb409..232e133bd1 100644 --- a/engine/packages/pegboard/src/keys/mod.rs +++ b/engine/packages/pegboard/src/keys/mod.rs @@ -2,6 +2,7 @@ use universaldb::prelude::*; pub mod actor; pub mod epoxy; +pub mod hibernating_request; pub mod ns; pub mod runner; diff --git a/engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs b/engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs new file mode 100644 index 0000000000..026d7ab207 --- /dev/null +++ b/engine/packages/pegboard/src/ops/actor/hibernating_request/delete.rs @@ -0,0 +1,38 @@ +use gas::prelude::*; +use universaldb::utils::IsolationLevel::*; +use uuid::Uuid; + +use crate::keys; + +#[derive(Debug, Default)] +pub struct Input { + pub actor_id: Id, + pub request_id: Uuid, +} + +#[operation] +pub async fn pegboard_actor_hibernating_request_delete( + ctx: &OperationCtx, + input: &Input, +) -> Result<()> { + ctx.udb()? + .run(|tx| async move { + let tx = tx.with_subspace(keys::subspace()); + + let last_ping_ts_key = keys::hibernating_request::LastPingTsKey::new(input.request_id); + + if let Some(last_ping_ts) = tx.read_opt(&last_ping_ts_key, Serializable).await? { + tx.delete(&keys::actor::HibernatingRequestKey::new( + input.actor_id, + last_ping_ts, + input.request_id, + )); + } + + tx.delete(&last_ping_ts_key); + + Ok(()) + }) + .custom_instrument(tracing::info_span!("hibernating_request_delete_tx")) + .await +} diff --git a/engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs b/engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs new file mode 100644 index 0000000000..31baa2baf7 --- /dev/null +++ b/engine/packages/pegboard/src/ops/actor/hibernating_request/list.rs @@ -0,0 +1,56 @@ +use futures_util::{StreamExt, TryStreamExt}; +use gas::prelude::*; +use universaldb::options::StreamingMode; +use universaldb::utils::IsolationLevel::*; +use uuid::Uuid; + +use crate::keys; + +#[derive(Debug, Default)] +pub struct Input { + pub actor_id: Id, +} + +#[operation] +pub async fn pegboard_actor_hibernating_request_list( + ctx: &OperationCtx, + input: &Input, +) -> Result> { + let hibernating_request_eligible_threshold = ctx + .config() + .pegboard() + .hibernating_request_eligible_threshold(); + + ctx.udb()? + .run(|tx| async move { + let tx = tx.with_subspace(keys::subspace()); + + let ping_threshold_ts = util::timestamp::now() - hibernating_request_eligible_threshold; + let hr_subspace_start = tx.pack(&keys::actor::HibernatingRequestKey::subspace_with_ts( + input.actor_id, + ping_threshold_ts, + )); + let hr_subspace_end = keys::subspace() + .subspace(&keys::actor::HibernatingRequestKey::subspace( + input.actor_id, + )) + .range() + .1; + + tx.get_ranges_keyvalues( + universaldb::RangeOption { + mode: StreamingMode::WantAll, + ..(hr_subspace_start, hr_subspace_end).into() + }, + Serializable, + ) + .map(|res| { + let key = tx.unpack::(res?.key())?; + Ok(key.request_id) + }) + .try_collect::>() + .await + }) + .custom_instrument(tracing::info_span!("hibernating_request_list_tx")) + .await +} diff --git a/engine/packages/pegboard/src/ops/actor/hibernating_request/mod.rs b/engine/packages/pegboard/src/ops/actor/hibernating_request/mod.rs new file mode 100644 index 0000000000..fa3aa636ea --- /dev/null +++ b/engine/packages/pegboard/src/ops/actor/hibernating_request/mod.rs @@ -0,0 +1,3 @@ +pub mod delete; +pub mod list; +pub mod upsert; diff --git a/engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs b/engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs new file mode 100644 index 0000000000..9fd18b6dbf --- /dev/null +++ b/engine/packages/pegboard/src/ops/actor/hibernating_request/upsert.rs @@ -0,0 +1,43 @@ +use gas::prelude::*; +use universaldb::utils::IsolationLevel::*; +use uuid::Uuid; + +use crate::keys; + +#[derive(Debug, Default)] +pub struct Input { + pub actor_id: Id, + pub request_id: Uuid, +} + +#[operation] +pub async fn pegboard_actor_hibernating_request_upsert( + ctx: &OperationCtx, + input: &Input, +) -> Result<()> { + ctx.udb()? + .run(|tx| async move { + let tx = tx.with_subspace(keys::subspace()); + + let last_ping_ts_key = keys::hibernating_request::LastPingTsKey::new(input.request_id); + + if let Some(last_ping_ts) = tx.read_opt(&last_ping_ts_key, Serializable).await? { + tx.delete(&keys::actor::HibernatingRequestKey::new( + input.actor_id, + last_ping_ts, + input.request_id, + )); + } + + let now = util::timestamp::now(); + tx.write(&last_ping_ts_key, now)?; + tx.write( + &keys::actor::HibernatingRequestKey::new(input.actor_id, now, input.request_id), + (), + )?; + + Ok(()) + }) + .custom_instrument(tracing::info_span!("hibernating_request_upsert_tx")) + .await +} diff --git a/engine/packages/pegboard/src/ops/actor/mod.rs b/engine/packages/pegboard/src/ops/actor/mod.rs index fec99d5eee..d1851a3446 100644 --- a/engine/packages/pegboard/src/ops/actor/mod.rs +++ b/engine/packages/pegboard/src/ops/actor/mod.rs @@ -4,5 +4,6 @@ pub mod get_for_gateway; pub mod get_for_key; pub mod get_reservation_for_key; pub mod get_runner; +pub mod hibernating_request; pub mod list_for_ns; pub mod list_names; diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 974705bb5c..ce9bc1acec 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -530,6 +530,9 @@ pub async fn spawn_actor( .map(|x| BASE64_STANDARD.decode(x)) .transpose()?, }, + // Empty because request ids are ephemeral. This is intercepted by guard and + // populated before it reaches the runner + hibernating_request_ids: Vec::new(), }), }) .to_workflow_id(runner_workflow_id) @@ -567,8 +570,6 @@ pub async fn spawn_actor( config: protocol::ActorConfig { name: input.name.clone(), key: input.key.clone(), - // HACK: We should not use dynamic timestamp here, but we don't validate if signal data - // changes (like activity inputs) so this is fine for now. create_ts: util::timestamp::now(), input: input .input @@ -576,6 +577,9 @@ pub async fn spawn_actor( .map(|x| BASE64_STANDARD.decode(x)) .transpose()?, }, + // Empty because request ids are ephemeral. This is intercepted by guard and + // populated before it reaches the runner + hibernating_request_ids: Vec::new(), }), }) .to_workflow_id(sig.runner_workflow_id) diff --git a/engine/packages/service-manager/src/lib.rs b/engine/packages/service-manager/src/lib.rs index 638aee72e5..bfc7a9a77c 100644 --- a/engine/packages/service-manager/src/lib.rs +++ b/engine/packages/service-manager/src/lib.rs @@ -318,7 +318,7 @@ pub async fn start( let join_fut = async { let mut handle_futs = running_services .iter_mut() - .map(|(_, handle)| handle) + .filter_map(|(_, handle)| (!handle.is_finished()).then_some(handle)) .collect::>(); while let Some(_) = handle_futs.next().await {} @@ -344,7 +344,7 @@ pub async fn start( if abort { // Give time for services to handle final abort tokio::time::sleep(Duration::from_millis(50)).await; - rivet_runtime::shutdown().await; + rivet_runtime::shutdown().await; // TODO: Fix `JoinHandle polled after completion` error break; } diff --git a/engine/packages/universaldb/src/utils/keys.rs b/engine/packages/universaldb/src/utils/keys.rs index 9257733a92..727ad382f5 100644 --- a/engine/packages/universaldb/src/utils/keys.rs +++ b/engine/packages/universaldb/src/utils/keys.rs @@ -65,7 +65,7 @@ define_keys! { (37, TOTAL_MEMORY, "total_memory"), (38, TOTAL_CPU, "total_cpu"), (39, NAMESPACE, "namespace"), - // 40 + (40, HIBERNATING_REQUEST, "hibernating_request"), (41, DISPLAY_NAME, "display_name"), (42, CONNECTABLE, "connectable"), (43, SLEEP_TS, "sleep_ts"), diff --git a/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs b/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs index c902225806..67d62d5050 100644 --- a/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs +++ b/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs @@ -50,23 +50,59 @@ impl OwnedVersionedData for NamespaceRunnerConfig { impl NamespaceRunnerConfig { fn v1_to_v2(self) -> Result { - match self { - NamespaceRunnerConfig::V1(namespace_runner_config_v1::Data::Serverless(serverless)) => { - let namespace_runner_config_v1::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } = serverless; + if let NamespaceRunnerConfig::V1(namespace_runner_config_v1::Data::Serverless(serverless)) = + self + { + let namespace_runner_config_v1::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } = serverless; + + Ok(NamespaceRunnerConfig::V2( + namespace_runner_config_v2::RunnerConfig { + metadata: None, + kind: namespace_runner_config_v2::RunnerConfigKind::Serverless( + namespace_runner_config_v2::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + }, + ), + }, + )) + } else { + bail!("unexpected version"); + } + } - Ok(NamespaceRunnerConfig::V2( - namespace_runner_config_v2::RunnerConfig { - metadata: None, - kind: namespace_runner_config_v2::RunnerConfigKind::Serverless( - namespace_runner_config_v2::Serverless { + fn v2_to_v1(self) -> Result { + if let NamespaceRunnerConfig::V2(config) = self { + let namespace_runner_config_v2::RunnerConfig { kind, .. } = config; + + match kind { + namespace_runner_config_v2::RunnerConfigKind::Serverless(serverless) => { + let namespace_runner_config_v2::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } = serverless; + + Ok(NamespaceRunnerConfig::V1( + namespace_runner_config_v1::Data::Serverless( + namespace_runner_config_v1::Serverless { url, headers, request_lifespan, @@ -76,50 +112,14 @@ impl NamespaceRunnerConfig { runners_margin, }, ), - }, - )) - } - value @ NamespaceRunnerConfig::V2(_) => Ok(value), - } - } - - fn v2_to_v1(self) -> Result { - match self { - NamespaceRunnerConfig::V1(_) => Ok(self), - NamespaceRunnerConfig::V2(config) => { - let namespace_runner_config_v2::RunnerConfig { kind, .. } = config; - - match kind { - namespace_runner_config_v2::RunnerConfigKind::Serverless(serverless) => { - let namespace_runner_config_v2::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } = serverless; - - Ok(NamespaceRunnerConfig::V1( - namespace_runner_config_v1::Data::Serverless( - namespace_runner_config_v1::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - }, - ), - )) - } - namespace_runner_config_v2::RunnerConfigKind::Normal => { - bail!("namespace runner config v1 does not support normal runner config") - } + )) + } + namespace_runner_config_v2::RunnerConfigKind::Normal => { + bail!("namespace runner config v1 does not support normal runner config") } } + } else { + bail!("unexpected version"); } } } diff --git a/engine/sdks/rust/runner-protocol/src/lib.rs b/engine/sdks/rust/runner-protocol/src/lib.rs index 04553acb49..cd8d059452 100644 --- a/engine/sdks/rust/runner-protocol/src/lib.rs +++ b/engine/sdks/rust/runner-protocol/src/lib.rs @@ -2,6 +2,6 @@ pub mod generated; pub mod versioned; // Re-export latest -pub use generated::v2::*; +pub use generated::v3::*; -pub const PROTOCOL_VERSION: u16 = 2; +pub const PROTOCOL_VERSION: u16 = 3; diff --git a/engine/sdks/rust/runner-protocol/src/versioned.rs b/engine/sdks/rust/runner-protocol/src/versioned.rs index eb95c194c5..ac19248cdb 100644 --- a/engine/sdks/rust/runner-protocol/src/versioned.rs +++ b/engine/sdks/rust/runner-protocol/src/versioned.rs @@ -1,22 +1,23 @@ use anyhow::{Ok, Result, bail}; use vbare::OwnedVersionedData; -use crate::generated::{v1, v2}; +use crate::generated::{v1, v2, v3}; pub enum ToClient { V1(v1::ToClient), V2(v2::ToClient), + V3(v3::ToClient), } impl OwnedVersionedData for ToClient { - type Latest = v2::ToClient; + type Latest = v3::ToClient; - fn wrap_latest(latest: v2::ToClient) -> Self { - ToClient::V2(latest) + fn wrap_latest(latest: v3::ToClient) -> Self { + ToClient::V3(latest) } fn unwrap_latest(self) -> Result { - if let ToClient::V2(data) = self { + if let ToClient::V3(data) = self { Ok(data) } else { bail!("version not latest"); @@ -27,6 +28,7 @@ impl OwnedVersionedData for ToClient { match version { 1 => Ok(ToClient::V1(serde_bare::from_slice(payload)?)), 2 => Ok(ToClient::V2(serde_bare::from_slice(payload)?)), + 3 => Ok(ToClient::V3(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } @@ -35,15 +37,16 @@ impl OwnedVersionedData for ToClient { match self { ToClient::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), ToClient::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + ToClient::V3(data) => serde_bare::to_vec(&data).map_err(Into::into), } } fn deserialize_converters() -> Vec Result> { - vec![Self::v1_to_v2] + vec![Self::v1_to_v2, Self::v2_to_v3] } fn serialize_converters() -> Vec Result> { - vec![Self::v2_to_v1] + vec![Self::v3_to_v2, Self::v2_to_v1] } } @@ -115,78 +118,212 @@ impl ToClient { Ok(ToClient::V2(inner)) } - value @ ToClient::V2(_) => Ok(value), + _ => bail!("unexpected version"), } } - fn v2_to_v1(self) -> Result { - match self { - ToClient::V1(_) => Ok(self), - ToClient::V2(x) => { - let inner = match x { - v2::ToClient::ToClientInit(init) => { - v1::ToClient::ToClientInit(v1::ToClientInit { - runner_id: init.runner_id, - last_event_idx: init.last_event_idx, - metadata: v1::ProtocolMetadata { - runner_lost_threshold: init.metadata.runner_lost_threshold, + fn v2_to_v3(self) -> Result { + if let ToClient::V2(x) = self { + let inner = match x { + v2::ToClient::ToClientInit(init) => v3::ToClient::ToClientInit(v3::ToClientInit { + runner_id: init.runner_id, + last_event_idx: init.last_event_idx, + metadata: v3::ProtocolMetadata { + runner_lost_threshold: init.metadata.runner_lost_threshold, + }, + }), + v2::ToClient::ToClientClose => v3::ToClient::ToClientClose, + v2::ToClient::ToClientCommands(commands) => v3::ToClient::ToClientCommands( + commands + .into_iter() + .map(|cmd| v3::CommandWrapper { + index: cmd.index, + inner: match cmd.inner { + v2::Command::CommandStartActor(start) => { + v3::Command::CommandStartActor(v3::CommandStartActor { + actor_id: start.actor_id, + generation: start.generation, + config: v3::ActorConfig { + name: start.config.name, + key: start.config.key, + create_ts: start.config.create_ts, + input: start.config.input, + }, + hibernating_request_ids: Vec::new(), + }) + } + v2::Command::CommandStopActor(stop) => { + v3::Command::CommandStopActor(v3::CommandStopActor { + actor_id: stop.actor_id, + generation: stop.generation, + }) + } }, }) - } - v2::ToClient::ToClientClose => v1::ToClient::ToClientClose, - v2::ToClient::ToClientCommands(commands) => v1::ToClient::ToClientCommands( - commands - .into_iter() - .map(|cmd| v1::CommandWrapper { - index: cmd.index, - inner: match cmd.inner { - v2::Command::CommandStartActor(start) => { - v1::Command::CommandStartActor(v1::CommandStartActor { - actor_id: start.actor_id, - generation: start.generation, - config: v1::ActorConfig { - name: start.config.name, - key: start.config.key, - create_ts: start.config.create_ts, - input: start.config.input, - }, - }) - } - v2::Command::CommandStopActor(stop) => { - v1::Command::CommandStopActor(v1::CommandStopActor { - actor_id: stop.actor_id, - generation: stop.generation, - }) - } - }, - }) - .collect(), - ), - v2::ToClient::ToClientAckEvents(ack) => { - v1::ToClient::ToClientAckEvents(v1::ToClientAckEvents { - last_event_idx: ack.last_event_idx, - }) - } - v2::ToClient::ToClientKvResponse(resp) => { - v1::ToClient::ToClientKvResponse(v1::ToClientKvResponse { - request_id: resp.request_id, - data: convert_kv_response_data_v2_to_v1(resp.data), + .collect(), + ), + v2::ToClient::ToClientAckEvents(ack) => { + v3::ToClient::ToClientAckEvents(v3::ToClientAckEvents { + last_event_idx: ack.last_event_idx, + }) + } + v2::ToClient::ToClientKvResponse(resp) => { + v3::ToClient::ToClientKvResponse(v3::ToClientKvResponse { + request_id: resp.request_id, + data: convert_kv_response_data_v2_to_v3(resp.data), + }) + } + v2::ToClient::ToClientTunnelMessage(msg) => { + v3::ToClient::ToClientTunnelMessage(v3::ToClientTunnelMessage { + request_id: msg.request_id, + message_id: msg.message_id, + message_kind: convert_to_client_tunnel_message_kind_v2_to_v3( + msg.message_kind, + ), + gateway_reply_to: msg.gateway_reply_to, + }) + } + }; + + Ok(ToClient::V3(inner)) + } else { + bail!("unexpected version"); + } + } + + fn v3_to_v2(self) -> Result { + if let ToClient::V3(x) = self { + let inner = match x { + v3::ToClient::ToClientInit(init) => v2::ToClient::ToClientInit(v2::ToClientInit { + runner_id: init.runner_id, + last_event_idx: init.last_event_idx, + metadata: v2::ProtocolMetadata { + runner_lost_threshold: init.metadata.runner_lost_threshold, + }, + }), + v3::ToClient::ToClientClose => v2::ToClient::ToClientClose, + v3::ToClient::ToClientCommands(commands) => v2::ToClient::ToClientCommands( + commands + .into_iter() + .map(|cmd| v2::CommandWrapper { + index: cmd.index, + inner: match cmd.inner { + v3::Command::CommandStartActor(start) => { + v2::Command::CommandStartActor(v2::CommandStartActor { + actor_id: start.actor_id, + generation: start.generation, + config: v2::ActorConfig { + name: start.config.name, + key: start.config.key, + create_ts: start.config.create_ts, + input: start.config.input, + }, + }) + } + v3::Command::CommandStopActor(stop) => { + v2::Command::CommandStopActor(v2::CommandStopActor { + actor_id: stop.actor_id, + generation: stop.generation, + }) + } + }, }) - } - v2::ToClient::ToClientTunnelMessage(msg) => { - v1::ToClient::ToClientTunnelMessage(v1::ToClientTunnelMessage { - request_id: msg.request_id, - message_id: msg.message_id, - message_kind: convert_to_client_tunnel_message_kind_v2_to_v1( - msg.message_kind, - )?, - gateway_reply_to: msg.gateway_reply_to, + .collect(), + ), + v3::ToClient::ToClientAckEvents(ack) => { + v2::ToClient::ToClientAckEvents(v2::ToClientAckEvents { + last_event_idx: ack.last_event_idx, + }) + } + v3::ToClient::ToClientKvResponse(resp) => { + v2::ToClient::ToClientKvResponse(v2::ToClientKvResponse { + request_id: resp.request_id, + data: convert_kv_response_data_v3_to_v2(resp.data), + }) + } + v3::ToClient::ToClientTunnelMessage(msg) => { + v2::ToClient::ToClientTunnelMessage(v2::ToClientTunnelMessage { + request_id: msg.request_id, + message_id: msg.message_id, + message_kind: convert_to_client_tunnel_message_kind_v3_to_v2( + msg.message_kind, + )?, + gateway_reply_to: msg.gateway_reply_to, + }) + } + }; + + Ok(ToClient::V2(inner)) + } else { + bail!("unexpected version"); + } + } + + fn v2_to_v1(self) -> Result { + if let ToClient::V2(x) = self { + let inner = match x { + v2::ToClient::ToClientInit(init) => v1::ToClient::ToClientInit(v1::ToClientInit { + runner_id: init.runner_id, + last_event_idx: init.last_event_idx, + metadata: v1::ProtocolMetadata { + runner_lost_threshold: init.metadata.runner_lost_threshold, + }, + }), + v2::ToClient::ToClientClose => v1::ToClient::ToClientClose, + v2::ToClient::ToClientCommands(commands) => v1::ToClient::ToClientCommands( + commands + .into_iter() + .map(|cmd| v1::CommandWrapper { + index: cmd.index, + inner: match cmd.inner { + v2::Command::CommandStartActor(start) => { + v1::Command::CommandStartActor(v1::CommandStartActor { + actor_id: start.actor_id, + generation: start.generation, + config: v1::ActorConfig { + name: start.config.name, + key: start.config.key, + create_ts: start.config.create_ts, + input: start.config.input, + }, + }) + } + v2::Command::CommandStopActor(stop) => { + v1::Command::CommandStopActor(v1::CommandStopActor { + actor_id: stop.actor_id, + generation: stop.generation, + }) + } + }, }) - } - }; + .collect(), + ), + v2::ToClient::ToClientAckEvents(ack) => { + v1::ToClient::ToClientAckEvents(v1::ToClientAckEvents { + last_event_idx: ack.last_event_idx, + }) + } + v2::ToClient::ToClientKvResponse(resp) => { + v1::ToClient::ToClientKvResponse(v1::ToClientKvResponse { + request_id: resp.request_id, + data: convert_kv_response_data_v2_to_v1(resp.data), + }) + } + v2::ToClient::ToClientTunnelMessage(msg) => { + v1::ToClient::ToClientTunnelMessage(v1::ToClientTunnelMessage { + request_id: msg.request_id, + message_id: msg.message_id, + message_kind: convert_to_client_tunnel_message_kind_v2_to_v1( + msg.message_kind, + )?, + gateway_reply_to: msg.gateway_reply_to, + }) + } + }; - Ok(ToClient::V1(inner)) - } + Ok(ToClient::V1(inner)) + } else { + bail!("unexpected version"); } } } @@ -194,17 +331,18 @@ impl ToClient { pub enum ToServer { V1(v1::ToServer), V2(v2::ToServer), + V3(v3::ToServer), } impl OwnedVersionedData for ToServer { - type Latest = v2::ToServer; + type Latest = v3::ToServer; - fn wrap_latest(latest: v2::ToServer) -> Self { - ToServer::V2(latest) + fn wrap_latest(latest: v3::ToServer) -> Self { + ToServer::V3(latest) } fn unwrap_latest(self) -> Result { - if let ToServer::V2(data) = self { + if let ToServer::V3(data) = self { Ok(data) } else { bail!("version not latest"); @@ -215,6 +353,7 @@ impl OwnedVersionedData for ToServer { match version { 1 => Ok(ToServer::V1(serde_bare::from_slice(payload)?)), 2 => Ok(ToServer::V2(serde_bare::from_slice(payload)?)), + 3 => Ok(ToServer::V3(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } @@ -223,169 +362,292 @@ impl OwnedVersionedData for ToServer { match self { ToServer::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), ToServer::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + ToServer::V3(data) => serde_bare::to_vec(&data).map_err(Into::into), } } fn deserialize_converters() -> Vec Result> { - vec![Self::v1_to_v2] + vec![Self::v1_to_v2, Self::v2_to_v3] } fn serialize_converters() -> Vec Result> { - vec![Self::v2_to_v1] + vec![Self::v3_to_v2, Self::v2_to_v1] } } impl ToServer { fn v1_to_v2(self) -> Result { - match self { - ToServer::V1(x) => { - let inner = match x { - v1::ToServer::ToServerInit(init) => { - v2::ToServer::ToServerInit(v2::ToServerInit { - name: init.name, - version: init.version, - total_slots: init.total_slots, - last_command_idx: init.last_command_idx, - prepopulate_actor_names: init.prepopulate_actor_names.map(|map| { - map.into_iter() - .map(|(k, v)| { - ( - k, - v2::ActorName { - metadata: v.metadata, - }, - ) - }) - .collect() - }), - metadata: init.metadata, - }) - } - v1::ToServer::ToServerEvents(events) => v2::ToServer::ToServerEvents( - events - .into_iter() - .map(|event| v2::EventWrapper { - index: event.index, - inner: convert_event_v1_to_v2(event.inner), + if let ToServer::V1(x) = self { + let inner = match x { + v1::ToServer::ToServerInit(init) => v2::ToServer::ToServerInit(v2::ToServerInit { + name: init.name, + version: init.version, + total_slots: init.total_slots, + last_command_idx: init.last_command_idx, + prepopulate_actor_names: init.prepopulate_actor_names.map(|map| { + map.into_iter() + .map(|(k, v)| { + ( + k, + v2::ActorName { + metadata: v.metadata, + }, + ) }) - .collect(), - ), - v1::ToServer::ToServerAckCommands(ack) => { - v2::ToServer::ToServerAckCommands(v2::ToServerAckCommands { - last_command_idx: ack.last_command_idx, + .collect() + }), + metadata: init.metadata, + }), + v1::ToServer::ToServerEvents(events) => v2::ToServer::ToServerEvents( + events + .into_iter() + .map(|event| v2::EventWrapper { + index: event.index, + inner: convert_event_v1_to_v2(event.inner), }) - } - v1::ToServer::ToServerStopping => v2::ToServer::ToServerStopping, - v1::ToServer::ToServerPing(ping) => { - v2::ToServer::ToServerPing(v2::ToServerPing { ts: ping.ts }) - } - v1::ToServer::ToServerKvRequest(req) => { - v2::ToServer::ToServerKvRequest(v2::ToServerKvRequest { - actor_id: req.actor_id, - request_id: req.request_id, - data: convert_kv_request_data_v1_to_v2(req.data), + .collect(), + ), + v1::ToServer::ToServerAckCommands(ack) => { + v2::ToServer::ToServerAckCommands(v2::ToServerAckCommands { + last_command_idx: ack.last_command_idx, + }) + } + v1::ToServer::ToServerStopping => v2::ToServer::ToServerStopping, + v1::ToServer::ToServerPing(ping) => { + v2::ToServer::ToServerPing(v2::ToServerPing { ts: ping.ts }) + } + v1::ToServer::ToServerKvRequest(req) => { + v2::ToServer::ToServerKvRequest(v2::ToServerKvRequest { + actor_id: req.actor_id, + request_id: req.request_id, + data: convert_kv_request_data_v1_to_v2(req.data), + }) + } + v1::ToServer::ToServerTunnelMessage(msg) => { + v2::ToServer::ToServerTunnelMessage(v2::ToServerTunnelMessage { + request_id: msg.request_id, + message_id: msg.message_id, + message_kind: convert_to_server_tunnel_message_kind_v1_to_v2( + msg.message_kind, + ), + }) + } + }; + + Ok(ToServer::V2(inner)) + } else { + bail!("unexpected version"); + } + } + + fn v2_to_v3(self) -> Result { + if let ToServer::V2(x) = self { + let inner = match x { + v2::ToServer::ToServerInit(init) => v3::ToServer::ToServerInit(v3::ToServerInit { + name: init.name, + version: init.version, + total_slots: init.total_slots, + last_command_idx: init.last_command_idx, + prepopulate_actor_names: init.prepopulate_actor_names.map(|map| { + map.into_iter() + .map(|(k, v)| { + ( + k, + v3::ActorName { + metadata: v.metadata, + }, + ) + }) + .collect() + }), + metadata: init.metadata, + }), + v2::ToServer::ToServerEvents(events) => v3::ToServer::ToServerEvents( + events + .into_iter() + .map(|event| v3::EventWrapper { + index: event.index, + inner: convert_event_v2_to_v3(event.inner), }) - } - v1::ToServer::ToServerTunnelMessage(msg) => { - v2::ToServer::ToServerTunnelMessage(v2::ToServerTunnelMessage { - request_id: msg.request_id, - message_id: msg.message_id, - message_kind: convert_to_server_tunnel_message_kind_v1_to_v2( - msg.message_kind, - ), + .collect(), + ), + v2::ToServer::ToServerAckCommands(ack) => { + v3::ToServer::ToServerAckCommands(v3::ToServerAckCommands { + last_command_idx: ack.last_command_idx, + }) + } + v2::ToServer::ToServerStopping => v3::ToServer::ToServerStopping, + v2::ToServer::ToServerPing(ping) => { + v3::ToServer::ToServerPing(v3::ToServerPing { ts: ping.ts }) + } + v2::ToServer::ToServerKvRequest(req) => { + v3::ToServer::ToServerKvRequest(v3::ToServerKvRequest { + actor_id: req.actor_id, + request_id: req.request_id, + data: convert_kv_request_data_v2_to_v3(req.data), + }) + } + v2::ToServer::ToServerTunnelMessage(msg) => { + v3::ToServer::ToServerTunnelMessage(v3::ToServerTunnelMessage { + request_id: msg.request_id, + message_id: msg.message_id, + message_kind: convert_to_server_tunnel_message_kind_v2_to_v3( + msg.message_kind, + ), + }) + } + }; + + Ok(ToServer::V3(inner)) + } else { + bail!("unexpected version"); + } + } + + fn v3_to_v2(self) -> Result { + if let ToServer::V3(x) = self { + let inner = match x { + v3::ToServer::ToServerInit(init) => v2::ToServer::ToServerInit(v2::ToServerInit { + name: init.name, + version: init.version, + total_slots: init.total_slots, + last_command_idx: init.last_command_idx, + prepopulate_actor_names: init.prepopulate_actor_names.map(|map| { + map.into_iter() + .map(|(k, v)| { + ( + k, + v2::ActorName { + metadata: v.metadata, + }, + ) + }) + .collect() + }), + metadata: init.metadata, + }), + v3::ToServer::ToServerEvents(events) => v2::ToServer::ToServerEvents( + events + .into_iter() + .map(|event| v2::EventWrapper { + index: event.index, + inner: convert_event_v3_to_v2(event.inner), }) - } - }; + .collect(), + ), + v3::ToServer::ToServerAckCommands(ack) => { + v2::ToServer::ToServerAckCommands(v2::ToServerAckCommands { + last_command_idx: ack.last_command_idx, + }) + } + v3::ToServer::ToServerStopping => v2::ToServer::ToServerStopping, + v3::ToServer::ToServerPing(ping) => { + v2::ToServer::ToServerPing(v2::ToServerPing { ts: ping.ts }) + } + v3::ToServer::ToServerKvRequest(req) => { + v2::ToServer::ToServerKvRequest(v2::ToServerKvRequest { + actor_id: req.actor_id, + request_id: req.request_id, + data: convert_kv_request_data_v3_to_v2(req.data), + }) + } + v3::ToServer::ToServerTunnelMessage(msg) => { + v2::ToServer::ToServerTunnelMessage(v2::ToServerTunnelMessage { + request_id: msg.request_id, + message_id: msg.message_id, + message_kind: convert_to_server_tunnel_message_kind_v3_to_v2( + msg.message_kind, + )?, + }) + } + }; - Ok(ToServer::V2(inner)) - } - value @ ToServer::V2(_) => Ok(value), + Ok(ToServer::V2(inner)) + } else { + bail!("unexpected version"); } } fn v2_to_v1(self) -> Result { - match self { - ToServer::V1(_) => Ok(self), - ToServer::V2(x) => { - let inner = match x { - v2::ToServer::ToServerInit(init) => { - v1::ToServer::ToServerInit(v1::ToServerInit { - name: init.name, - version: init.version, - total_slots: init.total_slots, - last_command_idx: init.last_command_idx, - prepopulate_actor_names: init.prepopulate_actor_names.map(|map| { - map.into_iter() - .map(|(k, v)| { - ( - k, - v1::ActorName { - metadata: v.metadata, - }, - ) - }) - .collect() - }), - metadata: init.metadata, - }) - } - v2::ToServer::ToServerEvents(events) => v1::ToServer::ToServerEvents( - events - .into_iter() - .map(|event| v1::EventWrapper { - index: event.index, - inner: convert_event_v2_to_v1(event.inner), + if let ToServer::V2(x) = self { + let inner = match x { + v2::ToServer::ToServerInit(init) => v1::ToServer::ToServerInit(v1::ToServerInit { + name: init.name, + version: init.version, + total_slots: init.total_slots, + last_command_idx: init.last_command_idx, + prepopulate_actor_names: init.prepopulate_actor_names.map(|map| { + map.into_iter() + .map(|(k, v)| { + ( + k, + v1::ActorName { + metadata: v.metadata, + }, + ) }) - .collect(), - ), - v2::ToServer::ToServerAckCommands(ack) => { - v1::ToServer::ToServerAckCommands(v1::ToServerAckCommands { - last_command_idx: ack.last_command_idx, - }) - } - v2::ToServer::ToServerStopping => v1::ToServer::ToServerStopping, - v2::ToServer::ToServerPing(ping) => { - v1::ToServer::ToServerPing(v1::ToServerPing { ts: ping.ts }) - } - v2::ToServer::ToServerKvRequest(req) => { - v1::ToServer::ToServerKvRequest(v1::ToServerKvRequest { - actor_id: req.actor_id, - request_id: req.request_id, - data: convert_kv_request_data_v2_to_v1(req.data), + .collect() + }), + metadata: init.metadata, + }), + v2::ToServer::ToServerEvents(events) => v1::ToServer::ToServerEvents( + events + .into_iter() + .map(|event| v1::EventWrapper { + index: event.index, + inner: convert_event_v2_to_v1(event.inner), }) - } - v2::ToServer::ToServerTunnelMessage(msg) => { - v1::ToServer::ToServerTunnelMessage(v1::ToServerTunnelMessage { - request_id: msg.request_id, - message_id: msg.message_id, - message_kind: convert_to_server_tunnel_message_kind_v2_to_v1( - msg.message_kind, - )?, - }) - } - }; + .collect(), + ), + v2::ToServer::ToServerAckCommands(ack) => { + v1::ToServer::ToServerAckCommands(v1::ToServerAckCommands { + last_command_idx: ack.last_command_idx, + }) + } + v2::ToServer::ToServerStopping => v1::ToServer::ToServerStopping, + v2::ToServer::ToServerPing(ping) => { + v1::ToServer::ToServerPing(v1::ToServerPing { ts: ping.ts }) + } + v2::ToServer::ToServerKvRequest(req) => { + v1::ToServer::ToServerKvRequest(v1::ToServerKvRequest { + actor_id: req.actor_id, + request_id: req.request_id, + data: convert_kv_request_data_v2_to_v1(req.data), + }) + } + v2::ToServer::ToServerTunnelMessage(msg) => { + v1::ToServer::ToServerTunnelMessage(v1::ToServerTunnelMessage { + request_id: msg.request_id, + message_id: msg.message_id, + message_kind: convert_to_server_tunnel_message_kind_v2_to_v1( + msg.message_kind, + )?, + }) + } + }; - Ok(ToServer::V1(inner)) - } + Ok(ToServer::V1(inner)) + } else { + bail!("unexpected version"); } } } pub enum ToGateway { - // No change between v1 and v2 - V2(v2::ToGateway), + // No change between v1 and v3 + V3(v3::ToGateway), } impl OwnedVersionedData for ToGateway { - type Latest = v2::ToGateway; + type Latest = v3::ToGateway; - fn wrap_latest(latest: v2::ToGateway) -> Self { - ToGateway::V2(latest) + fn wrap_latest(latest: v3::ToGateway) -> Self { + ToGateway::V3(latest) } fn unwrap_latest(self) -> Result { #[allow(irrefutable_let_patterns)] - if let ToGateway::V2(data) = self { + if let ToGateway::V3(data) = self { Ok(data) } else { bail!("version not latest"); @@ -394,33 +656,33 @@ impl OwnedVersionedData for ToGateway { fn deserialize_version(payload: &[u8], version: u16) -> Result { match version { - 1 | 2 => Ok(ToGateway::V2(serde_bare::from_slice(payload)?)), + 1 | 2 | 3 => Ok(ToGateway::V3(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } fn serialize_version(self, _version: u16) -> Result> { match self { - ToGateway::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + ToGateway::V3(data) => serde_bare::to_vec(&data).map_err(Into::into), } } } pub enum ToServerlessServer { - // No change between v1 and v2 - V2(v2::ToServerlessServer), + // No change between v1 and v3 + V3(v3::ToServerlessServer), } impl OwnedVersionedData for ToServerlessServer { - type Latest = v2::ToServerlessServer; + type Latest = v3::ToServerlessServer; - fn wrap_latest(latest: v2::ToServerlessServer) -> Self { - ToServerlessServer::V2(latest) + fn wrap_latest(latest: v3::ToServerlessServer) -> Self { + ToServerlessServer::V3(latest) } fn unwrap_latest(self) -> Result { #[allow(irrefutable_let_patterns)] - if let ToServerlessServer::V2(data) = self { + if let ToServerlessServer::V3(data) = self { Ok(data) } else { bail!("version not latest"); @@ -429,14 +691,14 @@ impl OwnedVersionedData for ToServerlessServer { fn deserialize_version(payload: &[u8], version: u16) -> Result { match version { - 1 | 2 => Ok(ToServerlessServer::V2(serde_bare::from_slice(payload)?)), + 1 | 2 | 3 => Ok(ToServerlessServer::V3(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } fn serialize_version(self, _version: u16) -> Result> { match self { - ToServerlessServer::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + ToServerlessServer::V3(data) => serde_bare::to_vec(&data).map_err(Into::into), } } } @@ -885,3 +1147,457 @@ fn convert_kv_metadata_v2_to_v1(metadata: v2::KvMetadata) -> v1::KvMetadata { create_ts: metadata.create_ts, } } + +fn convert_to_client_tunnel_message_kind_v2_to_v3( + kind: v2::ToClientTunnelMessageKind, +) -> v3::ToClientTunnelMessageKind { + match kind { + v2::ToClientTunnelMessageKind::TunnelAck => v3::ToClientTunnelMessageKind::TunnelAck, + v2::ToClientTunnelMessageKind::ToClientRequestStart(req) => { + v3::ToClientTunnelMessageKind::ToClientRequestStart(v3::ToClientRequestStart { + actor_id: req.actor_id, + method: req.method, + path: req.path, + headers: req.headers, + body: req.body, + stream: req.stream, + }) + } + v2::ToClientTunnelMessageKind::ToClientRequestChunk(chunk) => { + v3::ToClientTunnelMessageKind::ToClientRequestChunk(v3::ToClientRequestChunk { + body: chunk.body, + finish: chunk.finish, + }) + } + v2::ToClientTunnelMessageKind::ToClientRequestAbort => { + v3::ToClientTunnelMessageKind::ToClientRequestAbort + } + v2::ToClientTunnelMessageKind::ToClientWebSocketOpen(ws) => { + v3::ToClientTunnelMessageKind::ToClientWebSocketOpen(v3::ToClientWebSocketOpen { + actor_id: ws.actor_id, + path: ws.path, + headers: ws.headers, + }) + } + v2::ToClientTunnelMessageKind::ToClientWebSocketMessage(msg) => { + v3::ToClientTunnelMessageKind::ToClientWebSocketMessage(v3::ToClientWebSocketMessage { + // Default to 0 for v2 messages (hibernation disabled by default) + index: 0, + data: msg.data, + binary: msg.binary, + }) + } + v2::ToClientTunnelMessageKind::ToClientWebSocketClose(close) => { + v3::ToClientTunnelMessageKind::ToClientWebSocketClose(v3::ToClientWebSocketClose { + code: close.code, + reason: close.reason, + }) + } + } +} + +fn convert_to_client_tunnel_message_kind_v3_to_v2( + kind: v3::ToClientTunnelMessageKind, +) -> Result { + Ok(match kind { + v3::ToClientTunnelMessageKind::TunnelAck => v2::ToClientTunnelMessageKind::TunnelAck, + v3::ToClientTunnelMessageKind::ToClientRequestStart(req) => { + v2::ToClientTunnelMessageKind::ToClientRequestStart(v2::ToClientRequestStart { + actor_id: req.actor_id, + method: req.method, + path: req.path, + headers: req.headers, + body: req.body, + stream: req.stream, + }) + } + v3::ToClientTunnelMessageKind::ToClientRequestChunk(chunk) => { + v2::ToClientTunnelMessageKind::ToClientRequestChunk(v2::ToClientRequestChunk { + body: chunk.body, + finish: chunk.finish, + }) + } + v3::ToClientTunnelMessageKind::ToClientRequestAbort => { + v2::ToClientTunnelMessageKind::ToClientRequestAbort + } + v3::ToClientTunnelMessageKind::ToClientWebSocketOpen(ws) => { + v2::ToClientTunnelMessageKind::ToClientWebSocketOpen(v2::ToClientWebSocketOpen { + actor_id: ws.actor_id, + path: ws.path, + headers: ws.headers, + }) + } + v3::ToClientTunnelMessageKind::ToClientWebSocketMessage(msg) => { + v2::ToClientTunnelMessageKind::ToClientWebSocketMessage(v2::ToClientWebSocketMessage { + data: msg.data, + binary: msg.binary, + index: msg.index, + }) + } + v3::ToClientTunnelMessageKind::ToClientWebSocketClose(close) => { + v2::ToClientTunnelMessageKind::ToClientWebSocketClose(v2::ToClientWebSocketClose { + code: close.code, + reason: close.reason, + }) + } + }) +} + +fn convert_to_server_tunnel_message_kind_v2_to_v3( + kind: v2::ToServerTunnelMessageKind, +) -> v3::ToServerTunnelMessageKind { + match kind { + v2::ToServerTunnelMessageKind::TunnelAck => v3::ToServerTunnelMessageKind::TunnelAck, + v2::ToServerTunnelMessageKind::ToServerResponseStart(resp) => { + v3::ToServerTunnelMessageKind::ToServerResponseStart(v3::ToServerResponseStart { + status: resp.status, + headers: resp.headers, + body: resp.body, + stream: resp.stream, + }) + } + v2::ToServerTunnelMessageKind::ToServerResponseChunk(chunk) => { + v3::ToServerTunnelMessageKind::ToServerResponseChunk(v3::ToServerResponseChunk { + body: chunk.body, + finish: chunk.finish, + }) + } + v2::ToServerTunnelMessageKind::ToServerResponseAbort => { + v3::ToServerTunnelMessageKind::ToServerResponseAbort + } + v2::ToServerTunnelMessageKind::ToServerWebSocketOpen(open) => { + v3::ToServerTunnelMessageKind::ToServerWebSocketOpen(v3::ToServerWebSocketOpen { + can_hibernate: open.can_hibernate, + }) + } + v2::ToServerTunnelMessageKind::ToServerWebSocketMessage(msg) => { + v3::ToServerTunnelMessageKind::ToServerWebSocketMessage(v3::ToServerWebSocketMessage { + data: msg.data, + binary: msg.binary, + }) + } + v2::ToServerTunnelMessageKind::ToServerWebSocketMessageAck(ack) => { + v3::ToServerTunnelMessageKind::ToServerWebSocketMessageAck( + v3::ToServerWebSocketMessageAck { index: ack.index }, + ) + } + v2::ToServerTunnelMessageKind::ToServerWebSocketClose(close) => { + v3::ToServerTunnelMessageKind::ToServerWebSocketClose(v3::ToServerWebSocketClose { + code: close.code, + reason: close.reason, + retry: close.retry, + }) + } + } +} + +fn convert_to_server_tunnel_message_kind_v3_to_v2( + kind: v3::ToServerTunnelMessageKind, +) -> Result { + Ok(match kind { + v3::ToServerTunnelMessageKind::TunnelAck => v2::ToServerTunnelMessageKind::TunnelAck, + v3::ToServerTunnelMessageKind::ToServerResponseStart(resp) => { + v2::ToServerTunnelMessageKind::ToServerResponseStart(v2::ToServerResponseStart { + status: resp.status, + headers: resp.headers, + body: resp.body, + stream: resp.stream, + }) + } + v3::ToServerTunnelMessageKind::ToServerResponseChunk(chunk) => { + v2::ToServerTunnelMessageKind::ToServerResponseChunk(v2::ToServerResponseChunk { + body: chunk.body, + finish: chunk.finish, + }) + } + v3::ToServerTunnelMessageKind::ToServerResponseAbort => { + v2::ToServerTunnelMessageKind::ToServerResponseAbort + } + v3::ToServerTunnelMessageKind::ToServerWebSocketOpen(open) => { + v2::ToServerTunnelMessageKind::ToServerWebSocketOpen(v2::ToServerWebSocketOpen { + can_hibernate: open.can_hibernate, + last_msg_index: -1, + }) + } + v3::ToServerTunnelMessageKind::ToServerWebSocketMessage(msg) => { + v2::ToServerTunnelMessageKind::ToServerWebSocketMessage(v2::ToServerWebSocketMessage { + data: msg.data, + binary: msg.binary, + }) + } + v3::ToServerTunnelMessageKind::ToServerWebSocketMessageAck(ack) => { + v2::ToServerTunnelMessageKind::ToServerWebSocketMessageAck( + v2::ToServerWebSocketMessageAck { index: ack.index }, + ) + } + v3::ToServerTunnelMessageKind::ToServerWebSocketClose(close) => { + v2::ToServerTunnelMessageKind::ToServerWebSocketClose(v2::ToServerWebSocketClose { + code: close.code, + reason: close.reason, + retry: close.retry, + }) + } + }) +} + +fn convert_event_v2_to_v3(event: v2::Event) -> v3::Event { + match event { + v2::Event::EventActorIntent(intent) => v3::Event::EventActorIntent(v3::EventActorIntent { + actor_id: intent.actor_id, + generation: intent.generation, + intent: convert_actor_intent_v2_to_v3(intent.intent), + }), + v2::Event::EventActorStateUpdate(state) => { + v3::Event::EventActorStateUpdate(v3::EventActorStateUpdate { + actor_id: state.actor_id, + generation: state.generation, + state: convert_actor_state_v2_to_v3(state.state), + }) + } + v2::Event::EventActorSetAlarm(alarm) => { + v3::Event::EventActorSetAlarm(v3::EventActorSetAlarm { + actor_id: alarm.actor_id, + generation: alarm.generation, + alarm_ts: alarm.alarm_ts, + }) + } + } +} + +fn convert_event_v3_to_v2(event: v3::Event) -> v2::Event { + match event { + v3::Event::EventActorIntent(intent) => v2::Event::EventActorIntent(v2::EventActorIntent { + actor_id: intent.actor_id, + generation: intent.generation, + intent: convert_actor_intent_v3_to_v2(intent.intent), + }), + v3::Event::EventActorStateUpdate(state) => { + v2::Event::EventActorStateUpdate(v2::EventActorStateUpdate { + actor_id: state.actor_id, + generation: state.generation, + state: convert_actor_state_v3_to_v2(state.state), + }) + } + v3::Event::EventActorSetAlarm(alarm) => { + v2::Event::EventActorSetAlarm(v2::EventActorSetAlarm { + actor_id: alarm.actor_id, + generation: alarm.generation, + alarm_ts: alarm.alarm_ts, + }) + } + } +} + +fn convert_actor_intent_v2_to_v3(intent: v2::ActorIntent) -> v3::ActorIntent { + match intent { + v2::ActorIntent::ActorIntentSleep => v3::ActorIntent::ActorIntentSleep, + v2::ActorIntent::ActorIntentStop => v3::ActorIntent::ActorIntentStop, + } +} + +fn convert_actor_intent_v3_to_v2(intent: v3::ActorIntent) -> v2::ActorIntent { + match intent { + v3::ActorIntent::ActorIntentSleep => v2::ActorIntent::ActorIntentSleep, + v3::ActorIntent::ActorIntentStop => v2::ActorIntent::ActorIntentStop, + } +} + +fn convert_actor_state_v2_to_v3(state: v2::ActorState) -> v3::ActorState { + match state { + v2::ActorState::ActorStateRunning => v3::ActorState::ActorStateRunning, + v2::ActorState::ActorStateStopped(stopped) => { + v3::ActorState::ActorStateStopped(v3::ActorStateStopped { + code: convert_stop_code_v2_to_v3(stopped.code), + message: stopped.message, + }) + } + } +} + +fn convert_actor_state_v3_to_v2(state: v3::ActorState) -> v2::ActorState { + match state { + v3::ActorState::ActorStateRunning => v2::ActorState::ActorStateRunning, + v3::ActorState::ActorStateStopped(stopped) => { + v2::ActorState::ActorStateStopped(v2::ActorStateStopped { + code: convert_stop_code_v3_to_v2(stopped.code), + message: stopped.message, + }) + } + } +} + +fn convert_stop_code_v2_to_v3(code: v2::StopCode) -> v3::StopCode { + match code { + v2::StopCode::Ok => v3::StopCode::Ok, + v2::StopCode::Error => v3::StopCode::Error, + } +} + +fn convert_stop_code_v3_to_v2(code: v3::StopCode) -> v2::StopCode { + match code { + v3::StopCode::Ok => v2::StopCode::Ok, + v3::StopCode::Error => v2::StopCode::Error, + } +} + +fn convert_kv_request_data_v2_to_v3(data: v2::KvRequestData) -> v3::KvRequestData { + match data { + v2::KvRequestData::KvGetRequest(req) => { + v3::KvRequestData::KvGetRequest(v3::KvGetRequest { keys: req.keys }) + } + v2::KvRequestData::KvListRequest(req) => { + v3::KvRequestData::KvListRequest(v3::KvListRequest { + query: convert_kv_list_query_v2_to_v3(req.query), + reverse: req.reverse, + limit: req.limit, + }) + } + v2::KvRequestData::KvPutRequest(req) => v3::KvRequestData::KvPutRequest(v3::KvPutRequest { + keys: req.keys, + values: req.values, + }), + v2::KvRequestData::KvDeleteRequest(req) => { + v3::KvRequestData::KvDeleteRequest(v3::KvDeleteRequest { keys: req.keys }) + } + v2::KvRequestData::KvDropRequest => v3::KvRequestData::KvDropRequest, + } +} + +fn convert_kv_request_data_v3_to_v2(data: v3::KvRequestData) -> v2::KvRequestData { + match data { + v3::KvRequestData::KvGetRequest(req) => { + v2::KvRequestData::KvGetRequest(v2::KvGetRequest { keys: req.keys }) + } + v3::KvRequestData::KvListRequest(req) => { + v2::KvRequestData::KvListRequest(v2::KvListRequest { + query: convert_kv_list_query_v3_to_v2(req.query), + reverse: req.reverse, + limit: req.limit, + }) + } + v3::KvRequestData::KvPutRequest(req) => v2::KvRequestData::KvPutRequest(v2::KvPutRequest { + keys: req.keys, + values: req.values, + }), + v3::KvRequestData::KvDeleteRequest(req) => { + v2::KvRequestData::KvDeleteRequest(v2::KvDeleteRequest { keys: req.keys }) + } + v3::KvRequestData::KvDropRequest => v2::KvRequestData::KvDropRequest, + } +} + +fn convert_kv_response_data_v2_to_v3(data: v2::KvResponseData) -> v3::KvResponseData { + match data { + v2::KvResponseData::KvErrorResponse(err) => { + v3::KvResponseData::KvErrorResponse(v3::KvErrorResponse { + message: err.message, + }) + } + v2::KvResponseData::KvGetResponse(resp) => { + v3::KvResponseData::KvGetResponse(v3::KvGetResponse { + keys: resp.keys, + values: resp.values, + metadata: resp + .metadata + .into_iter() + .map(convert_kv_metadata_v2_to_v3) + .collect(), + }) + } + v2::KvResponseData::KvListResponse(resp) => { + v3::KvResponseData::KvListResponse(v3::KvListResponse { + keys: resp.keys, + values: resp.values, + metadata: resp + .metadata + .into_iter() + .map(convert_kv_metadata_v2_to_v3) + .collect(), + }) + } + v2::KvResponseData::KvPutResponse => v3::KvResponseData::KvPutResponse, + v2::KvResponseData::KvDeleteResponse => v3::KvResponseData::KvDeleteResponse, + v2::KvResponseData::KvDropResponse => v3::KvResponseData::KvDropResponse, + } +} + +fn convert_kv_response_data_v3_to_v2(data: v3::KvResponseData) -> v2::KvResponseData { + match data { + v3::KvResponseData::KvErrorResponse(err) => { + v2::KvResponseData::KvErrorResponse(v2::KvErrorResponse { + message: err.message, + }) + } + v3::KvResponseData::KvGetResponse(resp) => { + v2::KvResponseData::KvGetResponse(v2::KvGetResponse { + keys: resp.keys, + values: resp.values, + metadata: resp + .metadata + .into_iter() + .map(convert_kv_metadata_v3_to_v2) + .collect(), + }) + } + v3::KvResponseData::KvListResponse(resp) => { + v2::KvResponseData::KvListResponse(v2::KvListResponse { + keys: resp.keys, + values: resp.values, + metadata: resp + .metadata + .into_iter() + .map(convert_kv_metadata_v3_to_v2) + .collect(), + }) + } + v3::KvResponseData::KvPutResponse => v2::KvResponseData::KvPutResponse, + v3::KvResponseData::KvDeleteResponse => v2::KvResponseData::KvDeleteResponse, + v3::KvResponseData::KvDropResponse => v2::KvResponseData::KvDropResponse, + } +} + +fn convert_kv_list_query_v2_to_v3(query: v2::KvListQuery) -> v3::KvListQuery { + match query { + v2::KvListQuery::KvListAllQuery => v3::KvListQuery::KvListAllQuery, + v2::KvListQuery::KvListRangeQuery(range) => { + v3::KvListQuery::KvListRangeQuery(v3::KvListRangeQuery { + start: range.start, + end: range.end, + exclusive: range.exclusive, + }) + } + v2::KvListQuery::KvListPrefixQuery(prefix) => { + v3::KvListQuery::KvListPrefixQuery(v3::KvListPrefixQuery { key: prefix.key }) + } + } +} + +fn convert_kv_list_query_v3_to_v2(query: v3::KvListQuery) -> v2::KvListQuery { + match query { + v3::KvListQuery::KvListAllQuery => v2::KvListQuery::KvListAllQuery, + v3::KvListQuery::KvListRangeQuery(range) => { + v2::KvListQuery::KvListRangeQuery(v2::KvListRangeQuery { + start: range.start, + end: range.end, + exclusive: range.exclusive, + }) + } + v3::KvListQuery::KvListPrefixQuery(prefix) => { + v2::KvListQuery::KvListPrefixQuery(v2::KvListPrefixQuery { key: prefix.key }) + } + } +} + +fn convert_kv_metadata_v2_to_v3(metadata: v2::KvMetadata) -> v3::KvMetadata { + v3::KvMetadata { + version: metadata.version, + create_ts: metadata.create_ts, + } +} + +fn convert_kv_metadata_v3_to_v2(metadata: v3::KvMetadata) -> v2::KvMetadata { + v2::KvMetadata { + version: metadata.version, + create_ts: metadata.create_ts, + } +} diff --git a/engine/sdks/schemas/runner-protocol/v3.bare b/engine/sdks/schemas/runner-protocol/v3.bare new file mode 100644 index 0000000000..769807fe63 --- /dev/null +++ b/engine/sdks/schemas/runner-protocol/v3.bare @@ -0,0 +1,403 @@ +# Runner Protocol v1 + +# MARK: Core Primitives + +type Id str +type Json str + +# MARK: KV + +# Basic types +type KvKey data +type KvValue data +type KvMetadata struct { + version: data + createTs: i64 +} + +# Query types +type KvListAllQuery void +type KvListRangeQuery struct { + start: KvKey + end: KvKey + exclusive: bool +} + +type KvListPrefixQuery struct { + key: KvKey +} + +type KvListQuery union { + KvListAllQuery | + KvListRangeQuery | + KvListPrefixQuery +} + +# Request types +type KvGetRequest struct { + keys: list +} + +type KvListRequest struct { + query: KvListQuery + reverse: optional + limit: optional +} + +type KvPutRequest struct { + keys: list + values: list +} + +type KvDeleteRequest struct { + keys: list +} + +type KvDropRequest void + +# Response types +type KvErrorResponse struct { + message: str +} + +type KvGetResponse struct { + keys: list + values: list + metadata: list +} + +type KvListResponse struct { + keys: list + values: list + metadata: list +} + +type KvPutResponse void +type KvDeleteResponse void +type KvDropResponse void + +# Request/Response unions +type KvRequestData union { + KvGetRequest | + KvListRequest | + KvPutRequest | + KvDeleteRequest | + KvDropRequest +} + +type KvResponseData union { + KvErrorResponse | + KvGetResponse | + KvListResponse | + KvPutResponse | + KvDeleteResponse | + KvDropResponse +} + +# MARK: Actor + +# Core +type StopCode enum { + OK + ERROR +} + +type ActorName struct { + metadata: Json +} + +type ActorConfig struct { + name: str + key: optional + createTs: i64 + input: optional +} + +# Intent +type ActorIntentSleep void + +type ActorIntentStop void + +type ActorIntent union { + ActorIntentSleep | + ActorIntentStop +} + +# State +type ActorStateRunning void + +type ActorStateStopped struct { + code: StopCode + message: optional +} + +type ActorState union { + ActorStateRunning | + ActorStateStopped +} + +# MARK: Events +type EventActorIntent struct { + actorId: Id + generation: u32 + intent: ActorIntent +} + +type EventActorStateUpdate struct { + actorId: Id + generation: u32 + state: ActorState +} + +type EventActorSetAlarm struct { + actorId: Id + generation: u32 + alarmTs: optional +} + +type Event union { + EventActorIntent | + EventActorStateUpdate | + EventActorSetAlarm +} + +type EventWrapper struct { + index: i64 + inner: Event +} + +# MARK: Commands +# +type CommandStartActor struct { + actorId: Id + generation: u32 + config: ActorConfig + hibernatingRequestIds: list +} + +type CommandStopActor struct { + actorId: Id + generation: u32 +} + +type Command union { + CommandStartActor | + CommandStopActor +} + +type CommandWrapper struct { + index: i64 + inner: Command +} + +# MARK: Tunnel + +type RequestId data[16] # UUIDv4 +type MessageId data[16] # UUIDv4 + + +# Ack +type TunnelAck void + +# HTTP +type ToClientRequestStart struct { + actorId: Id + method: str + path: str + headers: map + body: optional + stream: bool +} + +type ToClientRequestChunk struct { + body: data + finish: bool +} + +type ToClientRequestAbort void + +type ToServerResponseStart struct { + status: u16 + headers: map + body: optional + stream: bool +} + +type ToServerResponseChunk struct { + body: data + finish: bool +} + +type ToServerResponseAbort void + +# WebSocket +type ToClientWebSocketOpen struct { + actorId: Id + path: str + headers: map +} + +type ToClientWebSocketMessage struct { + index: u16 + data: data + binary: bool +} + +type ToClientWebSocketClose struct { + code: optional + reason: optional +} + +type ToServerWebSocketOpen struct { + canHibernate: bool +} + +type ToServerWebSocketMessage struct { + data: data + binary: bool +} + +type ToServerWebSocketMessageAck struct { + index: u16 +} + +type ToServerWebSocketClose struct { + code: optional + reason: optional + retry: bool +} + +# To Server +type ToServerTunnelMessageKind union { + TunnelAck | + + # HTTP + ToServerResponseStart | + ToServerResponseChunk | + ToServerResponseAbort | + + # WebSocket + ToServerWebSocketOpen | + ToServerWebSocketMessage | + ToServerWebSocketMessageAck | + ToServerWebSocketClose +} + +type ToServerTunnelMessage struct { + requestId: RequestId + messageId: MessageId + messageKind: ToServerTunnelMessageKind +} + +# To Client +type ToClientTunnelMessageKind union { + TunnelAck | + + # HTTP + ToClientRequestStart | + ToClientRequestChunk | + ToClientRequestAbort | + + # WebSocket + ToClientWebSocketOpen | + ToClientWebSocketMessage | + ToClientWebSocketClose +} + +type ToClientTunnelMessage struct { + requestId: RequestId + messageId: MessageId + messageKind: ToClientTunnelMessageKind + + # Subject to send replies to. + # + # Only sent when opening a new request from gateway -> pegboard-runner-ws. + # + # Should be stripped before sending to the runner. + gatewayReplyTo: optional +} + +# MARK: To Server +type ToServerInit struct { + name: str + version: u32 + totalSlots: u32 + lastCommandIdx: optional + prepopulateActorNames: optional> + metadata: optional +} + +type ToServerEvents list + +type ToServerAckCommands struct { + lastCommandIdx: i64 +} + +type ToServerStopping void + +type ToServerPing struct { + ts: i64 +} + +type ToServerKvRequest struct { + actorId: Id + requestId: u32 + data: KvRequestData +} + +type ToServer union { + ToServerInit | + ToServerEvents | + ToServerAckCommands | + ToServerStopping | + ToServerPing | + ToServerKvRequest | + ToServerTunnelMessage +} + +# MARK: To Client +type ProtocolMetadata struct { + runnerLostThreshold: i64 +} + +type ToClientInit struct { + runnerId: Id + lastEventIdx: i64 + metadata: ProtocolMetadata +} + +type ToClientCommands list + +type ToClientAckEvents struct { + lastEventIdx: i64 +} + +type ToClientKvResponse struct { + requestId: u32 + data: KvResponseData +} + +type ToClientClose void + +type ToClient union { + ToClientInit | + ToClientClose | + ToClientCommands | + ToClientAckEvents | + ToClientKvResponse | + ToClientTunnelMessage +} + +# MARK: To Gateway +type ToGateway struct { + message: ToServerTunnelMessage +} + +# MARK: Serverless +type ToServerlessServerInit struct { + runnerId: Id +} + +type ToServerlessServer union { + ToServerlessServerInit +} diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index c6405665cb..ff759d7b4e 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -805,10 +805,30 @@ export function writeEventWrapper(bc: bare.ByteCursor, x: EventWrapper): void { writeEvent(bc, x.inner) } +function read8(bc: bare.ByteCursor): readonly ArrayBuffer[] { + const len = bare.readUintSafe(bc) + if (len === 0) { + return [] + } + const result = [bare.readData(bc)] + for (let i = 1; i < len; i++) { + result[i] = bare.readData(bc) + } + return result +} + +function write8(bc: bare.ByteCursor, x: readonly ArrayBuffer[]): void { + bare.writeUintSafe(bc, x.length) + for (let i = 0; i < x.length; i++) { + bare.writeData(bc, x[i]) + } +} + export type CommandStartActor = { readonly actorId: Id readonly generation: u32 readonly config: ActorConfig + readonly hibernatingRequestIds: readonly ArrayBuffer[] } export function readCommandStartActor(bc: bare.ByteCursor): CommandStartActor { @@ -816,6 +836,7 @@ export function readCommandStartActor(bc: bare.ByteCursor): CommandStartActor { actorId: readId(bc), generation: bare.readU32(bc), config: readActorConfig(bc), + hibernatingRequestIds: read8(bc), } } @@ -823,6 +844,7 @@ export function writeCommandStartActor(bc: bare.ByteCursor, x: CommandStartActor writeId(bc, x.actorId) bare.writeU32(bc, x.generation) writeActorConfig(bc, x.config) + write8(bc, x.hibernatingRequestIds) } export type CommandStopActor = { @@ -923,7 +945,7 @@ export function writeMessageId(bc: bare.ByteCursor, x: MessageId): void { */ export type TunnelAck = null -function read8(bc: bare.ByteCursor): ReadonlyMap { +function read9(bc: bare.ByteCursor): ReadonlyMap { const len = bare.readUintSafe(bc) const result = new Map() for (let i = 0; i < len; i++) { @@ -938,7 +960,7 @@ function read8(bc: bare.ByteCursor): ReadonlyMap { return result } -function write8(bc: bare.ByteCursor, x: ReadonlyMap): void { +function write9(bc: bare.ByteCursor, x: ReadonlyMap): void { bare.writeUintSafe(bc, x.size) for (const kv of x) { bare.writeString(bc, kv[0]) @@ -963,7 +985,7 @@ export function readToClientRequestStart(bc: bare.ByteCursor): ToClientRequestSt actorId: readId(bc), method: bare.readString(bc), path: bare.readString(bc), - headers: read8(bc), + headers: read9(bc), body: read6(bc), stream: bare.readBool(bc), } @@ -973,7 +995,7 @@ export function writeToClientRequestStart(bc: bare.ByteCursor, x: ToClientReques writeId(bc, x.actorId) bare.writeString(bc, x.method) bare.writeString(bc, x.path) - write8(bc, x.headers) + write9(bc, x.headers) write6(bc, x.body) bare.writeBool(bc, x.stream) } @@ -1007,7 +1029,7 @@ export type ToServerResponseStart = { export function readToServerResponseStart(bc: bare.ByteCursor): ToServerResponseStart { return { status: bare.readU16(bc), - headers: read8(bc), + headers: read9(bc), body: read6(bc), stream: bare.readBool(bc), } @@ -1015,7 +1037,7 @@ export function readToServerResponseStart(bc: bare.ByteCursor): ToServerResponse export function writeToServerResponseStart(bc: bare.ByteCursor, x: ToServerResponseStart): void { bare.writeU16(bc, x.status) - write8(bc, x.headers) + write9(bc, x.headers) write6(bc, x.body) bare.writeBool(bc, x.stream) } @@ -1052,14 +1074,14 @@ export function readToClientWebSocketOpen(bc: bare.ByteCursor): ToClientWebSocke return { actorId: readId(bc), path: bare.readString(bc), - headers: read8(bc), + headers: read9(bc), } } export function writeToClientWebSocketOpen(bc: bare.ByteCursor, x: ToClientWebSocketOpen): void { writeId(bc, x.actorId) bare.writeString(bc, x.path) - write8(bc, x.headers) + write9(bc, x.headers) } export type ToClientWebSocketMessage = { @@ -1082,11 +1104,11 @@ export function writeToClientWebSocketMessage(bc: bare.ByteCursor, x: ToClientWe bare.writeBool(bc, x.binary) } -function read9(bc: bare.ByteCursor): u16 | null { +function read10(bc: bare.ByteCursor): u16 | null { return bare.readBool(bc) ? bare.readU16(bc) : null } -function write9(bc: bare.ByteCursor, x: u16 | null): void { +function write10(bc: bare.ByteCursor, x: u16 | null): void { bare.writeBool(bc, x != null) if (x != null) { bare.writeU16(bc, x) @@ -1100,31 +1122,28 @@ export type ToClientWebSocketClose = { export function readToClientWebSocketClose(bc: bare.ByteCursor): ToClientWebSocketClose { return { - code: read9(bc), + code: read10(bc), reason: read5(bc), } } export function writeToClientWebSocketClose(bc: bare.ByteCursor, x: ToClientWebSocketClose): void { - write9(bc, x.code) + write10(bc, x.code) write5(bc, x.reason) } export type ToServerWebSocketOpen = { readonly canHibernate: boolean - readonly lastMsgIndex: i64 } export function readToServerWebSocketOpen(bc: bare.ByteCursor): ToServerWebSocketOpen { return { canHibernate: bare.readBool(bc), - lastMsgIndex: bare.readI64(bc), } } export function writeToServerWebSocketOpen(bc: bare.ByteCursor, x: ToServerWebSocketOpen): void { bare.writeBool(bc, x.canHibernate) - bare.writeI64(bc, x.lastMsgIndex) } export type ToServerWebSocketMessage = { @@ -1166,14 +1185,14 @@ export type ToServerWebSocketClose = { export function readToServerWebSocketClose(bc: bare.ByteCursor): ToServerWebSocketClose { return { - code: read9(bc), + code: read10(bc), reason: read5(bc), retry: bare.readBool(bc), } } export function writeToServerWebSocketClose(bc: bare.ByteCursor, x: ToServerWebSocketClose): void { - write9(bc, x.code) + write10(bc, x.code) write5(bc, x.reason) bare.writeBool(bc, x.retry) } @@ -1394,7 +1413,7 @@ export function writeToClientTunnelMessage(bc: bare.ByteCursor, x: ToClientTunne write5(bc, x.gatewayReplyTo) } -function read10(bc: bare.ByteCursor): ReadonlyMap { +function read11(bc: bare.ByteCursor): ReadonlyMap { const len = bare.readUintSafe(bc) const result = new Map() for (let i = 0; i < len; i++) { @@ -1409,7 +1428,7 @@ function read10(bc: bare.ByteCursor): ReadonlyMap { return result } -function write10(bc: bare.ByteCursor, x: ReadonlyMap): void { +function write11(bc: bare.ByteCursor, x: ReadonlyMap): void { bare.writeUintSafe(bc, x.size) for (const kv of x) { bare.writeString(bc, kv[0]) @@ -1417,22 +1436,22 @@ function write10(bc: bare.ByteCursor, x: ReadonlyMap): void { } } -function read11(bc: bare.ByteCursor): ReadonlyMap | null { - return bare.readBool(bc) ? read10(bc) : null +function read12(bc: bare.ByteCursor): ReadonlyMap | null { + return bare.readBool(bc) ? read11(bc) : null } -function write11(bc: bare.ByteCursor, x: ReadonlyMap | null): void { +function write12(bc: bare.ByteCursor, x: ReadonlyMap | null): void { bare.writeBool(bc, x != null) if (x != null) { - write10(bc, x) + write11(bc, x) } } -function read12(bc: bare.ByteCursor): Json | null { +function read13(bc: bare.ByteCursor): Json | null { return bare.readBool(bc) ? readJson(bc) : null } -function write12(bc: bare.ByteCursor, x: Json | null): void { +function write13(bc: bare.ByteCursor, x: Json | null): void { bare.writeBool(bc, x != null) if (x != null) { writeJson(bc, x) @@ -1457,8 +1476,8 @@ export function readToServerInit(bc: bare.ByteCursor): ToServerInit { version: bare.readU32(bc), totalSlots: bare.readU32(bc), lastCommandIdx: read7(bc), - prepopulateActorNames: read11(bc), - metadata: read12(bc), + prepopulateActorNames: read12(bc), + metadata: read13(bc), } } @@ -1467,8 +1486,8 @@ export function writeToServerInit(bc: bare.ByteCursor, x: ToServerInit): void { bare.writeU32(bc, x.version) bare.writeU32(bc, x.totalSlots) write7(bc, x.lastCommandIdx) - write11(bc, x.prepopulateActorNames) - write12(bc, x.metadata) + write12(bc, x.prepopulateActorNames) + write13(bc, x.metadata) } export type ToServerEvents = readonly EventWrapper[] diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index 0c1b33137d..9d30b830d1 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -13,7 +13,7 @@ import { importWebSocket } from "./websocket.js"; import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter"; const KV_EXPIRE: number = 30_000; -const PROTOCOL_VERSION: number = 2; +const PROTOCOL_VERSION: number = 3; const RUNNER_PING_INTERVAL = 3_000; /** Warn once the backlog significantly exceeds the server's ack batch size. */ diff --git a/rivetkit-openapi/openapi.json b/rivetkit-openapi/openapi.json index 611d6443e2..6f7096cccc 100644 --- a/rivetkit-openapi/openapi.json +++ b/rivetkit-openapi/openapi.json @@ -1,7 +1,7 @@ { "openapi": "3.0.0", "info": { - "version": "2.0.23", + "version": "2.0.24-rc.1", "title": "RivetKit API" }, "components": { diff --git a/scripts/run/docker/engine-rocksdb.sh b/scripts/run/docker/engine-rocksdb.sh index b3896cdb82..42690862ba 100755 --- a/scripts/run/docker/engine-rocksdb.sh +++ b/scripts/run/docker/engine-rocksdb.sh @@ -6,11 +6,11 @@ REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" cd "${REPO_ROOT}" -RUST_LOG="${RUST_LOG:-debug}" \ +RUST_LOG="${RUST_LOG:-"opentelemetry_sdk=off,opentelemetry-otlp=info,tower::buffer::worker=info,debug"}" \ RIVET__PEGBOARD__RETRY_RESET_DURATION="100" \ RIVET__PEGBOARD__BASE_RETRY_TIMEOUT="100" \ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \ RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \ RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \ -cargo run --bin rivet-engine -- start "$@" | tee /tmp/rivet-engine.log +cargo run --bin rivet-engine -- start "$@" | tee -i /tmp/rivet-engine.log diff --git a/website/public/llms.txt b/website/public/llms.txt index 6ab0fea412..92b2c848d1 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -28,6 +28,7 @@ https://rivet.dev/blog/2025-10-19-weekly-updates https://rivet.dev/blog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/blog/2025-10-20-weekly-updates https://rivet.dev/blog/2025-10-24-weekly-updates +https://rivet.dev/blog/2025-11-02-weekly-updates https://rivet.dev/blog/godot-multiplayer-compared-to-unity https://rivet.dev/changelog https://rivet.dev/changelog.json @@ -56,6 +57,7 @@ https://rivet.dev/changelog/2025-10-19-weekly-updates https://rivet.dev/changelog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/changelog/2025-10-20-weekly-updates https://rivet.dev/changelog/2025-10-24-weekly-updates +https://rivet.dev/changelog/2025-11-02-weekly-updates https://rivet.dev/changelog/godot-multiplayer-compared-to-unity https://rivet.dev/cloud https://rivet.dev/docs/actors