From 7fadc23c5ab65bbf428d79b6ca5813e7244bf6a4 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 13 Nov 2025 10:47:14 -0800 Subject: [PATCH] chore(rivetkit): implement new hibernating ws protocol --- .../typescript/runner-protocol/src/index.ts | 8 +- engine/sdks/typescript/runner/src/mod.ts | 34 +- engine/sdks/typescript/runner/src/tunnel.ts | 296 ++++---- engine/sdks/typescript/runner/src/utils.ts | 43 ++ .../runner/src/websocket-tunnel-adapter.ts | 673 +++++++++--------- .../typescript/runner/tests/utils.test.ts | 194 +++++ rivetkit-openapi/openapi.json | 279 +------- .../src/drivers/engine/actor-driver.ts | 84 +-- 8 files changed, 785 insertions(+), 826 deletions(-) create mode 100644 engine/sdks/typescript/runner/tests/utils.test.ts diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index 69bc8d5e0b..ff759d7b4e 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -1,4 +1,4 @@ - +import assert from "node:assert" import * as bare from "@bare-ts/lib" const DEFAULT_CONFIG = /* @__PURE__ */ bare.Config({}) @@ -1925,9 +1925,3 @@ export function decodeToServerlessServer(bytes: Uint8Array): ToServerlessServer } return result } - - -function assert(condition: boolean, message?: string): asserts condition { - if (!condition) throw new Error(message ?? "Assertion failed") -} - diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index 9d30b830d1..c113a36b7d 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -57,30 +57,40 @@ export interface RunnerConfig { requestId: protocol.RequestId, request: Request, ) => Promise; - websocket?: ( + websocket: ( runner: Runner, actorId: string, ws: any, requestId: protocol.RequestId, request: Request, - ) => Promise; + ) => void; onActorStart: ( actorId: string, generation: number, config: ActorConfig, ) => Promise; + /** + * Called on actor start. + * + * Returns the message indices for hibernating request IDs. + * + * Disconnect any connections that are not in the request ID list. + */ + restoreHibernativeRequests( + actorId: string, + requestId: protocol.RequestId[], + ): number[]; onActorStop: (actorId: string, generation: number) => Promise; - getActorHibernationConfig: ( + canWebSocketHibernate: ( actorId: string, requestId: ArrayBuffer, request: Request, - ) => HibernationConfig; + ) => boolean; noAutoShutdown?: boolean; } export interface HibernationConfig { - enabled: boolean; - lastMsgIndex: number | undefined; + lastMsgIndex: number; } export interface KvListOptions { @@ -105,7 +115,6 @@ export class Runner { } #actors: Map = new Map(); - #actorWebSockets: Map> = new Map(); // WebSocket #pegboardWebSocket?: WebSocket; @@ -831,6 +840,11 @@ export class Runner { webSockets: new Set(), }; + this.#tunnel.restoreHibernatingRequests( + actorId, + startCommand.hibernatingRequestIds, + ); + this.#actors.set(actorId, instance); this.#sendActorStateUpdate(actorId, generation, "running"); @@ -1427,8 +1441,10 @@ export class Runner { } } - sendWebsocketMessageAck(requestId: ArrayBuffer, index: number) { - this.#tunnel?.__ackWebsocketMessage(requestId, index); + sendHibernatableWebSocketMessageAck(requestId: ArrayBuffer, index: number) { + if (!this.#tunnel) + throw new Error("missing tunnel to send message ack"); + this.#tunnel.sendHibernatableWebSocketMessageAck(requestId, index); } getServerlessInitPacket(): string | undefined { diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index d0fa360e84..8ccce68eb1 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -9,7 +9,11 @@ import { stringifyToServerTunnelMessageKind, } from "./stringify"; import { unreachable } from "./utils"; -import { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter"; +import { + SET_CAN_HIBERNATE_SYMBOL, + SET_REQUEST_SYMBOL, + WebSocketTunnelAdapter, +} from "./websocket-tunnel-adapter"; const GC_INTERVAL = 60000; // 60 seconds const MESSAGE_ACK_TIMEOUT = 5000; // 5 seconds @@ -22,6 +26,12 @@ interface PendingRequest { actorId?: string; } +export interface HibernatingWebSocketMetadata { + path: string; + headers: Record; + messageIndex: number; +} + interface PendingTunnelMessage { sentAt: number; requestIdStr: string; @@ -91,6 +101,97 @@ export class Tunnel { this.#actorWebSockets.clear(); } + restoreHibernatingRequests(actorId: string, requestIds: RequestId[]) { + // Restore indices + const messageIndices = this.#runner.config.restoreHibernativeRequests( + actorId, + requestIds, + ); + if (messageIndices.length !== requestIds.length) { + throw new Error( + `Expected ${requestIds.length} message indices to be returned from restoreHibernativeRequestIds, received ${messageIndices.length}`, + ); + } + + for (const i = 0; i < requestIds.length; i++) { + const requestId = requestIds[i]; + const messageIndex = messageIndices[i]; + + this.#createWebSocket( + actorId, + requestId, + idToStr(requestId), + true, + messageIndex, + ); + } + + this.log?.info({ + msg: "restored hibernatable requests", + actorId, + requestIds: requestIds.length, + }); + } + + /** + * Called from WebSocketOpen message and when restoring hibernatable WebSockets. + */ + #createWebSocket( + actorId: string, + requestId: RequestId, + requestIdStr: string, + hibernatable: boolean, + messageIndex: number, + request: Request, + ): WebSocketTunnelAdapter { + // Create WebSocket adapter + const adapter = new WebSocketTunnelAdapter( + requestIdStr, + hibernatable, + request, + (data: ArrayBuffer | string, isBinary: boolean) => { + // Send message through tunnel + const dataBuffer = + typeof data === "string" + ? (new TextEncoder().encode(data).buffer as ArrayBuffer) + : data; + + this.#sendMessage(requestId, { + tag: "ToServerWebSocketMessage", + val: { + data: dataBuffer, + binary: isBinary, + }, + }); + }, + (code?: number, reason?: string, retry: boolean = false) => { + // Send close through tunnel + this.#sendMessage(requestId, { + tag: "ToServerWebSocketClose", + val: { + code: code || null, + reason: reason || null, + retry, + }, + }); + + // Remove from map + this.#actorWebSockets.delete(requestIdStr); + + // Clean up actor tracking + const actor = this.#runner.getActor(actorId); + if (actor) { + actor.webSockets.delete(requestIdStr); + } + }, + ); + + // Store adapter + this.#actorWebSockets.set(requestIdStr, adapter); + + // TODO: assign request to ws so we can pass this through + } + #sendMessage( requestId: RequestId, messageKind: protocol.ToServerTunnelMessageKind, @@ -343,7 +444,7 @@ export class Tunnel { case "ToClientWebSocketMessage": { this.#sendAck(message.requestId, message.messageId); - const _unhandled = await this.#handleWebSocketMessage( + this.#handleWebSocketMessage( message.requestId, message.messageKind.val, ); @@ -558,31 +659,9 @@ export class Tunnel { return; } - const websocketHandler = this.#runner.config.websocket; - - if (!websocketHandler) { - this.log?.error({ - msg: "no websocket handler configured for tunnel", - }); - // Send close immediately - this.#sendMessage(requestId, { - tag: "ToServerWebSocketClose", - val: { - code: 1011, - reason: "Not Implemented", - retry: false, - }, - }); - return; - } - // Close existing WebSocket if one already exists for this request ID. - // There should always be a close message sent before another open - // message for the same message ID. - // - // This should never occur if all is functioning correctly, but this - // prevents any edge case that would result in duplicate WebSockets for - // the same request. + // This should never happen, but prevents any potential duplicate + // WebSockets from retransmits. const existingAdapter = this.#actorWebSockets.get(requestIdStr); if (existingAdapter) { this.log?.warn({ @@ -594,104 +673,53 @@ export class Tunnel { existingAdapter.__closeWithoutCallback(1000, "ws.duplicate_open"); } - // Track this WebSocket for the actor - if (actor) { - actor.webSockets.add(requestIdStr); - } - + // Create WebSocket try { - // Create WebSocket adapter - const adapter = new WebSocketTunnelAdapter( - requestIdStr, - (data: ArrayBuffer | string, isBinary: boolean) => { - // Send message through tunnel - const dataBuffer = - typeof data === "string" - ? (new TextEncoder().encode(data) - .buffer as ArrayBuffer) - : data; - - this.#sendMessage(requestId, { - tag: "ToServerWebSocketMessage", - val: { - data: dataBuffer, - binary: isBinary, - }, - }); - }, - (code?: number, reason?: string, retry: boolean = false) => { - // Send close through tunnel - this.#sendMessage(requestId, { - tag: "ToServerWebSocketClose", - val: { - code: code || null, - reason: reason || null, - retry, - }, - }); - - // Remove from map - this.#actorWebSockets.delete(requestIdStr); + actor.webSockets.add(requestIdStr); - // Clean up actor tracking - if (actor) { - actor.webSockets.delete(requestIdStr); - } - }, + const request = buildRequestForWebSocket( + open.path, + Object.fromEntries(open.headers), ); - // Store adapter - this.#actorWebSockets.set(requestIdStr, adapter); - - // Convert headers to map - // - // We need to manually ensure the original Upgrade/Connection WS - // headers are present - const headerInit: Record = {}; - if (open.headers) { - for (const [k, v] of open.headers as ReadonlyMap< - string, - string - >) { - headerInit[k] = v; - } - } - headerInit["Upgrade"] = "websocket"; - headerInit["Connection"] = "Upgrade"; + const canHibernate = this.#runner.config.canWebSocketHibernate( + actor.actorId, + requestId, + request, + ); - const request = new Request(`http://localhost${open.path}`, { - method: "GET", - headers: headerInit, - }); + const adapter = this.#createWebSocket( + actor.actorId, + requestId, + requestIdStr, + canHibernate, + -1, + request, + ); - // Send open confirmation - const hibernationConfig = - this.#runner.config.getActorHibernationConfig( - actor.actorId, - requestId, - request, - ); - adapter.canHibernate = hibernationConfig.enabled; + // Call WebSocket handler. This handler will add event listeners + // for `open`, etc. If this handler throws, then the WebSocket will + // be closed before sending the open event. + this.#runner.config.websocket( + this.#runner, + open.actorId, + adapter, + requestId, + request, + ); + // Open the WebSocket after `config.socket` so (a) the event + // handlers can be added and (b) any errors in `config.websocket` + // will cause the WebSocket to terminate before the open event. this.#sendMessage(requestId, { tag: "ToServerWebSocketOpen", val: { - canHibernate: hibernationConfig.enabled, - lastMsgIndex: BigInt(hibernationConfig.lastMsgIndex ?? -1), + canHibernate, + lastMsgIndex: -1n, }, }); - // Notify adapter that connection is open adapter._handleOpen(requestId); - - // Call websocket handler - await websocketHandler( - this.#runner, - open.actorId, - adapter, - requestId, - request, - ); } catch (error) { this.log?.error({ msg: "error handling websocket open", error }); // Send close on error @@ -713,11 +741,10 @@ export class Tunnel { } } - /// Returns false if the message was sent off - async #handleWebSocketMessage( + #handleWebSocketMessage( requestId: ArrayBuffer, msg: protocol.ToClientWebSocketMessage, - ): Promise { + ) { const requestIdStr = idToStr(requestId); const adapter = this.#actorWebSockets.get(requestIdStr); if (adapter) { @@ -725,18 +752,16 @@ export class Tunnel { ? new Uint8Array(msg.data) : new TextDecoder().decode(new Uint8Array(msg.data)); - return adapter._handleMessage( - requestId, - data, - msg.index, - msg.binary, - ); + adapter._handleMessage(requestId, data, msg.index, msg.binary); } else { - return true; + this.log?.warn({ + msg: "missing websocket for incoming websocket message", + requestId, + }); } } - __ackWebsocketMessage(requestId: ArrayBuffer, index: number) { + sendHibernatableWebSocketMessageAck(requestId: ArrayBuffer, index: number) { this.log?.debug({ msg: "ack ws msg", requestId: idToStr(requestId), @@ -782,3 +807,32 @@ function generateUuidBuffer(): ArrayBuffer { function idToStr(id: ArrayBuffer): string { return uuidstringify(new Uint8Array(id)); } + +/** + * Builds a request that represents the incoming request for a given WebSocket. + * + * This request is not a real request and will never be sent. It's used to be passed to the actor to behave like a real incoming request. + */ +function buildRequestForWebSocket( + path: string, + headers: Record, +): Request { + // We need to manually ensure the original Upgrade/Connection WS + // headers are present + const fullHeaders = { + ...headers, + Upgrade: "websocket", + Connection: "Upgrade", + }; + + if (!path.startsWith("/")) { + throw new Error("path must start with leading slash"); + } + + const request = new Request(`http://actor${path}`, { + method: "GET", + headers: fullHeaders, + }); + + return request; +} diff --git a/engine/sdks/typescript/runner/src/utils.ts b/engine/sdks/typescript/runner/src/utils.ts index 4bf6693d26..d68a200790 100644 --- a/engine/sdks/typescript/runner/src/utils.ts +++ b/engine/sdks/typescript/runner/src/utils.ts @@ -64,3 +64,46 @@ export function parseWebSocketCloseReason( rayId, }; } + +const U16_MAX = 65535; + +/** + * Wrapping greater than comparison for u16 values. + * Based on shared_state.rs wrapping_gt implementation. + */ +export function wrappingGtU16(a: number, b: number): boolean { + return a !== b && wrappingSub(a, b, U16_MAX) < U16_MAX / 2; +} + +/** + * Wrapping less than comparison for u16 values. + * Based on shared_state.rs wrapping_lt implementation. + */ +export function wrappingLtU16(a: number, b: number): boolean { + return a !== b && wrappingSub(b, a, U16_MAX) < U16_MAX / 2; +} + +/** + * Wrapping greater than or equal comparison for u16 values. + */ +export function wrappingGteU16(a: number, b: number): boolean { + return a === b || wrappingGtU16(a, b); +} + +/** + * Wrapping less than or equal comparison for u16 values. + */ +export function wrappingLteU16(a: number, b: number): boolean { + return a === b || wrappingLtU16(a, b); +} + +/** + * Performs wrapping subtraction for unsigned integers. + */ +function wrappingSub(a: number, b: number, max: number): number { + const result = a - b; + if (result < 0) { + return result + max + 1; + } + return result; +} diff --git a/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts b/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts index ddebb72bc8..bba9bfce60 100644 --- a/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts +++ b/engine/sdks/typescript/runner/src/websocket-tunnel-adapter.ts @@ -1,10 +1,12 @@ // WebSocket-like adapter for tunneled connections // Implements a subset of the WebSocket interface for compatibility with runner code +import type { Logger } from "pino"; import { logger } from "./log"; +import type { Tunnel } from "./tunnel"; +import { wrappingLte } from "./utils"; export class WebSocketTunnelAdapter { - #webSocketId: string; #readyState: number = 0; // CONNECTING #eventListeners: Map void>> = new Map(); #onopen: ((this: any, ev: any) => any) | null = null; @@ -16,18 +18,53 @@ export class WebSocketTunnelAdapter { #extensions = ""; #protocol = ""; #url = ""; + + /** + * Called when a new message index is received for a HWS. This should + * persist the message index somewhere in order to restore the WebSocket. + * + * The receiver of this is in charge of sending Runner::sendHibernatableWebSocketMessageAck. + */ + #persistHibernatableWebSocketMessageIndex: (messageIndex: number) => void; + + /** + * Called when sending a message from this WebSocket. + * + * Used to send a tunnel message from Tunnel. + */ #sendCallback: (data: ArrayBuffer | string, isBinary: boolean) => void; + + /** + * Called when closing this WebSocket. + * + * Used to send a tunnel message from Tunnel + */ #closeCallback: (code?: number, reason?: string, retry?: boolean) => void; - #canHibernate: boolean = false; - // Event buffering for events fired before listeners are attached - #bufferedEvents: Array<{ - type: string; - event: any; - }> = []; + #tunnel: Tunnel; + + get #log(): Logger | undefined { + return this.#tunnel.log; + } constructor( - webSocketId: string, + tunnel: Tunnel, + // TODO: This should be internal only + /** @experimental */ + public readonly actorId: string, + // TODO: This should be internal only + /** @experimental */ + public readonly requestId: string, + /** @experimental */ + public readonly hibernatable: boolean, + /** @experimental */ + public readonly request: Request, + // TODO: This should be internal only + /** @experimental */ + public messageIndex: number, + persistHibernatableWebSocketMessageIndex: ( + messageIndex: number, + ) => void, sendCallback: (data: ArrayBuffer | string, isBinary: boolean) => void, closeCallback: ( code?: number, @@ -35,19 +72,290 @@ export class WebSocketTunnelAdapter { retry?: boolean, ) => void, ) { - this.#webSocketId = webSocketId; + this.#tunnel = tunnel; + this.#persistHibernatableWebSocketMessageIndex = + persistHibernatableWebSocketMessageIndex; this.#sendCallback = sendCallback; this.#closeCallback = closeCallback; } - get readyState(): number { - return this.#readyState; - } - + // MARK: - Lifecycle get bufferedAmount(): number { return this.#bufferedAmount; } + _handleOpen(requestId: ArrayBuffer): void { + if (this.#readyState !== 0) { + // CONNECTING + return; + } + + this.#readyState = 1; // OPEN + + const event = { + type: "open", + rivetRequestId: requestId, + target: this, + }; + + this.#fireEvent("open", event); + } + + _handleMessage( + requestId: ArrayBuffer, + data: string | Uint8Array, + messageIndex: number, + isBinary: boolean, + ): boolean { + if (this.#readyState !== 1) { + return true; + } + + const previousIndex = this.messageIndex; + + // Ignore duplicate old messages + // + // This should only happen if something goes wrong + // between persisting the previous index and acking the + // message index to the gateway. If the ack is never + // received by the gateway (due to a crash or network + // issue), the gateway will resend all messages from + // the last ack on reconnect. + if (wrappingLte(messageIndex, previousIndex)) { + this.#log?.info({ + msg: "received duplicate hibernating websocket message, this indicates the actor failed to ack the message index before restarting", + requestId, + actorId: this.actorId, + previousIndex, + expectedIndex: previousIndex + 1, + receivedIndex: messageIndex, + }); + + return true; + } + + // Close message if skipped message in sequence + // + // There is no scenario where this should ever happen + if (messageIndex !== previousIndex + 1) { + const closeReason = "ws.message_index_skip"; + + this.#log?.warn({ + msg: "hibernatable websocket message index out of sequence, closing connection", + requestId, + actorId: this.actorId, + previousIndex, + expectedIndex: previousIndex + 1, + receivedIndex: messageIndex, + closeReason, + gap: messageIndex - previousIndex - 1, + }); + + // Close the WebSocket and skip processing + this.close(1008, closeReason); + + return true; + } + + // Update to the next index + this.messageIndex = messageIndex; + if (this.hibernatable) { + this.#persistHibernatableWebSocketMessageIndex(messageIndex); + } + + // Dispatch event + let messageData: any; + if (isBinary) { + // Handle binary data based on binaryType + if (this.#binaryType === "nodebuffer") { + // Convert to Buffer for Node.js compatibility + messageData = Buffer.from(data as Uint8Array); + } else if (this.#binaryType === "arraybuffer") { + // Convert to ArrayBuffer + if (data instanceof Uint8Array) { + messageData = data.buffer.slice( + data.byteOffset, + data.byteOffset + data.byteLength, + ); + } else { + messageData = data; + } + } else { + // Blob type - not commonly used in Node.js + throw new Error( + "Blob binaryType not supported in tunnel adapter", + ); + } + } else { + messageData = data; + } + + const event = { + type: "message", + data: messageData, + rivetRequestId: requestId, + rivetMessageIndex: messageIndex, + target: this, + }; + + this.#fireEvent("message", event); + + return false; + } + + _handleClose(requestId: ArrayBuffer, code?: number, reason?: string): void { + if (this.#readyState === 3) { + return; + } + + this.#readyState = 3; + + const event = { + type: "close", + wasClean: true, + code: code || 1000, + reason: reason || "", + rivetRequestId: requestId, + target: this, + }; + + this.#fireEvent("close", event); + } + + _handleError(error: Error): void { + const event = { + type: "error", + target: this, + error, + }; + + this.#fireEvent("error", event); + } + + __closeWithRetry(code?: number, reason?: string): void { + this.#closeInner(code, reason, true, true); + } + + __closeWithoutCallback(code?: number, reason?: string): void { + this.#closeInner(code, reason, false, false); + } + + #fireEvent(type: string, event: any): void { + // Call all registered event listeners + const listeners = this.#eventListeners.get(type); + let hasListeners = false; + + if (listeners && listeners.size > 0) { + hasListeners = true; + for (const listener of listeners) { + try { + listener.call(this, event); + } catch (error) { + logger()?.error({ + msg: "error in websocket event listener", + error, + type, + }); + } + } + } + + // Call the onX property if set + switch (type) { + case "open": + if (this.#onopen) { + hasListeners = true; + try { + this.#onopen.call(this, event); + } catch (error) { + logger()?.error({ + msg: "error in onopen handler", + error, + }); + } + } + break; + case "close": + if (this.#onclose) { + hasListeners = true; + try { + this.#onclose.call(this, event); + } catch (error) { + logger()?.error({ + msg: "error in onclose handler", + error, + }); + } + } + break; + case "error": + if (this.#onerror) { + hasListeners = true; + try { + this.#onerror.call(this, event); + } catch (error) { + logger()?.error({ + msg: "error in onerror handler", + error, + }); + } + } + break; + case "message": + if (this.#onmessage) { + hasListeners = true; + try { + this.#onmessage.call(this, event); + } catch (error) { + logger()?.error({ + msg: "error in onmessage handler", + error, + }); + } + } + break; + } + } + + #closeInner( + code: number | undefined, + reason: string | undefined, + retry: boolean, + callback: boolean, + ): void { + if ( + this.#readyState === 2 || // CLOSING + this.#readyState === 3 // CLOSED + ) { + return; + } + + this.#readyState = 2; // CLOSING + + // Send close through tunnel + if (callback) { + this.#closeCallback(code, reason, retry); + } + + // Update state and fire event + this.#readyState = 3; // CLOSED + + const closeEvent = { + wasClean: true, + code: code || 1000, + reason: reason || "", + type: "close", + target: this, + }; + + this.#fireEvent("close", closeEvent); + } + + // MARK: - WebSocket Compatible API + get readyState(): number { + return this.#readyState; + } + get binaryType(): string { return this.#binaryType; } @@ -74,26 +382,12 @@ export class WebSocketTunnelAdapter { return this.#url; } - /** @experimental */ - get canHibernate(): boolean { - return this.#canHibernate; - } - - /** @experimental */ - set canHibernate(value: boolean) { - this.#canHibernate = value; - } - get onopen(): ((this: any, ev: any) => any) | null { return this.#onopen; } set onopen(value: ((this: any, ev: any) => any) | null) { this.#onopen = value; - // Flush any buffered open events when onopen is set - if (value) { - this.#flushBufferedEvents("open"); - } } get onclose(): ((this: any, ev: any) => any) | null { @@ -102,10 +396,6 @@ export class WebSocketTunnelAdapter { set onclose(value: ((this: any, ev: any) => any) | null) { this.#onclose = value; - // Flush any buffered close events when onclose is set - if (value) { - this.#flushBufferedEvents("close"); - } } get onerror(): ((this: any, ev: any) => any) | null { @@ -114,10 +404,6 @@ export class WebSocketTunnelAdapter { set onerror(value: ((this: any, ev: any) => any) | null) { this.#onerror = value; - // Flush any buffered error events when onerror is set - if (value) { - this.#flushBufferedEvents("error"); - } } get onmessage(): ((this: any, ev: any) => any) | null { @@ -126,10 +412,6 @@ export class WebSocketTunnelAdapter { set onmessage(value: ((this: any, ev: any) => any) | null) { this.#onmessage = value; - // Flush any buffered message events when onmessage is set - if (value) { - this.#flushBufferedEvents("message"); - } } send(data: string | ArrayBuffer | ArrayBufferView | Blob | Buffer): void { @@ -201,49 +483,7 @@ export class WebSocketTunnelAdapter { } close(code?: number, reason?: string): void { - this.closeInner(code, reason, false, true); - } - - __closeWithRetry(code?: number, reason?: string): void { - this.closeInner(code, reason, true, true); - } - - __closeWithoutCallback(code?: number, reason?: string): void { - this.closeInner(code, reason, false, false); - } - - closeInner( - code: number | undefined, - reason: string | undefined, - retry: boolean, - callback: boolean, - ): void { - if ( - this.#readyState === 2 || // CLOSING - this.#readyState === 3 // CLOSED - ) { - return; - } - - this.#readyState = 2; // CLOSING - - // Send close through tunnel - if (callback) { - this.#closeCallback(code, reason, retry); - } - - // Update state and fire event - this.#readyState = 3; // CLOSED - - const closeEvent = { - wasClean: true, - code: code || 1000, - reason: reason || "", - type: "close", - target: this, - }; - - this.#fireEvent("close", closeEvent); + this.#closeInner(code, reason, false, true); } addEventListener( @@ -258,9 +498,6 @@ export class WebSocketTunnelAdapter { this.#eventListeners.set(type, listeners); } listeners.add(listener); - - // Flush any buffered events for this type - this.#flushBufferedEvents(type); } } @@ -278,278 +515,15 @@ export class WebSocketTunnelAdapter { } dispatchEvent(event: any): boolean { - // Simple implementation + // TODO: return true; } - #fireEvent(type: string, event: any): void { - // Call all registered event listeners - const listeners = this.#eventListeners.get(type); - let hasListeners = false; - - if (listeners && listeners.size > 0) { - hasListeners = true; - for (const listener of listeners) { - try { - listener.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in websocket event listener", - error, - type, - }); - } - } - } - - // Call the onX property if set - switch (type) { - case "open": - if (this.#onopen) { - hasListeners = true; - try { - this.#onopen.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onopen handler", - error, - }); - } - } - break; - case "close": - if (this.#onclose) { - hasListeners = true; - try { - this.#onclose.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onclose handler", - error, - }); - } - } - break; - case "error": - if (this.#onerror) { - hasListeners = true; - try { - this.#onerror.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onerror handler", - error, - }); - } - } - break; - case "message": - if (this.#onmessage) { - hasListeners = true; - try { - this.#onmessage.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onmessage handler", - error, - }); - } - } - break; - } - - // Buffer the event if no listeners are registered - if (!hasListeners) { - this.#bufferedEvents.push({ type, event }); - } - } - - #flushBufferedEvents(type: string): void { - const eventsToFlush = this.#bufferedEvents.filter( - (buffered) => buffered.type === type, - ); - this.#bufferedEvents = this.#bufferedEvents.filter( - (buffered) => buffered.type !== type, - ); - - for (const { event } of eventsToFlush) { - // Re-fire the event, which will now have listeners - const listeners = this.#eventListeners.get(type); - if (listeners) { - for (const listener of listeners) { - try { - listener.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in websocket event listener", - error, - type, - }); - } - } - } - - // Also call the onX handler if it exists - switch (type) { - case "open": - if (this.#onopen) { - try { - this.#onopen.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onopen handler", - error, - }); - } - } - break; - case "close": - if (this.#onclose) { - try { - this.#onclose.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onclose handler", - error, - }); - } - } - break; - case "error": - if (this.#onerror) { - try { - this.#onerror.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onerror handler", - error, - }); - } - } - break; - case "message": - if (this.#onmessage) { - try { - this.#onmessage.call(this, event); - } catch (error) { - logger()?.error({ - msg: "error in onmessage handler", - error, - }); - } - } - break; - } - } - } - - // Internal methods called by the Tunnel class - _handleOpen(requestId: ArrayBuffer): void { - if (this.#readyState !== 0) { - // CONNECTING - return; - } - - this.#readyState = 1; // OPEN - - const event = { - type: "open", - rivetRequestId: requestId, - target: this, - }; - - this.#fireEvent("open", event); - } - - /// Returns false if the message was sent off. - _handleMessage( - requestId: ArrayBuffer, - data: string | Uint8Array, - index: number, - isBinary: boolean, - ): boolean { - if (this.#readyState !== 1) { - // OPEN - return true; - } - - let messageData: any; - - if (isBinary) { - // Handle binary data based on binaryType - if (this.#binaryType === "nodebuffer") { - // Convert to Buffer for Node.js compatibility - messageData = Buffer.from(data as Uint8Array); - } else if (this.#binaryType === "arraybuffer") { - // Convert to ArrayBuffer - if (data instanceof Uint8Array) { - messageData = data.buffer.slice( - data.byteOffset, - data.byteOffset + data.byteLength, - ); - } else { - messageData = data; - } - } else { - // Blob type - not commonly used in Node.js - throw new Error( - "Blob binaryType not supported in tunnel adapter", - ); - } - } else { - messageData = data; - } - - const event = { - type: "message", - data: messageData, - rivetRequestId: requestId, - rivetMessageIndex: index, - target: this, - }; - - this.#fireEvent("message", event); - - return false; - } - - _handleClose(requestId: ArrayBuffer, code?: number, reason?: string): void { - if (this.#readyState === 3) { - // CLOSED - return; - } - - this.#readyState = 3; // CLOSED - - const event = { - type: "close", - wasClean: true, - code: code || 1000, - reason: reason || "", - rivetRequestId: requestId, - target: this, - }; - - this.#fireEvent("close", event); - } - - _handleError(error: Error): void { - const event = { - type: "error", - target: this, - error, - }; - - this.#fireEvent("error", event); - } - - // WebSocket constants for compatibility static readonly CONNECTING = 0; static readonly OPEN = 1; static readonly CLOSING = 2; static readonly CLOSED = 3; - // Instance constants readonly CONNECTING = 0; readonly OPEN = 1; readonly CLOSING = 2; @@ -566,6 +540,7 @@ export class WebSocketTunnelAdapter { if (cb) cb(new Error("Pong not supported in tunnel adapter")); } + /** @experimental */ terminate(): void { // Immediate close without close frame this.#readyState = 3; // CLOSED diff --git a/engine/sdks/typescript/runner/tests/utils.test.ts b/engine/sdks/typescript/runner/tests/utils.test.ts new file mode 100644 index 0000000000..6259921683 --- /dev/null +++ b/engine/sdks/typescript/runner/tests/utils.test.ts @@ -0,0 +1,194 @@ +import { describe, expect, it } from "vitest"; +import { + wrappingGteU16, + wrappingGtU16, + wrappingLteU16, + wrappingLtU16, +} from "../src/utils"; + +describe("wrappingGtU16", () => { + it("should return true when a > b in normal case", () => { + expect(wrappingGtU16(100, 50)).toBe(true); + expect(wrappingGtU16(1000, 999)).toBe(true); + }); + + it("should return false when a < b in normal case", () => { + expect(wrappingGtU16(50, 100)).toBe(false); + expect(wrappingGtU16(999, 1000)).toBe(false); + }); + + it("should return false when a == b", () => { + expect(wrappingGtU16(100, 100)).toBe(false); + expect(wrappingGtU16(0, 0)).toBe(false); + expect(wrappingGtU16(65535, 65535)).toBe(false); + }); + + it("should handle wrapping around u16 max", () => { + // When values wrap around, 1 is "greater than" 65535 + expect(wrappingGtU16(1, 65535)).toBe(true); + expect(wrappingGtU16(100, 65500)).toBe(true); + }); + + it("should handle edge cases near u16 boundaries", () => { + // 65535 is not greater than 0 (wrapped) + expect(wrappingGtU16(65535, 0)).toBe(false); + // But 0 is greater than 65535 if we consider wrapping + expect(wrappingGtU16(0, 65535)).toBe(true); + }); + + it("should handle values at exactly half the range", () => { + // U16_MAX / 2 = 32767.5, so values with distance <= 32767 return true + const lessThanHalf = 32766; + expect(wrappingGtU16(lessThanHalf, 0)).toBe(true); + expect(wrappingGtU16(0, lessThanHalf)).toBe(false); + + // At distance 32767, still less than 32767.5, so comparison returns true + const atHalfDistance = 32767; + expect(wrappingGtU16(atHalfDistance, 0)).toBe(true); + expect(wrappingGtU16(0, atHalfDistance)).toBe(false); + + // At distance 32768, greater than 32767.5, so comparison returns false + const overHalfDistance = 32768; + expect(wrappingGtU16(overHalfDistance, 0)).toBe(false); + expect(wrappingGtU16(0, overHalfDistance)).toBe(false); + }); +}); + +describe("wrappingLtU16", () => { + it("should return true when a < b in normal case", () => { + expect(wrappingLtU16(50, 100)).toBe(true); + expect(wrappingLtU16(999, 1000)).toBe(true); + }); + + it("should return false when a > b in normal case", () => { + expect(wrappingLtU16(100, 50)).toBe(false); + expect(wrappingLtU16(1000, 999)).toBe(false); + }); + + it("should return false when a == b", () => { + expect(wrappingLtU16(100, 100)).toBe(false); + expect(wrappingLtU16(0, 0)).toBe(false); + expect(wrappingLtU16(65535, 65535)).toBe(false); + }); + + it("should handle wrapping around u16 max", () => { + // When values wrap around, 65535 is "less than" 1 + expect(wrappingLtU16(65535, 1)).toBe(true); + expect(wrappingLtU16(65500, 100)).toBe(true); + }); + + it("should handle edge cases near u16 boundaries", () => { + // 0 is not less than 65535 (wrapped) + expect(wrappingLtU16(0, 65535)).toBe(false); + // But 65535 is less than 0 if we consider wrapping + expect(wrappingLtU16(65535, 0)).toBe(true); + }); + + it("should handle values at exactly half the range", () => { + // U16_MAX / 2 = 32767.5, so values with distance <= 32767 return true + const lessThanHalf = 32766; + expect(wrappingLtU16(0, lessThanHalf)).toBe(true); + expect(wrappingLtU16(lessThanHalf, 0)).toBe(false); + + // At distance 32767, still less than 32767.5, so comparison returns true + const atHalfDistance = 32767; + expect(wrappingLtU16(0, atHalfDistance)).toBe(true); + expect(wrappingLtU16(atHalfDistance, 0)).toBe(false); + + // At distance 32768, greater than 32767.5, so comparison returns false + const overHalfDistance = 32768; + expect(wrappingLtU16(0, overHalfDistance)).toBe(false); + expect(wrappingLtU16(overHalfDistance, 0)).toBe(false); + }); +}); + +describe("wrappingGtU16 and wrappingLtU16 consistency", () => { + it("should be inverse of each other for different values", () => { + const testCases: [number, number][] = [ + [100, 200], + [200, 100], + [0, 65535], + [65535, 0], + [1, 65534], + [32767, 32768], + ]; + + for (const [a, b] of testCases) { + const gt = wrappingGtU16(a, b); + const lt = wrappingLtU16(a, b); + const eq = a === b; + + // For any pair, exactly one of gt, lt, or eq should be true + expect(Number(gt) + Number(lt) + Number(eq)).toBe(1); + } + }); + + it("should satisfy transitivity for sequential values", () => { + // If we have sequential indices, a < b < c should hold + const a = 100; + const b = 101; + const c = 102; + + expect(wrappingLtU16(a, b)).toBe(true); + expect(wrappingLtU16(b, c)).toBe(true); + expect(wrappingLtU16(a, c)).toBe(true); + }); + + it("should handle sequence across wrap boundary", () => { + // Test a sequence that wraps: 65534, 65535, 0, 1 + const values = [65534, 65535, 0, 1]; + + for (let i = 0; i < values.length - 1; i++) { + expect(wrappingLtU16(values[i], values[i + 1])).toBe(true); + expect(wrappingGtU16(values[i + 1], values[i])).toBe(true); + } + }); +}); + +describe("wrappingGteU16", () => { + it("should return true when a > b", () => { + expect(wrappingGteU16(100, 50)).toBe(true); + expect(wrappingGteU16(1000, 999)).toBe(true); + }); + + it("should return true when a == b", () => { + expect(wrappingGteU16(100, 100)).toBe(true); + expect(wrappingGteU16(0, 0)).toBe(true); + expect(wrappingGteU16(65535, 65535)).toBe(true); + }); + + it("should return false when a < b", () => { + expect(wrappingGteU16(50, 100)).toBe(false); + expect(wrappingGteU16(999, 1000)).toBe(false); + }); + + it("should handle wrapping around u16 max", () => { + expect(wrappingGteU16(1, 65535)).toBe(true); + expect(wrappingGteU16(100, 65500)).toBe(true); + expect(wrappingGteU16(0, 65535)).toBe(true); + }); +}); + +describe("wrappingLteU16", () => { + it("should return true when a < b", () => { + expect(wrappingLteU16(50, 100)).toBe(true); + expect(wrappingLteU16(999, 1000)).toBe(true); + }); + + it("should return true when a == b", () => { + expect(wrappingLteU16(100, 100)).toBe(true); + expect(wrappingLteU16(0, 0)).toBe(true); + expect(wrappingLteU16(65535, 65535)).toBe(true); + }); + + it("should return false when a > b", () => { + expect(wrappingLteU16(100, 50)).toBe(false); + expect(wrappingLteU16(1000, 999)).toBe(false); + }); + + it("should handle wrapping around u16 max", () => { + expect(wrappingLteU16(65535, 1)).toBe(true); + expect(wrappingLteU16(65500, 100)).toBe(true); + expect(wrappingLteU16(65535, 0)).toBe(true); + }); +}); diff --git a/rivetkit-openapi/openapi.json b/rivetkit-openapi/openapi.json index 90803b4757..4ab454fa07 100644 --- a/rivetkit-openapi/openapi.json +++ b/rivetkit-openapi/openapi.json @@ -113,6 +113,7 @@ }, "put": { "requestBody": { + "required": true, "content": { "application/json": { "schema": { @@ -225,6 +226,7 @@ }, "post": { "requestBody": { + "required": true, "content": { "application/json": { "schema": { @@ -385,283 +387,6 @@ } } } - }, - "/gateway/{actorId}/health": { - "get": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - } - ], - "responses": { - "200": { - "description": "Health check", - "content": { - "text/plain": { - "schema": { - "type": "string" - } - } - } - } - } - } - }, - "/gateway/{actorId}/action/{action}": { - "post": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "action", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The name of the action to execute" - } - ], - "requestBody": { - "content": { - "application/json": { - "schema": { - "type": "object", - "properties": { - "args": {} - }, - "additionalProperties": false - } - } - } - }, - "responses": { - "200": { - "description": "Action executed successfully", - "content": { - "application/json": { - "schema": { - "type": "object", - "properties": { - "output": {} - }, - "additionalProperties": false - } - } - } - }, - "400": { - "description": "Invalid action" - }, - "500": { - "description": "Internal error" - } - } - } - }, - "/gateway/{actorId}/request/{path}": { - "get": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "path", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The HTTP path to forward to the actor" - } - ], - "responses": { - "200": { - "description": "Response from actor's raw HTTP handler" - } - } - }, - "post": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "path", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The HTTP path to forward to the actor" - } - ], - "responses": { - "200": { - "description": "Response from actor's raw HTTP handler" - } - } - }, - "put": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "path", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The HTTP path to forward to the actor" - } - ], - "responses": { - "200": { - "description": "Response from actor's raw HTTP handler" - } - } - }, - "delete": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "path", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The HTTP path to forward to the actor" - } - ], - "responses": { - "200": { - "description": "Response from actor's raw HTTP handler" - } - } - }, - "patch": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "path", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The HTTP path to forward to the actor" - } - ], - "responses": { - "200": { - "description": "Response from actor's raw HTTP handler" - } - } - }, - "head": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "path", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The HTTP path to forward to the actor" - } - ], - "responses": { - "200": { - "description": "Response from actor's raw HTTP handler" - } - } - }, - "options": { - "parameters": [ - { - "name": "actorId", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The ID of the actor to target" - }, - { - "name": "path", - "in": "path", - "required": true, - "schema": { - "type": "string" - }, - "description": "The HTTP path to forward to the actor" - } - ], - "responses": { - "200": { - "description": "Response from actor's raw HTTP handler" - } - } - } } } } \ No newline at end of file diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index d18e55da79..3034b7020f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -78,12 +78,12 @@ export class EngineActorDriver implements ActorDriver { // protocol is updated to send the intent directly (see RVT-5284) #actorStopIntent: Map = new Map(); - // WebSocket message acknowledgment debouncing for hibernatable websockets - #hibernatableWebSocketAckQueue: Map< - string, - { requestIdBuf: ArrayBuffer; messageIndex: number } - > = new Map(); - #wsAckFlushInterval?: NodeJS.Timeout; + // // WebSocket message acknowledgment debouncing for hibernatable websockets + // #hibernatableWebSocketAckQueue: Map< + // string, + // { requestIdBuf: ArrayBuffer; messageIndex: number } + // > = new Map(); + // #wsAckFlushInterval?: NodeJS.Timeout; constructor( registryConfig: RegistryConfig, @@ -558,7 +558,7 @@ export class EngineActorDriver implements ActorDriver { return await this.#actorRouter.fetch(request, { actorId }); } - async #runnerWebSocket( + #runnerWebSocket( _runner: Runner, actorId: string, websocketRaw: any, @@ -648,50 +648,7 @@ export class EngineActorDriver implements ActorDriver { const currentEntry = this.#hibernatableWebSocketAckQueue.get(requestId); if (currentEntry) { - const previousIndex = currentEntry.messageIndex; - - // Check for out-of-sequence messages - if (event.rivetMessageIndex !== previousIndex + 1) { - let closeReason: string; - let sequenceType: string; - - if (event.rivetMessageIndex < previousIndex) { - closeReason = "ws.message_index_regressed"; - sequenceType = "regressed"; - } else if ( - event.rivetMessageIndex === previousIndex - ) { - closeReason = "ws.message_index_duplicate"; - sequenceType = "duplicate"; - } else { - closeReason = "ws.message_index_skip"; - sequenceType = "gap/skipped"; - } - - logger().warn({ - msg: "hibernatable websocket message index out of sequence, closing connection", - requestId, - actorId, - previousIndex, - expectedIndex: previousIndex + 1, - receivedIndex: event.rivetMessageIndex, - sequenceType, - closeReason, - gap: - event.rivetMessageIndex > previousIndex - ? event.rivetMessageIndex - - previousIndex - - 1 - : 0, - }); - - // Close the WebSocket and skip processing - wsContext.close(1008, closeReason); - return; - } - - // Update to the next index - currentEntry.messageIndex = event.rivetMessageIndex; + // TODO: } else { this.#hibernatableWebSocketAckQueue.set(requestId, { requestIdBuf, @@ -699,18 +656,19 @@ export class EngineActorDriver implements ActorDriver { }); } - // Update msgIndex for next WebSocket open msgIndex restoration - const oldMsgIndex = hibernatableWs.msgIndex; - hibernatableWs.msgIndex = event.rivetMessageIndex; - hibernatableWs.lastSeenTimestamp = Date.now(); - - logger().debug({ - msg: "updated hibernatable websocket msgIndex in engine driver", - requestId, - oldMsgIndex: oldMsgIndex.toString(), - newMsgIndex: event.rivetMessageIndex, - actorId, - }); + // TODO: Move this to the handler + // // Update msgIndex for next WebSocket open msgIndex restoration + // const oldMsgIndex = hibernatableWs.msgIndex; + // hibernatableWs.msgIndex = event.rivetMessageIndex; + // hibernatableWs.lastSeenTimestamp = Date.now(); + // + // logger().debug({ + // msg: "updated hibernatable websocket msgIndex in engine driver", + // requestId, + // oldMsgIndex: oldMsgIndex.toString(), + // newMsgIndex: event.rivetMessageIndex, + // actorId, + // }); } } else { // Warn if we receive a message for a hibernatable websocket but can't find the actor