From c0334fa5b049ba60504b72f7d18a00db32df819a Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sat, 8 Nov 2025 16:47:40 -0800 Subject: [PATCH] chore(rivetkit): remove deprecated conn tokens & sse support --- .../src/frontend/App.tsx | 2 +- rivetkit-openapi/openapi.json | 2 +- rivetkit-rust/packages/client/src/common.rs | 4 +- .../cloudflare-workers/src/manager-driver.ts | 10 - .../packages/rivetkit/package.json | 2 +- .../rivetkit/schemas/actor-persist/v3.bare | 33 +++ .../rivetkit/schemas/client-protocol/v2.bare | 82 ++++++ .../rivetkit/src/actor/conn-drivers.ts | 45 --- .../rivetkit/src/actor/conn-socket.ts | 1 + .../packages/rivetkit/src/actor/conn.ts | 44 +-- .../packages/rivetkit/src/actor/errors.ts | 16 - .../packages/rivetkit/src/actor/instance.ts | 275 +++++------------- .../packages/rivetkit/src/actor/mod.ts | 11 +- .../packages/rivetkit/src/actor/persisted.ts | 1 - .../rivetkit/src/actor/protocol/old.ts | 7 - .../rivetkit/src/actor/router-endpoints.ts | 237 +-------------- .../packages/rivetkit/src/actor/router.ts | 95 +----- .../rivetkit/src/client/actor-conn.ts | 254 ++-------------- .../packages/rivetkit/src/client/client.ts | 4 - .../packages/rivetkit/src/client/config.ts | 3 - .../packages/rivetkit/src/client/mod.ts | 1 - .../packages/rivetkit/src/client/raw-utils.ts | 6 +- .../src/common/actor-router-consts.ts | 15 +- .../packages/rivetkit/src/common/log.ts | 2 +- .../rivetkit/src/driver-helpers/mod.ts | 11 +- .../rivetkit/src/driver-helpers/utils.ts | 3 +- .../rivetkit/src/driver-test-suite/mod.ts | 53 +--- .../test-inline-client-driver.ts | 54 +--- .../tests/actor-conn-state.ts | 5 +- .../src/driver-test-suite/tests/actor-conn.ts | 6 +- .../driver-test-suite/tests/actor-driver.ts | 7 - .../tests/raw-http-direct-registry.ts | 14 +- .../tests/raw-websocket-direct-registry.ts | 14 +- .../driver-test-suite/tests/request-access.ts | 4 +- .../rivetkit/src/driver-test-suite/utils.ts | 9 +- .../src/drivers/engine/actor-driver.ts | 17 +- .../src/drivers/file-system/manager.ts | 24 +- .../packages/rivetkit/src/manager/driver.ts | 4 - .../packages/rivetkit/src/manager/gateway.ts | 24 +- .../rivetkit/src/manager/protocol/mod.ts | 2 - .../rivetkit/src/manager/protocol/query.ts | 4 - .../packages/rivetkit/src/manager/router.ts | 39 +-- .../packages/rivetkit/src/mod.ts | 1 - .../packages/rivetkit/src/registry/mod.ts | 1 - .../actor-websocket-client.ts | 17 +- .../rivetkit/src/remote-manager-driver/mod.ts | 8 - .../rivetkit/src/schemas/actor-persist/mod.ts | 2 +- .../src/schemas/actor-persist/versioned.ts | 110 +++++-- .../src/schemas/client-protocol/mod.ts | 2 +- .../src/schemas/client-protocol/versioned.ts | 70 +++-- .../rivetkit/tests/driver-engine.test.ts | 2 - .../actors/fetch-and-websocket-handler.mdx | 4 +- 52 files changed, 436 insertions(+), 1227 deletions(-) create mode 100644 rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v3.bare create mode 100644 rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v2.bare diff --git a/examples/cursors-raw-websocket/src/frontend/App.tsx b/examples/cursors-raw-websocket/src/frontend/App.tsx index d34ad3b956..c79fbb3573 100644 --- a/examples/cursors-raw-websocket/src/frontend/App.tsx +++ b/examples/cursors-raw-websocket/src/frontend/App.tsx @@ -90,7 +90,7 @@ export function App() { console.log("found actor", actorId); const wsOrigin = rivetUrl.replace(/^http/, "ws"); - const wsUrl = `${wsOrigin}/gateway/${actorId}/raw/websocket?sessionId=${encodeURIComponent(sessionId)}`; + const wsUrl = `${wsOrigin}/gateway/${actorId}/websocket?sessionId=${encodeURIComponent(sessionId)}`; console.log("ws url:", wsUrl); diff --git a/rivetkit-openapi/openapi.json b/rivetkit-openapi/openapi.json index 55f4224dad..6f7096cccc 100644 --- a/rivetkit-openapi/openapi.json +++ b/rivetkit-openapi/openapi.json @@ -1,7 +1,7 @@ { "openapi": "3.0.0", "info": { - "version": "2.0.22", + "version": "2.0.24-rc.1", "title": "RivetKit API" }, "components": { diff --git a/rivetkit-rust/packages/client/src/common.rs b/rivetkit-rust/packages/client/src/common.rs index fca42ca950..6187828cc6 100644 --- a/rivetkit-rust/packages/client/src/common.rs +++ b/rivetkit-rust/packages/client/src/common.rs @@ -21,7 +21,7 @@ pub const HEADER_RIVET_ACTOR: &str = "x-rivet-actor"; pub const HEADER_RIVET_TOKEN: &str = "x-rivet-token"; // Paths -pub const PATH_CONNECT_WEBSOCKET: &str = "/connect/websocket"; +pub const PATH_CONNECT_WEBSOCKET: &str = "/connect"; // WebSocket protocol prefixes pub const WS_PROTOCOL_STANDARD: &str = "rivet"; @@ -63,4 +63,4 @@ impl ToString for EncodingKind { // Max size of each entry is 128 bytes -pub type ActorKey = Vec; \ No newline at end of file +pub type ActorKey = Vec; diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts b/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts index 7acbafa120..43013b8825 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts @@ -10,9 +10,7 @@ import { type ManagerDisplayInformation, type ManagerDriver, WS_PROTOCOL_ACTOR, - WS_PROTOCOL_CONN_ID, WS_PROTOCOL_CONN_PARAMS, - WS_PROTOCOL_CONN_TOKEN, WS_PROTOCOL_ENCODING, WS_PROTOCOL_STANDARD, WS_PROTOCOL_TARGET, @@ -76,8 +74,6 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise { const env = getCloudflareAmbientEnv(); @@ -101,12 +97,6 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { `${WS_PROTOCOL_CONN_PARAMS}${encodeURIComponent(JSON.stringify(params))}`, ); } - if (connId) { - protocols.push(`${WS_PROTOCOL_CONN_ID}${connId}`); - } - if (connToken) { - protocols.push(`${WS_PROTOCOL_CONN_TOKEN}${connToken}`); - } const headers: Record = { Upgrade: "websocket", diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index 8300061401..79635eb3c5 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -153,7 +153,7 @@ ], "scripts": { "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts", - "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts", + "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts", "check-types": "tsc --noEmit", "test": "vitest run", "test:watch": "vitest", diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v3.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v3.bare new file mode 100644 index 0000000000..1a362c9794 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v3.bare @@ -0,0 +1,33 @@ +# MARK: Connection +# Connection associated with hibernatable WebSocket that should persist across lifecycles. +type PersistedHibernatableConn struct { + # Connection ID generated by RivetKit + id: str + parameters: data + state: data + + # Request ID of the hibernatable WebSocket + hibernatableRequestId: data + # Last seen message from this WebSocket + lastSeenTimestamp: i64 + # Last seem message index for this WebSocket + msgIndex: i64 +} + +# MARK: Schedule Event +type PersistedScheduleEvent struct { + eventId: str + timestamp: i64 + action: str + args: optional +} + +# MARK: Actor +type PersistedActor struct { + # Input data passed to the actor on initialization + input: optional + hasInitialized: bool + state: data + hibernatableConns: list + scheduledEvents: list +} diff --git a/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v2.bare b/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v2.bare new file mode 100644 index 0000000000..003eeff50a --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v2.bare @@ -0,0 +1,82 @@ +# MARK: Message To Client +type Init struct { + actorId: str + connectionId: str +} + +type Error struct { + group: str + code: str + message: str + metadata: optional + actionId: optional +} + +type ActionResponse struct { + id: uint + output: data +} + +type Event struct { + name: str + # CBOR array + args: data +} + +type ToClientBody union { + Init | + Error | + ActionResponse | + Event +} + +type ToClient struct { + body: ToClientBody +} + +# MARK: Message To Server +type ActionRequest struct { + id: uint + name: str + # CBOR array + args: data +} + +type SubscriptionRequest struct { + eventName: str + subscribe: bool +} + +type ToServerBody union { + ActionRequest | + SubscriptionRequest +} + +type ToServer struct { + body: ToServerBody +} + +# MARK: HTTP Action +type HttpActionRequest struct { + # CBOR array + args: data +} + +type HttpActionResponse struct { + output: data +} + +# MARK: HTTP Error +type HttpResponseError struct { + group: str + code: str + message: str + metadata: optional +} + +# MARK: HTTP Resolve +type HttpResolveRequest void + +type HttpResolveResponse struct { + actorId: str +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts b/rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts index ff7fae5016..9b8d26a354 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts @@ -1,4 +1,3 @@ -import type { SSEStreamingApi } from "hono/streaming"; import type { WSContext } from "hono/ws"; import type { WebSocket } from "ws"; import type { AnyConn } from "@/actor/conn"; @@ -11,7 +10,6 @@ import { assertUnreachable, type promiseWithResolvers } from "@/utils"; export enum ConnDriverKind { WEBSOCKET = 0, - SSE = 1, HTTP = 2, } @@ -29,16 +27,10 @@ export interface ConnDriverWebSocketState { closePromise: ReturnType>; } -export interface ConnDriverSseState { - encoding: Encoding; - stream: SSEStreamingApi; -} - export type ConnDriverHttpState = Record; export type ConnDriverState = | { [ConnDriverKind.WEBSOCKET]: ConnDriverWebSocketState } - | { [ConnDriverKind.SSE]: ConnDriverSseState } | { [ConnDriverKind.HTTP]: ConnDriverHttpState }; export interface ConnDriver { @@ -152,41 +144,6 @@ const WEBSOCKET_DRIVER: ConnDriver = { }, }; -// MARK: SSE -const SSE_DRIVER: ConnDriver = { - sendMessage: ( - _actor: AnyActorInstance, - _conn: AnyConn, - state: ConnDriverSseState, - message: CachedSerializer, - ) => { - state.stream.writeSSE({ - data: encodeDataToString(message.serialize(state.encoding)), - }); - }, - - disconnect: async ( - _actor: AnyActorInstance, - _conn: AnyConn, - state: ConnDriverSseState, - _reason?: string, - ) => { - state.stream.close(); - }, - - getConnectionReadyState: ( - _actor: AnyActorInstance, - _conn: AnyConn, - state: ConnDriverSseState, - ): ConnReadyState | undefined => { - if (state.stream.aborted || state.stream.closed) { - return ConnReadyState.CLOSED; - } - - return ConnReadyState.OPEN; - }, -}; - // MARK: HTTP const HTTP_DRIVER: ConnDriver = { getConnectionReadyState(_actor, _conn) { @@ -202,7 +159,6 @@ const HTTP_DRIVER: ConnDriver = { /** List of all connection drivers. */ export const CONN_DRIVERS: Record> = { [ConnDriverKind.WEBSOCKET]: WEBSOCKET_DRIVER, - [ConnDriverKind.SSE]: SSE_DRIVER, [ConnDriverKind.HTTP]: HTTP_DRIVER, }; @@ -210,7 +166,6 @@ export function getConnDriverKindFromState( state: ConnDriverState, ): ConnDriverKind { if (ConnDriverKind.WEBSOCKET in state) return ConnDriverKind.WEBSOCKET; - else if (ConnDriverKind.SSE in state) return ConnDriverKind.SSE; else if (ConnDriverKind.HTTP in state) return ConnDriverKind.HTTP; else assertUnreachable(state); } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/conn-socket.ts b/rivetkit-typescript/packages/rivetkit/src/actor/conn-socket.ts index c4157d216b..79f0745392 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/conn-socket.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/conn-socket.ts @@ -1,6 +1,7 @@ import type { ConnDriverState } from "./conn-drivers"; export interface ConnSocket { + /** This is the request ID provided by the given framework. If not provided this is a random UUID. */ requestId: string; requestIdBuf?: ArrayBuffer; hibernatable: boolean; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/conn.ts b/rivetkit-typescript/packages/rivetkit/src/actor/conn.ts index c771e14289..9f108c0b2c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/conn.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/conn.ts @@ -1,14 +1,10 @@ import * as cbor from "cbor-x"; -import invariant from "invariant"; -import { PersistedHibernatableWebSocket } from "@/schemas/actor-persist/mod"; import type * as protocol from "@/schemas/client-protocol/mod"; import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; import { arrayBuffersEqual, bufferToArrayBuffer } from "@/utils"; import { CONN_DRIVERS, - ConnDriverKind, type ConnDriverState, - ConnReadyState, getConnDriverKindFromState, } from "./conn-drivers"; import type { ConnSocket } from "./conn-socket"; @@ -17,15 +13,6 @@ import * as errors from "./errors"; import { type ActorInstance, PERSIST_SYMBOL } from "./instance"; import type { PersistedConn } from "./persisted"; import { CachedSerializer } from "./protocol/serde"; -import { generateSecureToken } from "./utils"; - -export function generateConnId(): string { - return crypto.randomUUID(); -} - -export function generateConnToken(): string { - return generateSecureToken(32); -} export function generateConnRequestId(): string { return crypto.randomUUID(); @@ -35,8 +22,6 @@ export type ConnId = string; export type AnyConn = Conn; -export type ConnectionStatus = "connected" | "reconnecting"; - /** * Represents a client connection to a actor. * @@ -53,7 +38,11 @@ export class Conn { /** * The proxied state that notifies of changes automatically. * - * Any data that should be stored indefinitely should be held within this object. + * Any data that should be stored indefinitely should be held within this + * object. + * + * This will only be persisted if using hibernatable WebSockets. If not, + * this is just used to hole state. */ __persist: PersistedConn; @@ -68,15 +57,6 @@ export class Conn { */ __socket?: ConnSocket; - get __status(): ConnectionStatus { - // TODO: isHibernatible might be true while the actual hibernatable websocket has disconnected - if (this.__socket || this.isHibernatable) { - return "connected"; - } else { - return "reconnecting"; - } - } - public get params(): CP { return this.__persist.params; } @@ -113,20 +93,6 @@ export class Conn { return this.__persist.connId; } - /** - * Token used to authenticate this request. - */ - public get _token(): string { - return this.__persist.token; - } - - /** - * Status of the connection. - */ - public get status(): ConnectionStatus { - return this.__status; - } - /** * @experimental * diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts b/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts index fdf3a3f35d..07a517dbd0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts @@ -133,22 +133,6 @@ export class InvalidEncoding extends ActorError { } } -export class ConnNotFound extends ActorError { - constructor(id?: string) { - super("connection", "not_found", `Connection not found for ID: ${id}`, { - public: true, - }); - } -} - -export class IncorrectConnToken extends ActorError { - constructor() { - super("connection", "incorrect_token", "Incorrect connection token.", { - public: true, - }); - } -} - export class MessageTooLong extends ActorError { constructor() { super( diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts index e84786de7f..12dd4ff854 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance.ts @@ -25,13 +25,7 @@ import { } from "@/utils"; import { ActionContext } from "./action"; import type { ActorConfig, OnConnectOptions } from "./config"; -import { - Conn, - type ConnId, - generateConnId, - generateConnRequestId, - generateConnToken, -} from "./conn"; +import { Conn, type ConnId, generateConnRequestId } from "./conn"; import { CONN_DRIVERS, type ConnDriver, @@ -200,7 +194,6 @@ export class ActorInstance { #connections = new Map>(); #subscriptionIndex = new Map>>(); - #checkConnLivenessInterval?: NodeJS.Timeout; #sleepTimeout?: NodeJS.Timeout; @@ -240,7 +233,6 @@ export class ActorInstance { id, params: conn.params as any, state: conn.__stateEnabled ? conn.state : undefined, - status: conn.status, subscriptions: conn.subscriptions.size, lastSeen: conn.lastSeen, stateEnabled: conn.__stateEnabled, @@ -428,25 +420,6 @@ export class ActorInstance { // Must be called after setting `#ready` or else it will not schedule sleep this.#resetSleepTimer(); - // Start conn liveness interval - // - // Check for liveness immediately since we may have connections that - // were in `reconnecting` state when the actor went to sleep that we - // need to purge. - // - // We don't use alarms for connection liveness since alarms require - // durability & are expensive. Connection liveness is safe to assume - // it only needs to be ran while the actor is awake and does not need - // to manually wake the actor. The only case this is not true is if the - // connection liveness timeout is greater than the actor sleep timeout - // OR if the actor is manually put to sleep. In this case, the connections - // will be stuck in a `reconnecting` state until the actor is awaken again. - this.#checkConnLivenessInterval = setInterval( - this.#checkConnectionsLiveness.bind(this), - this.#config.options.connectionLivenessInterval, - ); - this.#checkConnectionsLiveness(); - // Trigger any pending alarms await this._onAlarm(); } @@ -904,7 +877,7 @@ export class ActorInstance { } /** - * Call when conn is disconnected. Used by transports. + * Call when conn is disconnected. * * If a clean diconnect, will be removed immediately. * @@ -1016,11 +989,10 @@ export class ActorInstance { // biome-ignore lint/suspicious/noExplicitAny: TypeScript bug with ExtractActorConnParams, params: any, request?: Request, - connectionId?: string, - connectionToken?: string, ): Promise> { this.#assertReady(); + // TODO: Remove this for ws hibernation v2 since we don't receive an open message for ws // Check for hibernatable websocket reconnection if (socket.requestIdBuf && socket.hibernatable) { this.rLog.debug({ @@ -1089,80 +1061,6 @@ export class ActorInstance { } } - // If connection ID and token are provided, try to reconnect - if (connectionId && connectionToken) { - this.rLog.debug({ - msg: "checking for existing connection", - connectionId, - }); - const existingConn = this.#connections.get(connectionId); - if (existingConn && existingConn._token === connectionToken) { - // This is a valid reconnection - this.rLog.debug({ - msg: "reconnecting existing connection", - connectionId, - }); - - // If there's an existing driver state, clean it up without marking as clean disconnect - if (existingConn.__driverState) { - const driverKind = getConnDriverKindFromState( - existingConn.__driverState, - ); - const driver = CONN_DRIVERS[driverKind]; - if (driver.disconnect) { - // Call driver disconnect to clean up directly. Don't use Conn.disconnect since that will remove the connection entirely. - driver.disconnect( - this, - existingConn, - (existingConn.__driverState as any)[driverKind], - "Reconnecting with new driver state", - ); - } - } - - // Update with new driver state - existingConn.__socket = socket; - existingConn.__persist.lastSeen = Date.now(); - - // Update sleep timer since connection is now active - this.#resetSleepTimer(); - - this.inspector.emitter.emit("connectionUpdated"); - - // Send init message for reconnection - existingConn._sendMessage( - new CachedSerializer( - { - body: { - tag: "Init", - val: { - actorId: this.id, - connectionId: existingConn.id, - connectionToken: existingConn._token, - }, - }, - }, - TO_CLIENT_VERSIONED, - ), - ); - - return existingConn; - } else { - this.rLog.debug({ - msg: "connection not found or token mismatch, creating new connection", - connectionId, - }); - } - } - - // Generate new connection ID and token if not provided or if reconnection failed - const newConnId = generateConnId(); - const newConnToken = generateConnToken(); - - if (this.#connections.has(newConnId)) { - throw new Error(`Connection already exists: ${newConnId}`); - } - // Prepare connection state let connState: CS | undefined; @@ -1211,8 +1109,7 @@ export class ActorInstance { // Create connection const persist: PersistedConn = { - connId: newConnId, - token: newConnToken, + connId: crypto.randomUUID(), params: params, state: connState as CS, lastSeen: Date.now(), @@ -1280,7 +1177,6 @@ export class ActorInstance { val: { actorId: this.id, connectionId: conn.id, - connectionToken: conn._token, }, }, }, @@ -1409,63 +1305,6 @@ export class ActorInstance { throw new errors.InternalError("Actor is stopping"); } - /** - * Check the liveness of all connections. - * Sets up a recurring check based on the configured interval. - */ - #checkConnectionsLiveness() { - this.#rLog.debug({ msg: "checking connections liveness" }); - - let connected = 0; - let reconnecting = 0; - let removed = 0; - for (const conn of this.#connections.values()) { - if (conn.__status === "connected") { - connected += 1; - this.#rLog.debug({ - msg: "connection is alive", - connId: conn.id, - }); - } else { - reconnecting += 1; - - const lastSeen = conn.__persist.lastSeen; - const sinceLastSeen = Date.now() - lastSeen; - if ( - sinceLastSeen < - this.#config.options.connectionLivenessTimeout - ) { - this.#rLog.debug({ - msg: "connection might be alive, will check later", - connId: conn.id, - lastSeen, - sinceLastSeen, - }); - continue; - } - - // Connection is dead, remove it - this.#rLog.info({ - msg: "connection is dead, removing", - connId: conn.id, - lastSeen, - }); - - // Assume that the connection is dead here, no need to disconnect anything - removed += 1; - this.#removeConn(conn); - } - } - - this.#rLog.debug({ - msg: "checked connection liveness", - total: connected + reconnecting, - connected, - reconnecting, - removed, - }); - } - /** * Check if the actor is ready to handle requests. */ @@ -2070,10 +1909,11 @@ export class ActorInstance { // Check for active conns. This will also cover active actions, since all actions have a connection. for (const conn of this.#connections.values()) { // TODO: Enable this when hibernation is implemented. We're waiting on support for Guard to not auto-wake the actor if it sleeps. - // if (conn.status === "connected" && !conn.isHibernatable) + // if (!conn.isHibernatable) // return false; - if (conn.status === "connected") return CanSleep.ActiveConns; + // if (!conn.isHibernatable) return CanSleep.ActiveConns; + return CanSleep.ActiveConns; } return CanSleep.Yes; @@ -2196,8 +2036,6 @@ export class ActorInstance { // Clear timeouts if (this.#pendingSaveTimeout) clearTimeout(this.#pendingSaveTimeout); - if (this.#checkConnLivenessInterval) - clearInterval(this.#checkConnLivenessInterval); // Write state await this.saveState({ immediate: true, allowStoppingState: true }); @@ -2259,6 +2097,36 @@ export class ActorInstance { #convertToBarePersisted( persist: PersistedActor, ): bareSchema.PersistedActor { + // Merge connections with hibernatableWebSocket data into hibernatableConns + const hibernatableConns: bareSchema.PersistedHibernatableConn[] = []; + + for (const conn of persist.connections) { + if (conn.hibernatableRequestId) { + // Find matching hibernatable WebSocket + const ws = persist.hibernatableWebSocket.find((ws) => + arrayBuffersEqual( + ws.requestId, + conn.hibernatableRequestId!, + ), + ); + + if (ws) { + hibernatableConns.push({ + id: conn.connId, + parameters: bufferToArrayBuffer( + cbor.encode(conn.params || {}), + ), + state: bufferToArrayBuffer( + cbor.encode(conn.state || {}), + ), + hibernatableRequestId: conn.hibernatableRequestId, + lastSeenTimestamp: ws.lastSeenTimestamp, + msgIndex: ws.msgIndex, + }); + } + } + } + return { input: persist.input !== undefined @@ -2266,32 +2134,12 @@ export class ActorInstance { : null, hasInitialized: persist.hasInitiated, state: bufferToArrayBuffer(cbor.encode(persist.state)), - connections: persist.connections.map((conn) => ({ - id: conn.connId, - token: conn.token, - parameters: bufferToArrayBuffer(cbor.encode(conn.params || {})), - state: bufferToArrayBuffer(cbor.encode(conn.state || {})), - subscriptions: conn.subscriptions.map((sub) => ({ - eventName: sub.eventName, - })), - lastSeen: BigInt(conn.lastSeen), - hibernatableRequestId: conn.hibernatableRequestId ?? null, - })), + hibernatableConns, scheduledEvents: persist.scheduledEvents.map((event) => ({ eventId: event.eventId, timestamp: BigInt(event.timestamp), - kind: { - tag: "GenericPersistedScheduleEvent" as const, - val: { - action: event.kind.generic.actionName, - args: event.kind.generic.args ?? null, - }, - }, - })), - hibernatableWebSocket: persist.hibernatableWebSocket.map((ws) => ({ - requestId: ws.requestId, - lastSeenTimestamp: ws.lastSeenTimestamp, - msgIndex: ws.msgIndex, + action: event.kind.generic.actionName, + args: event.kind.generic.args ?? null, })), }; } @@ -2299,38 +2147,45 @@ export class ActorInstance { #convertFromBarePersisted( bareData: bareSchema.PersistedActor, ): PersistedActor { + // Split hibernatableConns back into connections and hibernatableWebSocket + const connections: PersistedConn[] = []; + const hibernatableWebSocket: PersistedHibernatableWebSocket[] = []; + + for (const conn of bareData.hibernatableConns) { + connections.push({ + connId: conn.id, + params: cbor.decode(new Uint8Array(conn.parameters)), + state: cbor.decode(new Uint8Array(conn.state)), + subscriptions: [], + lastSeen: 0, // Will be set from lastSeenTimestamp + hibernatableRequestId: conn.hibernatableRequestId, + }); + + hibernatableWebSocket.push({ + requestId: conn.hibernatableRequestId, + lastSeenTimestamp: conn.lastSeenTimestamp, + msgIndex: conn.msgIndex, + }); + } + return { input: bareData.input ? cbor.decode(new Uint8Array(bareData.input)) : undefined, hasInitiated: bareData.hasInitialized, state: cbor.decode(new Uint8Array(bareData.state)), - connections: bareData.connections.map((conn) => ({ - connId: conn.id, - token: conn.token, - params: cbor.decode(new Uint8Array(conn.parameters)), - state: cbor.decode(new Uint8Array(conn.state)), - subscriptions: conn.subscriptions.map((sub) => ({ - eventName: sub.eventName, - })), - lastSeen: Number(conn.lastSeen), - hibernatableRequestId: conn.hibernatableRequestId ?? undefined, - })), + connections, scheduledEvents: bareData.scheduledEvents.map((event) => ({ eventId: event.eventId, timestamp: Number(event.timestamp), kind: { generic: { - actionName: event.kind.val.action, - args: event.kind.val.args, + actionName: event.action, + args: event.args, }, }, })), - hibernatableWebSocket: bareData.hibernatableWebSocket.map((ws) => ({ - requestId: ws.requestId, - lastSeenTimestamp: ws.lastSeenTimestamp, - msgIndex: ws.msgIndex, - })), + hibernatableWebSocket, }; } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts index 242291c019..4f934113a1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts @@ -55,8 +55,8 @@ export function actor< export type { Encoding } from "@/actor/protocol/serde"; export { ALLOWED_PUBLIC_HEADERS, - PATH_CONNECT_WEBSOCKET, - PATH_RAW_WEBSOCKET_PREFIX, + PATH_CONNECT, + PATH_WEBSOCKET_PREFIX, } from "@/common/actor-router-consts"; export type { UniversalErrorEvent, @@ -74,12 +74,7 @@ export type { export type { ActorKey } from "@/manager/protocol/query"; export type { ActionContext } from "./action"; export type * from "./config"; -export type { - Conn, - ConnectionStatus, - generateConnId, - generateConnToken, -} from "./conn"; +export type { Conn } from "./conn"; export type { ActorContext } from "./context"; export type { ActionContextOf, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/persisted.ts b/rivetkit-typescript/packages/rivetkit/src/actor/persisted.ts index 762739b7f5..e236b47e67 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/persisted.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/persisted.ts @@ -11,7 +11,6 @@ export interface PersistedActor { /** Object representing connection that gets persisted to storage. */ export interface PersistedConn { connId: string; - token: string; params: CP; state: CS; subscriptions: PersistedSubscription[]; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts b/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts index 364ce69423..2bb281e8cb 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts @@ -19,13 +19,6 @@ import { ActionContext } from "../action"; import type { Conn } from "../conn"; import type { ActorInstance } from "../instance"; -export const TransportSchema = z.enum(["websocket", "sse"]); - -/** - * Transport mechanism used to communicate between client & actor. - */ -export type Transport = z.infer; - interface MessageEventOpts { encoding: Encoding; maxIncomingMessageSize: number; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts index 38fc64cd89..163592cc8e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts @@ -1,15 +1,8 @@ import * as cbor from "cbor-x"; import type { Context as HonoContext, HonoRequest } from "hono"; -import { type SSEStreamingApi, streamSSE } from "hono/streaming"; import type { WSContext } from "hono/ws"; -import invariant from "invariant"; import { ActionContext } from "@/actor/action"; -import type { AnyConn } from "@/actor/conn"; -import { - generateConnId, - generateConnRequestId, - generateConnToken, -} from "@/actor/conn"; +import { type AnyConn, generateConnRequestId } from "@/actor/conn"; import { ConnDriverKind } from "@/actor/conn-drivers"; import * as errors from "@/actor/errors"; import { type AnyActorInstance, PERSIST_SYMBOL } from "@/actor/instance"; @@ -17,9 +10,7 @@ import type { InputData } from "@/actor/protocol/serde"; import { type Encoding, EncodingSchema } from "@/actor/protocol/serde"; import { HEADER_ACTOR_QUERY, - HEADER_CONN_ID, HEADER_CONN_PARAMS, - HEADER_CONN_TOKEN, HEADER_ENCODING, } from "@/common/actor-router-consts"; import type { UpgradeWebSocketArgs } from "@/common/inline-websocket-adapter2"; @@ -47,8 +38,6 @@ import type { ActorDriver } from "./driver"; import { loggerWithoutContext } from "./log"; import { parseMessage } from "./protocol/old"; -export const SSE_PING_INTERVAL = 1000; - export interface ConnectWebSocketOpts { req?: HonoRequest; encoding: Encoding; @@ -62,18 +51,6 @@ export interface ConnectWebSocketOutput { onClose: () => void; } -export interface ConnectSseOpts { - req?: HonoRequest; - encoding: Encoding; - params: unknown; - actorId: string; -} - -export interface ConnectSseOutput { - onOpen: (stream: SSEStreamingApi) => void; - onClose: () => Promise; -} - export interface ActionOpts { req?: HonoRequest; params: unknown; @@ -89,7 +66,6 @@ export interface ActionOutput { export interface ConnsMessageOpts { req?: HonoRequest; connId: string; - connToken: string; message: protocol.ToServer; actorId: string; } @@ -117,8 +93,6 @@ export async function handleWebSocketConnect( parameters: unknown, requestId: string, requestIdBuf: ArrayBuffer | undefined, - connId: string | undefined, - connToken: string | undefined, ): Promise { const exposeInternalError = req ? getRequestExposeInternalError(req) @@ -176,12 +150,8 @@ export async function handleWebSocketConnect( try { let conn: AnyConn; - // Create or reconnect connection actor.rLog.debug({ - msg: connId - ? "websocket reconnection attempt" - : "new websocket connection", - connId, + msg: "new websocket connection", actorId, }); @@ -208,8 +178,6 @@ export async function handleWebSocketConnect( }, parameters, req, - connId, - connToken, ); // Store connection so we can clean on close @@ -339,122 +307,6 @@ export async function handleWebSocketConnect( }; } -/** - * Creates an SSE connection handler - */ -export async function handleSseConnect( - c: HonoContext, - _runConfig: RunnerConfig, - actorDriver: ActorDriver, - actorId: string, -) { - c.header("Content-Encoding", "Identity"); - - const encoding = getRequestEncoding(c.req); - const parameters = getRequestConnParams(c.req); - const requestId = generateConnRequestId(); - - // Check for reconnection parameters - const connId = c.req.header(HEADER_CONN_ID); - const connToken = c.req.header(HEADER_CONN_TOKEN); - - // Return the main handler with all async work inside - return streamSSE(c, async (stream) => { - let actor: AnyActorInstance | undefined; - let conn: AnyConn | undefined; - - try { - // Do all async work inside the handler - actor = await actorDriver.loadActor(actorId); - - // Create or reconnect connection - actor.rLog.debug({ - msg: connId ? "sse reconnection attempt" : "sse open", - connId, - }); - - conn = await actor.createConn( - { - requestId: requestId, - hibernatable: false, - driverState: { - [ConnDriverKind.SSE]: { - encoding, - stream: stream, - }, - }, - }, - parameters, - c.req.raw, - connId, - connToken, - ); - - // Wait for close - const abortResolver = promiseWithResolvers(); - - // HACK: This is required so the abort handler below works - // - // See https://github.com/honojs/hono/issues/1770#issuecomment-2461966225 - stream.onAbort(() => {}); - - // Handle stream abort (when client closes the connection) - c.req.raw.signal.addEventListener("abort", async () => { - invariant(actor, "actor should exist"); - const rLog = actor.rLog ?? loggerWithoutContext(); - try { - rLog.debug("sse stream aborted"); - - // Cleanup - if (conn) { - actor.__connDisconnected(conn, false, requestId); - } - - abortResolver.resolve(undefined); - } catch (error) { - rLog.error({ msg: "error closing sse connection", error }); - abortResolver.resolve(undefined); - } - }); - - // // HACK: Will throw if not configured - // try { - // c.executionCtx.waitUntil(abortResolver.promise); - // } catch {} - - // Send ping every second to keep the connection alive - // - // NOTE: This is required on Cloudflare Workers in order to detect when the connection is closed - while (true) { - if (stream.closed || stream.aborted) { - actor?.rLog.debug({ - msg: "sse stream closed", - closed: stream.closed, - aborted: stream.aborted, - }); - break; - } - - await stream.writeSSE({ event: "ping", data: "" }); - await stream.sleep(SSE_PING_INTERVAL); - } - } catch (error) { - loggerWithoutContext().error({ - msg: "error in sse connection", - error, - }); - - // Cleanup on error - if (conn && actor !== undefined) { - actor.__connDisconnected(conn, false, requestId); - } - - // Close the stream on error - stream.close(); - } - }); -} - /** * Creates an action handler */ @@ -524,83 +376,6 @@ export async function handleAction( }); } -/** - * Create a connection message handler - */ -export async function handleConnectionMessage( - c: HonoContext, - _runConfig: RunnerConfig, - actorDriver: ActorDriver, - connId: string, - connToken: string, - actorId: string, -) { - const encoding = getRequestEncoding(c.req); - - // Validate incoming request - const arrayBuffer = await c.req.arrayBuffer(); - const message = deserializeWithEncoding( - encoding, - new Uint8Array(arrayBuffer), - TO_SERVER_VERSIONED, - ); - - const actor = await actorDriver.loadActor(actorId); - - // Find connection - const conn = actor.conns.get(connId); - if (!conn) { - throw new errors.ConnNotFound(connId); - } - - // Authenticate connection - if (conn._token !== connToken) { - throw new errors.IncorrectConnToken(); - } - - // Process message - await actor.processMessage(message, conn); - - return c.json({}); -} - -export async function handleConnectionClose( - c: HonoContext, - _runConfig: RunnerConfig, - actorDriver: ActorDriver, - connId: string, - connToken: string, - actorId: string, -) { - const actor = await actorDriver.loadActor(actorId); - - // Find connection - const conn = actor.conns.get(connId); - if (!conn) { - throw new errors.ConnNotFound(connId); - } - - // Authenticate connection - if (conn._token !== connToken) { - throw new errors.IncorrectConnToken(); - } - - // Check if this is an SSE connection - if ( - !conn.__socket?.driverState || - !(ConnDriverKind.SSE in conn.__socket.driverState) - ) { - throw new errors.UserError( - "Connection close is only supported for SSE connections", - ); - } - - // Close the SSE connection - await conn.disconnect("Connection closed by client request"); - - return c.json({}); -} - export async function handleRawWebSocketHandler( req: Request | undefined, path: string, @@ -730,18 +505,18 @@ export function getRequestConnParams(req: HonoRequest): unknown { } /** - * Truncase the PATH_RAW_WEBSOCKET_PREFIX path prefix in order to pass a clean + * Truncase the PATH_WEBSOCKET_PREFIX path prefix in order to pass a clean * path to the onWebSocket handler. * * Example: - * - `/raw/websocket/foo` -> `/foo` - * - `/raw/websocket` -> `/` + * - `/websocket/foo` -> `/foo` + * - `/websocket` -> `/` */ export function truncateRawWebSocketPathPrefix(path: string): string { // Extract the path after prefix and preserve query parameters // Use URL API for cleaner parsing const url = new URL(path, "http://actor"); - const pathname = url.pathname.replace(/^\/raw\/websocket\/?/, "") || "/"; + const pathname = url.pathname.replace(/^\/websocket\/?/, "") || "/"; const normalizedPath = (pathname.startsWith("/") ? pathname : "/" + pathname) + url.search; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts index 4cb69f177e..c347e98cfe 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts @@ -1,33 +1,21 @@ -import { Hono, type Context as HonoContext } from "hono"; +import { Hono } from "hono"; import invariant from "invariant"; import { EncodingSchema } from "@/actor/protocol/serde"; import { type ActionOpts, type ActionOutput, - type ConnectSseOpts, - type ConnectSseOutput, type ConnectWebSocketOpts, type ConnectWebSocketOutput, type ConnsMessageOpts, handleAction, - handleConnectionClose, - handleConnectionMessage, handleRawWebSocketHandler, - handleSseConnect, handleWebSocketConnect, } from "@/actor/router-endpoints"; import { - HEADER_CONN_ID, - HEADER_CONN_PARAMS, - HEADER_CONN_TOKEN, - HEADER_ENCODING, - PATH_CONNECT_WEBSOCKET, - PATH_RAW_WEBSOCKET_PREFIX, - WS_PROTOCOL_CONN_ID, + PATH_CONNECT, + PATH_WEBSOCKET_PREFIX, WS_PROTOCOL_CONN_PARAMS, - WS_PROTOCOL_CONN_TOKEN, WS_PROTOCOL_ENCODING, - WS_PROTOCOL_TOKEN, } from "@/common/actor-router-consts"; import { handleRouteError, @@ -50,8 +38,6 @@ import { loggerWithoutContext } from "./log"; export type { ConnectWebSocketOpts, ConnectWebSocketOutput, - ConnectSseOpts, - ConnectSseOutput, ActionOpts, ActionOutput, ConnsMessageOpts, @@ -114,25 +100,20 @@ export function createActorRouter( return c.text(`Connection not found: ${connId}`, 404); } - // Force close the websocket/SSE connection without clean shutdown + // Force close the connection without clean shutdown const driverState = conn.__driverState; if (driverState && ConnDriverKind.WEBSOCKET in driverState) { const ws = driverState[ConnDriverKind.WEBSOCKET].websocket; // Force close without sending close frame (ws.raw as any).terminate(); - } else if (driverState && ConnDriverKind.SSE in driverState) { - const stream = driverState[ConnDriverKind.SSE].stream; - - // Force close the SSE stream - stream.abort(); } return c.json({ success: true }); }); } - router.get(PATH_CONNECT_WEBSOCKET, async (c) => { + router.get(PATH_CONNECT, async (c) => { const upgradeWebSocket = runConfig.getUpgradeWebSocket?.(); if (upgradeWebSocket) { return upgradeWebSocket(async (c) => { @@ -140,8 +121,6 @@ export function createActorRouter( const protocols = c.req.header("sec-websocket-protocol"); let encodingRaw: string | undefined; let connParamsRaw: string | undefined; - let connIdRaw: string | undefined; - let connTokenRaw: string | undefined; if (protocols) { const protocolList = protocols @@ -160,16 +139,6 @@ export function createActorRouter( WS_PROTOCOL_CONN_PARAMS.length, ), ); - } else if (protocol.startsWith(WS_PROTOCOL_CONN_ID)) { - connIdRaw = protocol.substring( - WS_PROTOCOL_CONN_ID.length, - ); - } else if ( - protocol.startsWith(WS_PROTOCOL_CONN_TOKEN) - ) { - connTokenRaw = protocol.substring( - WS_PROTOCOL_CONN_TOKEN.length, - ); } } } @@ -188,22 +157,13 @@ export function createActorRouter( connParams, generateConnRequestId(), undefined, - connIdRaw, - connTokenRaw, ); })(c, noopNext()); } else { - return c.text( - "WebSockets are not enabled for this driver. Use SSE instead.", - 400, - ); + return c.text("WebSockets are not enabled for this driver.", 400); } }); - router.get("/connect/sse", async (c) => { - return handleSseConnect(c, runConfig, actorDriver, c.env.actorId); - }); - router.post("/action/:action", async (c) => { const actionName = c.req.param("action"); @@ -216,40 +176,8 @@ export function createActorRouter( ); }); - router.post("/connections/message", async (c) => { - const connId = c.req.header(HEADER_CONN_ID); - const connToken = c.req.header(HEADER_CONN_TOKEN); - if (!connId || !connToken) { - throw new Error("Missing required parameters"); - } - return handleConnectionMessage( - c, - runConfig, - actorDriver, - connId, - connToken, - c.env.actorId, - ); - }); - - router.post("/connections/close", async (c) => { - const connId = c.req.header(HEADER_CONN_ID); - const connToken = c.req.header(HEADER_CONN_TOKEN); - if (!connId || !connToken) { - throw new Error("Missing required parameters"); - } - return handleConnectionClose( - c, - runConfig, - actorDriver, - connId, - connToken, - c.env.actorId, - ); - }); - - // Raw HTTP endpoints - /http/* - router.all("/raw/http/*", async (c) => { + // Raw HTTP endpoints - /request/* + router.all("/request/*", async (c) => { const actor = await actorDriver.loadActor(c.env.actorId); // TODO: This is not a clean way of doing this since `/http/` might exist mid-path @@ -284,7 +212,7 @@ export function createActorRouter( }); // Raw WebSocket endpoint - /websocket/* - router.get(`${PATH_RAW_WEBSOCKET_PREFIX}*`, async (c) => { + router.get(`${PATH_WEBSOCKET_PREFIX}*`, async (c) => { const upgradeWebSocket = runConfig.getUpgradeWebSocket?.(); if (upgradeWebSocket) { return upgradeWebSocket(async (c) => { @@ -308,10 +236,7 @@ export function createActorRouter( ); })(c, noopNext()); } else { - return c.text( - "WebSockets are not enabled for this driver. Use SSE instead.", - 400, - ); + return c.text("WebSockets are not enabled for this driver.", 400); } }); diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts index 09b380be51..73f8c16e5f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts @@ -6,11 +6,9 @@ import type { AnyActorDefinition } from "@/actor/definition"; import { inputDataToBuffer } from "@/actor/protocol/old"; import { type Encoding, jsonStringifyCompat } from "@/actor/protocol/serde"; import { - HEADER_CONN_ID, HEADER_CONN_PARAMS, - HEADER_CONN_TOKEN, HEADER_ENCODING, - PATH_CONNECT_WEBSOCKET, + PATH_CONNECT, } from "@/common/actor-router-consts"; import { importEventSource } from "@/common/eventsource"; import type { @@ -40,7 +38,7 @@ import { } from "@/utils"; import type { ActorDefinitionActions } from "./actor-common"; import { queryActor } from "./actor-query"; -import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client"; +import { ACTOR_CONNS_SYMBOL, type ClientRaw } from "./client"; import * as errors from "./errors"; import { logger } from "./log"; import { @@ -79,10 +77,6 @@ export interface SendHttpMessageOpts { signal?: AbortSignal; } -export type ConnTransport = - | { websocket: UniversalWebSocket } - | { sse: UniversalEventSource }; - export const CONNECT_SYMBOL = Symbol("connect"); /** @@ -96,15 +90,10 @@ export class ActorConnRaw { /* Will be aborted on dispose. */ #abortController = new AbortController(); - /** If attempting to connect. Helpful for knowing if in a retry loop when reconnecting. */ #connecting = false; - // Connection info, used for reconnection and HTTP requests #actorId?: string; #connectionId?: string; - #connectionToken?: string; - - #transport?: ConnTransport; #messageQueue: protocol.ToServer[] = []; #actionsInFlight = new Map(); @@ -126,6 +115,8 @@ export class ActorConnRaw { /** Promise used to indicate the socket has connected successfully. This will be rejected if the connection fails. */ #onOpenPromise?: ReturnType>; + #websocket?: UniversalWebSocket; + #client: ClientRaw; #driver: ManagerDriver; #params: unknown; @@ -264,14 +255,7 @@ enc throw new Error("#onOpenPromise already defined"); this.#onOpenPromise = promiseWithResolvers(); - // Connect transport - if (this.#client[TRANSPORT_SYMBOL] === "websocket") { - await this.#connectWebSocket(); - } else if (this.#client[TRANSPORT_SYMBOL] === "sse") { - await this.#connectSse(); - } else { - assertUnreachable(this.#client[TRANSPORT_SYMBOL]); - } + await this.#connectWebSocket(); // Wait for result await this.#onOpenPromise.promise; @@ -287,31 +271,19 @@ enc this.#driver, ); - // Check if we have connection info for reconnection - const isReconnection = this.#connectionId && this.#connectionToken; - if (isReconnection) { - logger().debug({ - msg: "attempting websocket reconnection", - connectionId: this.#connectionId, - }); - } - const ws = await this.#driver.openWebSocket( - PATH_CONNECT_WEBSOCKET, + PATH_CONNECT, actorId, this.#encoding, this.#params, - // Pass connection ID and token for reconnection if available - isReconnection ? this.#connectionId : undefined, - isReconnection ? this.#connectionToken : undefined, ); logger().debug({ - msg: "transport set to new websocket", + msg: "opened websocket", connectionId: this.#connectionId, readyState: ws.readyState, messageQueueLength: this.#messageQueue.length, }); - this.#transport = { websocket: ws }; + this.#websocket = ws; ws.addEventListener("open", () => { logger().debug({ msg: "client websocket open", @@ -350,70 +322,6 @@ enc }); } - async #connectSse() { - const EventSource = await importEventSource(); - - // Get the actor ID - const { actorId } = await queryActor( - undefined, - this.#actorQuery, - this.#driver, - ); - logger().debug({ msg: "found actor for sse connection", actorId }); - invariant(actorId, "Missing actor ID"); - - logger().debug({ - msg: "opening sse connection", - actorId, - encoding: this.#encoding, - }); - - const isReconnection = this.#connectionId && this.#connectionToken; - - const eventSource = new EventSource("http://actor/connect/sse", { - fetch: (input, init) => { - return this.#driver.sendRequest( - actorId, - new Request(input, { - ...init, - headers: { - ...init?.headers, - "User-Agent": httpUserAgent(), - [HEADER_ENCODING]: this.#encoding, - ...(this.#params !== undefined - ? { - [HEADER_CONN_PARAMS]: JSON.stringify( - this.#params, - ), - } - : {}), - ...(isReconnection - ? { - [HEADER_CONN_ID]: this.#connectionId, - [HEADER_CONN_TOKEN]: - this.#connectionToken, - } - : {}), - }, - }), - ); - }, - }) as UniversalEventSource; - - this.#transport = { sse: eventSource }; - - eventSource.addEventListener("message", (ev: UniversalMessageEvent) => { - // Ignore pings - if (ev.type === "ping") return; - - this.#handleOnMessage(ev.data); - }); - - eventSource.addEventListener("error", (ev: UniversalErrorEvent) => { - this.#handleOnError(); - }); - } - /** Called by the onopen event from drivers. */ #handleOnOpen() { logger().debug({ @@ -470,10 +378,9 @@ enc ); if (response.body.tag === "Init") { - // Store connection info for reconnection + // Store connection info this.#actorId = response.body.val.actorId; this.#connectionId = response.body.val.connectionId; - this.#connectionToken = response.body.val.connectionToken; logger().trace({ msg: "received init message", actorId: this.#actorId, @@ -607,7 +514,7 @@ enc this.#actionsInFlight.clear(); } - this.#transport = undefined; + this.#websocket = undefined; // Automatically reconnect. Skip if already attempting to connect. if (!this.#disposed && !this.#connecting) { @@ -763,12 +670,8 @@ enc } let queueMessage = false; - if (!this.#transport) { - // No transport connected yet - logger().debug({ msg: "no transport, queueing message" }); - queueMessage = true; - } else if ("websocket" in this.#transport) { - const readyState = this.#transport.websocket.readyState; + if (this.#websocket) { + const readyState = this.#websocket.readyState; logger().debug({ msg: "websocket send attempt", readyState, @@ -791,7 +694,7 @@ enc message, TO_SERVER_VERSIONED, ); - this.#transport.websocket.send(messageSerialized); + this.#websocket.send(messageSerialized); logger().trace({ msg: "sent websocket message", len: messageLength(messageSerialized), @@ -813,15 +716,10 @@ enc }); queueMessage = true; } - } else if ("sse" in this.#transport) { - if (this.#transport.sse.readyState === 1) { - // Spawn in background since #sendMessage cannot be async - this.#sendHttpMessage(message, opts); - } else { - queueMessage = true; - } } else { - assertUnreachable(this.#transport); + // No websocket connected yet + logger().debug({ msg: "no websocket, queueing message" }); + queueMessage = true; } if (!opts?.ephemeral && queueMessage) { @@ -836,83 +734,8 @@ enc } } - async #sendHttpMessage( - message: protocol.ToServer, - opts?: SendHttpMessageOpts, - ) { - try { - if (!this.#actorId || !this.#connectionId || !this.#connectionToken) - throw new errors.InternalError( - "Missing connection ID or token.", - ); - - logger().trace( - getEnvUniversal("_RIVETKIT_LOG_MESSAGE") - ? { - msg: "sent http message", - message: `${jsonStringifyCompat(message).substring(0, 100)}...`, - } - : { msg: "sent http message" }, - ); - - logger().debug({ - msg: "sending http message", - actorId: this.#actorId, - connectionId: this.#connectionId, - }); - - // Send an HTTP request to the connections endpoint - await sendHttpRequest({ - url: "http://actor/connections/message", - method: "POST", - headers: { - [HEADER_ENCODING]: this.#encoding, - [HEADER_CONN_ID]: this.#connectionId, - [HEADER_CONN_TOKEN]: this.#connectionToken, - }, - body: message, - encoding: this.#encoding, - skipParseResponse: true, - customFetch: this.#driver.sendRequest.bind( - this.#driver, - this.#actorId, - ), - requestVersionedDataHandler: TO_SERVER_VERSIONED, - responseVersionedDataHandler: TO_CLIENT_VERSIONED, - }); - } catch (error) { - // TODO: This will not automatically trigger a re-broadcast of HTTP events since SSE is separate from the HTTP action - - logger().warn({ - msg: "failed to send message, added to queue", - error, - }); - - // Assuming the socket is disconnected and will be reconnected soon - // - // Will attempt to resend soon - if (!opts?.ephemeral) { - this.#messageQueue.unshift(message); - } - } - } - async #parseMessage(data: ConnMessage): Promise { - invariant(this.#transport, "transport must be defined"); - - // Decode base64 since SSE sends raw strings - if (encodingIsBinary(this.#encoding) && "sse" in this.#transport) { - if (typeof data === "string") { - const binaryString = atob(data); - data = new Uint8Array( - [...binaryString].map((char) => char.charCodeAt(0)), - ); - } else { - throw new errors.InternalError( - `Expected data to be a string for SSE, got ${data}.`, - ); - } - } + invariant(this.#websocket, "websocket must be defined"); const buffer = await inputDataToBuffer(data); @@ -964,13 +787,11 @@ enc // Remove from registry this.#client[ACTOR_CONNS_SYMBOL].delete(this); - // Disconnect transport cleanly - if (!this.#transport) { - // Nothing to do - } else if ("websocket" in this.#transport) { + // Disconnect websocket cleanly + if (this.#websocket) { logger().debug("closing ws"); - const ws = this.#transport.websocket; + const ws = this.#websocket; // Check if WebSocket is already closed or closing if ( ws.readyState === 2 /* CLOSING */ || @@ -986,41 +807,8 @@ enc ws.close(1000, "Normal closure"); await promise; } - } else if ("sse" in this.#transport) { - logger().debug("closing sse"); - - // Send close request to server for SSE connections - if (this.#connectionId && this.#connectionToken) { - try { - await sendHttpRequest({ - url: "http://actor/connections/close", - method: "POST", - headers: { - [HEADER_CONN_ID]: this.#connectionId, - [HEADER_CONN_TOKEN]: this.#connectionToken, - }, - encoding: this.#encoding, - skipParseResponse: true, - customFetch: this.#driver.sendRequest.bind( - this.#driver, - this.#actorId!, - ), - requestVersionedDataHandler: TO_SERVER_VERSIONED, - responseVersionedDataHandler: TO_CLIENT_VERSIONED, - }); - } catch (error) { - // Ignore errors when closing - connection may already be closed - logger().warn({ - msg: "failed to send close request", - error, - }); - } - } - this.#transport.sse.close(); - } else { - assertUnreachable(this.#transport); } - this.#transport = undefined; + this.#websocket = undefined; } #sendSubscription(eventName: string, subscribe: boolean) { diff --git a/rivetkit-typescript/packages/rivetkit/src/client/client.ts b/rivetkit-typescript/packages/rivetkit/src/client/client.ts index 61b8c4e2ce..a95b395a07 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/client.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/client.ts @@ -1,5 +1,4 @@ import type { AnyActorDefinition } from "@/actor/definition"; -import type { Transport } from "@/actor/protocol/old"; import type { Encoding } from "@/actor/protocol/serde"; import type { ManagerDriver } from "@/driver-helpers/mod"; import type { ActorQuery } from "@/manager/protocol/query"; @@ -149,7 +148,6 @@ export interface Region { export const ACTOR_CONNS_SYMBOL = Symbol("actorConns"); export const CREATE_ACTOR_CONN_PROXY = Symbol("createActorConnProxy"); -export const TRANSPORT_SYMBOL = Symbol("transport"); /** * Client for managing & connecting to actors. @@ -164,7 +162,6 @@ export class ClientRaw { #driver: ManagerDriver; #encodingKind: Encoding; - [TRANSPORT_SYMBOL]: Transport; /** * Creates an instance of Client. @@ -173,7 +170,6 @@ export class ClientRaw { this.#driver = driver; this.#encodingKind = config.encoding ?? "bare"; - this[TRANSPORT_SYMBOL] = config.transport ?? "websocket"; } /** diff --git a/rivetkit-typescript/packages/rivetkit/src/client/config.ts b/rivetkit-typescript/packages/rivetkit/src/client/config.ts index e66dd7d006..c2c622d900 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/config.ts @@ -1,5 +1,4 @@ import z from "zod"; -import { TransportSchema } from "@/actor/protocol/old"; import { EncodingSchema } from "@/actor/protocol/serde"; import { type GetUpgradeWebSocket, getEnvUniversal } from "@/utils"; @@ -33,8 +32,6 @@ export const ClientConfigSchema = z.object({ encoding: EncodingSchema.default("bare"), - transport: TransportSchema.default("websocket"), - headers: z.record(z.string()).optional().default({}), // See RunConfig.getUpgradeWebSocket diff --git a/rivetkit-typescript/packages/rivetkit/src/client/mod.ts b/rivetkit-typescript/packages/rivetkit/src/client/mod.ts index 92252a9ae9..d4b8d27766 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/mod.ts @@ -11,7 +11,6 @@ export { ActorDefinition, AnyActorDefinition, } from "@/actor/definition"; -export type { Transport } from "@/actor/protocol/old"; export type { Encoding } from "@/actor/protocol/serde"; export { ActorClientError, diff --git a/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts b/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts index 8922d0f878..321eaeb544 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/raw-utils.ts @@ -1,5 +1,5 @@ import invariant from "invariant"; -import { PATH_RAW_WEBSOCKET_PREFIX } from "@/common/actor-router-consts"; +import { PATH_WEBSOCKET_PREFIX } from "@/common/actor-router-consts"; import { deconstructError } from "@/common/utils"; import { HEADER_CONN_PARAMS, type ManagerDriver } from "@/driver-helpers/mod"; import type { ActorQuery } from "@/manager/protocol/query"; @@ -69,7 +69,7 @@ export async function rawHttpFetch( // Build the URL with normalized path const normalizedPath = path.startsWith("/") ? path.slice(1) : path; - const url = new URL(`http://actor/raw/http/${normalizedPath}`); + const url = new URL(`http://actor/request/${normalizedPath}`); // Forward conn params if provided const proxyRequestHeaders = new Headers(mergedInit.headers); @@ -132,7 +132,7 @@ export async function rawWebSocket( } } - const fullPath = `${PATH_RAW_WEBSOCKET_PREFIX}${pathPortion}${queryPortion}`; + const fullPath = `${PATH_WEBSOCKET_PREFIX}${pathPortion}${queryPortion}`; logger().debug({ msg: "opening websocket", diff --git a/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts b/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts index 0a17e7231b..85c43a0a9c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/actor-router-consts.ts @@ -1,8 +1,8 @@ // NOTE: This is in a separate file from the router since it needs to be shared between the client & the server. If this was in the router file, the client would end up importing the *entire* actor router and tree shaking would not work. // MARK: Paths -export const PATH_CONNECT_WEBSOCKET = "/connect/websocket"; -export const PATH_RAW_WEBSOCKET_PREFIX = "/raw/websocket/"; +export const PATH_CONNECT = "/connect"; +export const PATH_WEBSOCKET_PREFIX = "/websocket/"; // MARK: Headers export const HEADER_ACTOR_QUERY = "x-rivet-query"; @@ -14,10 +14,6 @@ export const HEADER_CONN_PARAMS = "x-rivet-conn-params"; export const HEADER_ACTOR_ID = "x-rivet-actor"; -export const HEADER_CONN_ID = "x-rivet-conn"; - -export const HEADER_CONN_TOKEN = "x-rivet-conn-token"; - export const HEADER_RIVET_TOKEN = "x-rivet-token"; // MARK: Manager Gateway Headers @@ -32,13 +28,10 @@ export const WS_PROTOCOL_TARGET = "rivet_target."; export const WS_PROTOCOL_ACTOR = "rivet_actor."; export const WS_PROTOCOL_ENCODING = "rivet_encoding."; export const WS_PROTOCOL_CONN_PARAMS = "rivet_conn_params."; -export const WS_PROTOCOL_CONN_ID = "rivet_conn."; -export const WS_PROTOCOL_CONN_TOKEN = "rivet_conn_token."; export const WS_PROTOCOL_TOKEN = "rivet_token."; // MARK: WebSocket Inline Test Protocol Prefixes -export const WS_PROTOCOL_TRANSPORT = "test_transport."; -export const WS_PROTOCOL_PATH = "test_path."; +export const WS_TEST_PROTOCOL_PATH = "test_path."; /** * Headers that publics can send from public clients. @@ -52,8 +45,6 @@ export const ALLOWED_PUBLIC_HEADERS = [ HEADER_ENCODING, HEADER_CONN_PARAMS, HEADER_ACTOR_ID, - HEADER_CONN_ID, - HEADER_CONN_TOKEN, HEADER_RIVET_TARGET, HEADER_RIVET_ACTOR, HEADER_RIVET_NAMESPACE, diff --git a/rivetkit-typescript/packages/rivetkit/src/common/log.ts b/rivetkit-typescript/packages/rivetkit/src/common/log.ts index 89012e03a0..60114e7f77 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/log.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/log.ts @@ -147,7 +147,7 @@ export async function configureDefaultLogger( }, hooks: { logMethod(inputArgs, method, level) { - // TODO: This is a hack to not implement our own transport target. We can get better perf if we have our own transport target. + // TODO: This is a hack to not implement our own Pino transport target. We can get better perf if we have our own transport target. const levelMap: Record = { 10: "trace", diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts index 05754cfd4b..d6622c2717 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts @@ -5,23 +5,18 @@ export { ALLOWED_PUBLIC_HEADERS, HEADER_ACTOR_ID, HEADER_ACTOR_QUERY, - HEADER_CONN_ID, HEADER_CONN_PARAMS, - HEADER_CONN_TOKEN, HEADER_ENCODING, HEADER_RIVET_ACTOR, HEADER_RIVET_TARGET, - PATH_CONNECT_WEBSOCKET, - PATH_RAW_WEBSOCKET_PREFIX, + PATH_CONNECT, + PATH_WEBSOCKET_PREFIX, WS_PROTOCOL_ACTOR, - WS_PROTOCOL_CONN_ID, WS_PROTOCOL_CONN_PARAMS, - WS_PROTOCOL_CONN_TOKEN, WS_PROTOCOL_ENCODING, - WS_PROTOCOL_PATH, WS_PROTOCOL_STANDARD, WS_PROTOCOL_TARGET, - WS_PROTOCOL_TRANSPORT, + WS_TEST_PROTOCOL_PATH as WS_PROTOCOL_PATH, } from "@/common/actor-router-consts"; export type { ActorOutput, diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts index a75efdf0e4..9c875043e8 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts @@ -13,9 +13,8 @@ export function serializeEmptyPersistData( : null, hasInitialized: false, state: bufferToArrayBuffer(cbor.encode(undefined)), - connections: [], + hibernatableConns: [], scheduledEvents: [], - hibernatableWebSocket: [], }; return PERSISTED_ACTOR_VERSIONED.serializeWithEmbeddedVersion(persistData); } diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index d066f011c8..ed522e0d34 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -4,7 +4,7 @@ import { bundleRequire } from "bundle-require"; import invariant from "invariant"; import { describe } from "vitest"; import { ClientConfigSchema } from "@/client/config"; -import type { Encoding, Transport } from "@/client/mod"; +import type { Encoding } from "@/client/mod"; import { configureInspectorAccessToken } from "@/inspector/utils"; import { createManagerRouter } from "@/manager/router"; import { @@ -19,10 +19,7 @@ import { logger } from "./log"; import { runActionFeaturesTests } from "./tests/action-features"; import { runActorConnTests } from "./tests/actor-conn"; import { runActorConnStateTests } from "./tests/actor-conn-state"; -import { - runActorDriverTests, - runActorDriverTestsWithTransport, -} from "./tests/actor-driver"; +import { runActorDriverTests } from "./tests/actor-driver"; import { runActorErrorHandlingTests } from "./tests/actor-error-handling"; import { runActorHandleTests } from "./tests/actor-handle"; import { runActorInlineClientTests } from "./tests/actor-inline-client"; @@ -40,7 +37,6 @@ import { runRequestAccessTests } from "./tests/request-access"; export interface SkipTests { schedule?: boolean; sleep?: boolean; - sse?: boolean; inline?: boolean; } @@ -59,8 +55,6 @@ export interface DriverTestConfig { skip?: SkipTests; - transport?: Transport; - encoding?: Encoding; clientType: ClientType; @@ -86,10 +80,7 @@ export interface DriverDeployOutput { /** Runs all Vitest tests against the provided drivers. */ export function runDriverTests( - driverTestConfigPartial: Omit< - DriverTestConfig, - "clientType" | "transport" | "encoding" - >, + driverTestConfigPartial: Omit, ) { const clientTypes: ClientType[] = driverTestConfigPartial.skip?.inline ? ["http"] @@ -109,37 +100,13 @@ export function runDriverTests( runActorDriverTests(driverTestConfig); runManagerDriverTests(driverTestConfig); - const transports: Transport[] = driverTestConfig.skip?.sse - ? ["websocket"] - : ["websocket", "sse"]; - for (const transport of transports) { - describe(`transport (${transport})`, () => { - runActorConnTests({ - ...driverTestConfig, - transport, - }); - - runActorConnStateTests({ - ...driverTestConfig, - transport, - }); - - runActorReconnectTests({ - ...driverTestConfig, - transport, - }); - - runRequestAccessTests({ - ...driverTestConfig, - transport, - }); - - runActorDriverTestsWithTransport({ - ...driverTestConfig, - transport, - }); - }); - } + runActorConnTests(driverTestConfig); + + runActorConnStateTests(driverTestConfig); + + runActorReconnectTests(driverTestConfig); + + runRequestAccessTests(driverTestConfig); runActorHandleTests(driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts index ebaca6a5f2..4f672a414f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/test-inline-client-driver.ts @@ -5,7 +5,6 @@ import type { WebSocket } from "ws"; import type { Encoding } from "@/actor/protocol/serde"; import { assertUnreachable } from "@/actor/utils"; import { ActorError as ClientActorError } from "@/client/errors"; -import type { Transport } from "@/client/mod"; import { HEADER_ACTOR_QUERY, HEADER_CONN_PARAMS, @@ -13,9 +12,8 @@ import { WS_PROTOCOL_ACTOR, WS_PROTOCOL_CONN_PARAMS, WS_PROTOCOL_ENCODING, - WS_PROTOCOL_PATH, WS_PROTOCOL_TARGET, - WS_PROTOCOL_TRANSPORT, + WS_TEST_PROTOCOL_PATH, } from "@/common/actor-router-consts"; import type { UniversalEventSource } from "@/common/eventsource-interface"; import type { DeconstructedError } from "@/common/utils"; @@ -37,7 +35,6 @@ import { logger } from "./log"; export interface TestInlineDriverCallRequest { encoding: Encoding; - transport: Transport; method: string; args: unknown[]; } @@ -56,46 +53,25 @@ export type TestInlineDriverCallResponse = export function createTestInlineClientDriver( endpoint: string, encoding: Encoding, - transport: Transport, ): ManagerDriver { return { getForId(input: GetForIdInput): Promise { - return makeInlineRequest( - endpoint, - encoding, - transport, - "getForId", - [input], - ); + return makeInlineRequest(endpoint, encoding, "getForId", [input]); }, getWithKey(input: GetWithKeyInput): Promise { - return makeInlineRequest( - endpoint, - encoding, - transport, - "getWithKey", - [input], - ); + return makeInlineRequest(endpoint, encoding, "getWithKey", [input]); }, getOrCreateWithKey( input: GetOrCreateWithKeyInput, ): Promise { - return makeInlineRequest( - endpoint, - encoding, - transport, - "getOrCreateWithKey", - [input], - ); + return makeInlineRequest(endpoint, encoding, "getOrCreateWithKey", [ + input, + ]); }, createActor(input: CreateInput): Promise { - return makeInlineRequest( - endpoint, - encoding, - transport, - "createActor", - [input], - ); + return makeInlineRequest(endpoint, encoding, "createActor", [ + input, + ]); }, async sendRequest( actorId: string, @@ -180,8 +156,6 @@ export function createTestInlineClientDriver( actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise { const WebSocket = await importWebSocket(); @@ -209,9 +183,8 @@ export function createTestInlineClientDriver( protocols.push(`${WS_PROTOCOL_TARGET}actor`); protocols.push(`${WS_PROTOCOL_ACTOR}${actorId}`); protocols.push(`${WS_PROTOCOL_ENCODING}${encoding}`); - protocols.push(`${WS_PROTOCOL_TRANSPORT}${transport}`); protocols.push( - `${WS_PROTOCOL_PATH}${encodeURIComponent(normalizedPath)}`, + `${WS_TEST_PROTOCOL_PATH}${encodeURIComponent(normalizedPath)}`, ); if (params !== undefined) { protocols.push( @@ -263,7 +236,6 @@ export function createTestInlineClientDriver( // return makeInlineRequest( // endpoint, // encoding, - // transport, // "action", // [undefined, actorQuery, encoding, params, name, args], // ); @@ -278,7 +250,6 @@ export function createTestInlineClientDriver( // return makeInlineRequest( // endpoint, // encodingKind, - // transport, // "resolveActorId", // [undefined, actorQuery, encodingKind, params], // ); @@ -400,7 +371,6 @@ export function createTestInlineClientDriver( // actorId, // encoding, // connectionId, - // transport, // }); // // const result = await fetch( @@ -412,7 +382,6 @@ export function createTestInlineClientDriver( // }, // body: JSON.stringify({ // encoding, - // transport, // method: "sendHttpMessage", // args: [ // undefined, @@ -573,14 +542,12 @@ export function createTestInlineClientDriver( async function makeInlineRequest( endpoint: string, encoding: Encoding, - transport: Transport, method: string, args: unknown[], ): Promise { logger().debug({ msg: "sending inline request", encoding, - transport, method, args, }); @@ -593,7 +560,6 @@ async function makeInlineRequest( }, body: cbor.encode({ encoding, - transport, method, args, } satisfies TestInlineDriverCallRequest), diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn-state.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn-state.ts index 9d2da73597..0e80b7143b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn-state.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn-state.ts @@ -147,10 +147,7 @@ export function runActorConnStateTests(driverTestConfig: DriverTestConfig) { }); describe("Connection Lifecycle", () => { - test.skipIf( - driverTestConfig.transport === "sse" && - driverTestConfig.clientType === "inline", - )("should track connection and disconnection events", async (c) => { + test("should track connection and disconnection events", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const debugHandle = client.connStateActor.getOrCreate( diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts index 8e67d55c30..9378d1f8a9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn.ts @@ -1,5 +1,4 @@ import { describe, expect, test, vi } from "vitest"; -import { SSE_PING_INTERVAL } from "@/actor/router-endpoints"; import type { DriverTestConfig } from "../mod"; import { FAKE_TIME, setupDriverTest, waitFor } from "../utils"; @@ -247,10 +246,7 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) { }); describe("Lifecycle Hooks", () => { - test.skipIf( - driverTestConfig.transport === "sse" && - driverTestConfig.clientType === "inline", - )("should trigger lifecycle hooks", async (c) => { + test("should trigger lifecycle hooks", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); // Create and connect diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts index d51720ee4c..438348285a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-driver.ts @@ -11,14 +11,7 @@ export function runActorDriverTests(driverTestConfig: DriverTestConfig) { // Run scheduled alarms tests runActorScheduleTests(driverTestConfig); - }); -} -/** Actor driver tests that need to be tested for all transport mechanisms. */ -export function runActorDriverTestsWithTransport( - driverTestConfig: DriverTestConfig, -) { - describe("Actor Driver Tests", () => { // Run actor sleep tests runActorSleepTests(driverTestConfig); }); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-http-direct-registry.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-http-direct-registry.ts index 5e053e1840..359db4d54e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-http-direct-registry.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-http-direct-registry.ts @@ -25,7 +25,7 @@ // // // Make a direct fetch request to the registry // const response = await fetch( -// `${endpoint}/registry/actors/raw/http/api/hello`, +// `${endpoint}/registry/actors/request/api/hello`, // { // method: "GET", // headers: { @@ -52,7 +52,7 @@ // // const testData = { test: "direct", number: 456 }; // const response = await fetch( -// `${endpoint}/registry/actors/raw/http/api/echo`, +// `${endpoint}/registry/actors/request/api/echo`, // { // method: "POST", // headers: { @@ -85,7 +85,7 @@ // }; // // const response = await fetch( -// `${endpoint}/registry/actors/raw/http/api/headers`, +// `${endpoint}/registry/actors/request/api/headers`, // { // method: "GET", // headers: { @@ -114,7 +114,7 @@ // const connParams = { token: "test-auth-token", userId: "user123" }; // // const response = await fetch( -// `${endpoint}/registry/actors/raw/http/api/hello`, +// `${endpoint}/registry/actors/request/api/hello`, // { // method: "GET", // headers: { @@ -140,7 +140,7 @@ // }; // // const response = await fetch( -// `${endpoint}/registry/actors/raw/http/api/anything`, +// `${endpoint}/registry/actors/request/api/anything`, // { // method: "GET", // headers: { @@ -168,7 +168,7 @@ // // for (const method of methods) { // const response = await fetch( -// `${endpoint}/registry/actors/raw/http/api/echo`, +// `${endpoint}/registry/actors/request/api/echo`, // { // method, // headers: { @@ -207,7 +207,7 @@ // // Send binary data // const binaryData = new Uint8Array([1, 2, 3, 4, 5]); // const response = await fetch( -// `${endpoint}/registry/actors/raw/http/api/echo`, +// `${endpoint}/registry/actors/request/api/echo`, // { // method: "POST", // headers: { diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket-direct-registry.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket-direct-registry.ts index e5be963d98..0c29f70cf0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket-direct-registry.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket-direct-registry.ts @@ -28,7 +28,7 @@ // const wsEndpoint = endpoint // .replace(/^http:/, "ws:") // .replace(/^https:/, "wss:"); -// const wsUrl = `${wsEndpoint}/registry/actors/raw/websocket/`; +// const wsUrl = `${wsEndpoint}/registry/actors/websocket/`; // // // Create WebSocket connection with subprotocol // const ws = new WebSocket(wsUrl, [ @@ -79,7 +79,7 @@ // const wsEndpoint = endpoint // .replace(/^http:/, "ws:") // .replace(/^https:/, "wss:"); -// const wsUrl = `${wsEndpoint}/registry/actors/raw/websocket/`; +// const wsUrl = `${wsEndpoint}/registry/actors/websocket/`; // // const ws = new WebSocket(wsUrl, [ // queryProtocol, @@ -138,7 +138,7 @@ // const wsEndpoint = endpoint // .replace(/^http:/, "ws:") // .replace(/^https:/, "wss:"); -// const wsUrl = `${wsEndpoint}/registry/actors/raw/websocket/`; +// const wsUrl = `${wsEndpoint}/registry/actors/websocket/`; // // const ws = new WebSocket(wsUrl, [ // queryProtocol, @@ -191,7 +191,7 @@ // const wsEndpoint = endpoint // .replace(/^http:/, "ws:") // .replace(/^https:/, "wss:"); -// const wsUrl = `${wsEndpoint}/registry/actors/raw/websocket/`; +// const wsUrl = `${wsEndpoint}/registry/actors/websocket/`; // // const ws = new WebSocket(wsUrl, [ // queryProtocol, @@ -247,7 +247,7 @@ // const paths = ["chat/room1", "updates/feed", "stream/events"]; // // for (const path of paths) { -// const wsUrl = `${wsEndpoint}/registry/actors/raw/websocket/${path}`; +// const wsUrl = `${wsEndpoint}/registry/actors/websocket/${path}`; // const ws = new WebSocket(wsUrl, [ // queryProtocol, // // HACK: See packages/drivers/cloudflare-workers/src/websocket.ts @@ -295,7 +295,7 @@ // const wsEndpoint = endpoint // .replace(/^http:/, "ws:") // .replace(/^https:/, "wss:"); -// const wsUrl = `${wsEndpoint}/registry/actors/raw/websocket/`; +// const wsUrl = `${wsEndpoint}/registry/actors/websocket/`; // // const ws = new WebSocket(wsUrl, [ // queryProtocol, @@ -329,7 +329,7 @@ // const wsEndpoint = endpoint // .replace(/^http:/, "ws:") // .replace(/^https:/, "wss:"); -// const wsUrl = `${wsEndpoint}/registry/actors/raw/websocket/`; +// const wsUrl = `${wsEndpoint}/registry/actors/websocket/`; // // const ws = new WebSocket(wsUrl, [ // queryProtocol, diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/request-access.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/request-access.ts index 78c27838d1..a5ac02f671 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/request-access.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/request-access.ts @@ -141,7 +141,7 @@ export function runRequestAccessTests(driverTestConfig: DriverTestConfig) { // }, // }; // - // const url = `${endpoint}/registry/actors/raw/http/test-path`; + // const url = `${endpoint}/registry/actors/request/test-path`; // const response = await fetch(url, { // method: "POST", // headers: { @@ -200,7 +200,7 @@ export function runRequestAccessTests(driverTestConfig: DriverTestConfig) { // .replace("http://", "ws://") // .replace("https://", "wss://"); // const ws = new WebSocket( - // `${wsUrl}/registry/actors/raw/websocket/test-path`, + // `${wsUrl}/registry/actors/websocket/test-path`, // [ // queryProtocol, // "rivetkit", // Required protocol diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts index bdf81a9a98..0ebbbdf2d7 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/utils.ts @@ -35,20 +35,13 @@ export async function setupDriverTest( endpoint, namespace, runnerName, - transport: driverTestConfig.transport, encoding: driverTestConfig.encoding, }); } else if (driverTestConfig.clientType === "inline") { // Use inline client from driver - const transport = driverTestConfig.transport ?? "websocket"; const encoding = driverTestConfig.encoding ?? "bare"; - const managerDriver = createTestInlineClientDriver( - endpoint, - encoding, - transport, - ); + const managerDriver = createTestInlineClientDriver(endpoint, encoding); const runConfig = RunConfigSchema.parse({ - transport: transport, encoding: encoding, }); client = createClientWithDriver(managerDriver, runConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index c577b58915..17152b4770 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -21,16 +21,14 @@ import { } from "@/actor/router-endpoints"; import type { Client } from "@/client/client"; import { - PATH_CONNECT_WEBSOCKET, - PATH_RAW_WEBSOCKET_PREFIX, + PATH_CONNECT, + PATH_WEBSOCKET_PREFIX, WS_PROTOCOL_CONN_PARAMS, WS_PROTOCOL_ENCODING, - WS_PROTOCOL_TOKEN, } from "@/common/actor-router-consts"; import type { UpgradeWebSocketArgs } from "@/common/inline-websocket-adapter2"; import { getLogger } from "@/common/log"; import type { - RivetEvent, RivetMessageEvent, UniversalWebSocket, } from "@/common/websocket-interface"; @@ -200,12 +198,12 @@ export class EngineActorDriver implements ActorDriver { msg: "no existing hibernatable websocket found", requestId: idToStr(requestId), }); - if (path === PATH_CONNECT_WEBSOCKET) { + if (path === PATH_CONNECT) { hibernationConfig = { enabled: true, lastMsgIndex: undefined, }; - } else if (path.startsWith(PATH_RAW_WEBSOCKET_PREFIX)) { + } else if (path.startsWith(PATH_WEBSOCKET_PREFIX)) { // Find actor config const definition = lookupInRegistry( this.#registryConfig, @@ -553,7 +551,7 @@ export class EngineActorDriver implements ActorDriver { // // We store the promise since we need to add WebSocket event listeners immediately that will wait for the promise to resolve let wsHandlerPromise: Promise; - if (url.pathname === PATH_CONNECT_WEBSOCKET) { + if (url.pathname === PATH_CONNECT) { wsHandlerPromise = handleWebSocketConnect( request, this.#runConfig, @@ -563,11 +561,8 @@ export class EngineActorDriver implements ActorDriver { connParams, requestId, requestIdBuf, - // Extract connId and connToken from protocols if needed - undefined, - undefined, ); - } else if (url.pathname.startsWith(PATH_RAW_WEBSOCKET_PREFIX)) { + } else if (url.pathname.startsWith(PATH_WEBSOCKET_PREFIX)) { wsHandlerPromise = handleRawWebSocketHandler( request, url.pathname + url.search, diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts index 3a5534be47..f5ab47c125 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts @@ -25,8 +25,8 @@ import type { ManagerDisplayInformation } from "@/manager/driver"; import { type DriverConfig, type Encoding, - PATH_CONNECT_WEBSOCKET, - PATH_RAW_WEBSOCKET_PREFIX, + PATH_CONNECT, + PATH_WEBSOCKET_PREFIX, type RegistryConfig, type RunConfig, type UniversalWebSocket, @@ -157,15 +157,13 @@ export class FileSystemManagerDriver implements ManagerDriver { actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise { // Handle raw WebSocket paths const pathOnly = path.split("?")[0]; const normalizedPath = pathOnly.startsWith("/") ? pathOnly : `/${pathOnly}`; - if (normalizedPath === PATH_CONNECT_WEBSOCKET) { + if (normalizedPath === PATH_CONNECT) { // Handle standard connect const wsHandler = await handleWebSocketConnect( undefined, @@ -176,13 +174,11 @@ export class FileSystemManagerDriver implements ManagerDriver { params, generateConnRequestId(), undefined, - connId, - connToken, ); return new InlineWebSocketAdapter2(wsHandler); } else if ( - normalizedPath.startsWith(PATH_RAW_WEBSOCKET_PREFIX) || - normalizedPath === "/raw/websocket" + normalizedPath.startsWith(PATH_WEBSOCKET_PREFIX) || + normalizedPath === "/websocket" ) { // Handle websocket proxy // Use the full path with query parameters @@ -215,8 +211,6 @@ export class FileSystemManagerDriver implements ManagerDriver { actorId: string, encoding: Encoding, connParams: unknown, - connId?: string, - connToken?: string, ): Promise { const upgradeWebSocket = this.#runConfig.getUpgradeWebSocket?.(); invariant(upgradeWebSocket, "missing getUpgradeWebSocket"); @@ -226,7 +220,7 @@ export class FileSystemManagerDriver implements ManagerDriver { const normalizedPath = pathOnly.startsWith("/") ? pathOnly : `/${pathOnly}`; - if (normalizedPath === PATH_CONNECT_WEBSOCKET) { + if (normalizedPath === PATH_CONNECT) { // Handle standard connect const wsHandler = await handleWebSocketConnect( c.req.raw, @@ -237,13 +231,11 @@ export class FileSystemManagerDriver implements ManagerDriver { connParams, generateConnRequestId(), undefined, - connId, - connToken, ); return upgradeWebSocket(() => wsHandler)(c, noopNext()); } else if ( - normalizedPath.startsWith(PATH_RAW_WEBSOCKET_PREFIX) || - normalizedPath === "/raw/websocket" + normalizedPath.startsWith(PATH_WEBSOCKET_PREFIX) || + normalizedPath === "/websocket" ) { // Handle websocket proxy // Use the full path with query parameters diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/driver.ts b/rivetkit-typescript/packages/rivetkit/src/manager/driver.ts index 051c87d687..a3ab26eb03 100644 --- a/rivetkit-typescript/packages/rivetkit/src/manager/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/manager/driver.ts @@ -21,8 +21,6 @@ export interface ManagerDriver { actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise; proxyRequest( c: HonoContext, @@ -35,8 +33,6 @@ export interface ManagerDriver { actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise; displayInformation(): ManagerDisplayInformation; diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/gateway.ts b/rivetkit-typescript/packages/rivetkit/src/manager/gateway.ts index a6be497a56..d35723753c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/manager/gateway.ts +++ b/rivetkit-typescript/packages/rivetkit/src/manager/gateway.ts @@ -1,15 +1,13 @@ import type { Context as HonoContext, Next } from "hono"; import type { WSContext } from "hono/ws"; import { MissingActorHeader, WebSocketsNotEnabled } from "@/actor/errors"; -import type { Encoding, Transport } from "@/client/mod"; +import type { Encoding } from "@/client/mod"; import { HEADER_RIVET_ACTOR, HEADER_RIVET_NAMESPACE, HEADER_RIVET_TARGET, WS_PROTOCOL_ACTOR, - WS_PROTOCOL_CONN_ID, WS_PROTOCOL_CONN_PARAMS, - WS_PROTOCOL_CONN_TOKEN, WS_PROTOCOL_ENCODING, WS_PROTOCOL_TARGET, WS_PROTOCOL_TOKEN, @@ -47,8 +45,6 @@ async function handleWebSocketGatewayPathBased( const protocols = c.req.header("sec-websocket-protocol"); let encodingRaw: string | undefined; let connParamsRaw: string | undefined; - let connIdRaw: string | undefined; - let connTokenRaw: string | undefined; if (protocols) { const protocolList = protocols.split(",").map((p) => p.trim()); @@ -59,12 +55,6 @@ async function handleWebSocketGatewayPathBased( connParamsRaw = decodeURIComponent( protocol.substring(WS_PROTOCOL_CONN_PARAMS.length), ); - } else if (protocol.startsWith(WS_PROTOCOL_CONN_ID)) { - connIdRaw = protocol.substring(WS_PROTOCOL_CONN_ID.length); - } else if (protocol.startsWith(WS_PROTOCOL_CONN_TOKEN)) { - connTokenRaw = protocol.substring( - WS_PROTOCOL_CONN_TOKEN.length, - ); } } } @@ -85,8 +75,6 @@ async function handleWebSocketGatewayPathBased( actorPathInfo.actorId, encoding as any, // Will be validated by driver connParams, - connIdRaw, - connTokenRaw, ); } @@ -230,8 +218,6 @@ async function handleWebSocketGateway( let actorId: string | undefined; let encodingRaw: string | undefined; let connParamsRaw: string | undefined; - let connIdRaw: string | undefined; - let connTokenRaw: string | undefined; if (protocols) { const protocolList = protocols.split(",").map((p) => p.trim()); @@ -246,12 +232,6 @@ async function handleWebSocketGateway( connParamsRaw = decodeURIComponent( protocol.substring(WS_PROTOCOL_CONN_PARAMS.length), ); - } else if (protocol.startsWith(WS_PROTOCOL_CONN_ID)) { - connIdRaw = protocol.substring(WS_PROTOCOL_CONN_ID.length); - } else if (protocol.startsWith(WS_PROTOCOL_CONN_TOKEN)) { - connTokenRaw = protocol.substring( - WS_PROTOCOL_CONN_TOKEN.length, - ); } } } @@ -285,8 +265,6 @@ async function handleWebSocketGateway( actorId, encoding as any, // Will be validated by driver connParams, - connIdRaw, - connTokenRaw, ); } diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/protocol/mod.ts b/rivetkit-typescript/packages/rivetkit/src/manager/protocol/mod.ts index 1eaa9be279..13343c3dd2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/manager/protocol/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/manager/protocol/mod.ts @@ -1,5 +1,4 @@ import { z } from "zod"; -import { TransportSchema } from "@/actor/protocol/old"; import { ActorQuerySchema } from "./query"; export * from "./query"; @@ -10,7 +9,6 @@ export const ActorsRequestSchema = z.object({ export const ActorsResponseSchema = z.object({ actorId: z.string(), - supportedTransports: z.array(TransportSchema), }); //export const RivetConfigResponseSchema = z.object({ diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/protocol/query.ts b/rivetkit-typescript/packages/rivetkit/src/manager/protocol/query.ts index 5a7d7c9179..bd887b2b53 100644 --- a/rivetkit-typescript/packages/rivetkit/src/manager/protocol/query.ts +++ b/rivetkit-typescript/packages/rivetkit/src/manager/protocol/query.ts @@ -3,9 +3,7 @@ import { EncodingSchema } from "@/actor/protocol/serde"; import { HEADER_ACTOR_ID, HEADER_ACTOR_QUERY, - HEADER_CONN_ID, HEADER_CONN_PARAMS, - HEADER_CONN_TOKEN, HEADER_ENCODING, } from "@/common/actor-router-consts"; @@ -69,9 +67,7 @@ export const ConnectWebSocketRequestSchema = z.object({ export const ConnMessageRequestSchema = z.object({ actorId: z.string().describe(HEADER_ACTOR_ID), - connId: z.string().describe(HEADER_CONN_ID), encoding: EncodingSchema.describe(HEADER_ENCODING), - connToken: z.string().describe(HEADER_CONN_TOKEN), }); export const ResolveRequestSchema = z.object({ diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/router.ts b/rivetkit-typescript/packages/rivetkit/src/manager/router.ts index abccea32d4..774a0099f8 100644 --- a/rivetkit-typescript/packages/rivetkit/src/manager/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/manager/router.ts @@ -7,20 +7,16 @@ import { type Next, } from "hono"; import { createMiddleware } from "hono/factory"; -import { streamSSE } from "hono/streaming"; import invariant from "invariant"; import { z } from "zod"; import { ActorNotFound, InvalidRequest, Unsupported } from "@/actor/errors"; import { serializeActorKey } from "@/actor/keys"; -import type { Client, Encoding, Transport } from "@/client/mod"; +import type { Client, Encoding } from "@/client/mod"; import { WS_PROTOCOL_ACTOR, - WS_PROTOCOL_CONN_ID, WS_PROTOCOL_CONN_PARAMS, - WS_PROTOCOL_CONN_TOKEN, WS_PROTOCOL_ENCODING, - WS_PROTOCOL_PATH, - WS_PROTOCOL_TRANSPORT, + WS_TEST_PROTOCOL_PATH, } from "@/common/actor-router-consts"; import { cors } from "@/common/cors"; import { @@ -492,19 +488,12 @@ function addManagerRoutes( router.post(".test/inline-driver/call", async (c) => { // TODO: use openapi instead const buffer = await c.req.arrayBuffer(); - const { - encoding, - transport, - method, - args, - }: TestInlineDriverCallRequest = cbor.decode( - new Uint8Array(buffer), - ); + const { encoding, method, args }: TestInlineDriverCallRequest = + cbor.decode(new Uint8Array(buffer)); logger().debug({ msg: "received inline request", encoding, - transport, method, args, }); @@ -541,11 +530,8 @@ function addManagerRoutes( // Parse protocols to extract connection info let actorId = ""; let encoding: Encoding = "bare"; - let transport: Transport = "websocket"; let path = ""; let params: unknown; - let connId: string | undefined; - let connToken: string | undefined; for (const protocol of protocols) { if (protocol.startsWith(WS_PROTOCOL_ACTOR)) { @@ -554,25 +540,15 @@ function addManagerRoutes( encoding = protocol.substring( WS_PROTOCOL_ENCODING.length, ) as Encoding; - } else if (protocol.startsWith(WS_PROTOCOL_TRANSPORT)) { - transport = protocol.substring( - WS_PROTOCOL_TRANSPORT.length, - ) as Transport; - } else if (protocol.startsWith(WS_PROTOCOL_PATH)) { + } else if (protocol.startsWith(WS_TEST_PROTOCOL_PATH)) { path = decodeURIComponent( - protocol.substring(WS_PROTOCOL_PATH.length), + protocol.substring(WS_TEST_PROTOCOL_PATH.length), ); } else if (protocol.startsWith(WS_PROTOCOL_CONN_PARAMS)) { const paramsRaw = decodeURIComponent( protocol.substring(WS_PROTOCOL_CONN_PARAMS.length), ); params = JSON.parse(paramsRaw); - } else if (protocol.startsWith(WS_PROTOCOL_CONN_ID)) { - connId = protocol.substring(WS_PROTOCOL_CONN_ID.length); - } else if (protocol.startsWith(WS_PROTOCOL_CONN_TOKEN)) { - connToken = protocol.substring( - WS_PROTOCOL_CONN_TOKEN.length, - ); } } @@ -581,7 +557,6 @@ function addManagerRoutes( actorId, params, encodingKind: encoding, - transport, path: path, }); @@ -591,8 +566,6 @@ function addManagerRoutes( actorId, encoding, params, - connId, - connToken, ); return await createTestWebSocketProxy(clientWsPromise); diff --git a/rivetkit-typescript/packages/rivetkit/src/mod.ts b/rivetkit-typescript/packages/rivetkit/src/mod.ts index 924a7aa299..9156949d8a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/mod.ts @@ -1,4 +1,3 @@ -export { generateConnId, generateConnToken } from "@/actor/conn"; export * from "@/actor/mod"; export { type AnyClient, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts b/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts index 5befc2a6ad..e7965e71f2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/mod.ts @@ -287,7 +287,6 @@ async function configureServerlessRunner(config: RunnerConfig): Promise { namespace: config.namespace, runnerName: config.runnerName, encoding: config.encoding, - transport: config.transport, headers: config.headers, getUpgradeWebSocket: config.getUpgradeWebSocket, disableHealthCheck: true, // We don't need health check for this operation diff --git a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/actor-websocket-client.ts b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/actor-websocket-client.ts index 6ead7015e4..c0213fb702 100644 --- a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/actor-websocket-client.ts +++ b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/actor-websocket-client.ts @@ -2,9 +2,7 @@ import type { ClientConfig } from "@/client/config"; import { HEADER_CONN_PARAMS, HEADER_ENCODING, - WS_PROTOCOL_CONN_ID, WS_PROTOCOL_CONN_PARAMS, - WS_PROTOCOL_CONN_TOKEN, WS_PROTOCOL_ENCODING, WS_PROTOCOL_STANDARD as WS_PROTOCOL_RIVETKIT, WS_PROTOCOL_TOKEN, @@ -21,8 +19,6 @@ export async function openWebSocketToActor( actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise { const WebSocket = await importWebSocket(); @@ -40,7 +36,7 @@ export async function openWebSocketToActor( // Create WebSocket connection const ws = new WebSocket( guardUrl, - buildWebSocketProtocols(runConfig, encoding, params, connId, connToken), + buildWebSocketProtocols(runConfig, encoding, params), ); // Set binary type to arraybuffer for proper encoding support @@ -55,25 +51,14 @@ export function buildWebSocketProtocols( runConfig: ClientConfig, encoding: Encoding, params?: unknown, - connId?: string, - connToken?: string, ): string[] { const protocols: string[] = []; protocols.push(WS_PROTOCOL_RIVETKIT); protocols.push(`${WS_PROTOCOL_ENCODING}${encoding}`); - if (runConfig.token) { - protocols.push(`${WS_PROTOCOL_TOKEN}${runConfig.token}`); - } if (params) { protocols.push( `${WS_PROTOCOL_CONN_PARAMS}${encodeURIComponent(JSON.stringify(params))}`, ); } - if (connId) { - protocols.push(`${WS_PROTOCOL_CONN_ID}${connId}`); - } - if (connToken) { - protocols.push(`${WS_PROTOCOL_CONN_TOKEN}${connToken}`); - } return protocols; } diff --git a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts index e3a04e7f73..08a29a4710 100644 --- a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts @@ -316,8 +316,6 @@ export class RemoteManagerDriver implements ManagerDriver { actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise { // Wait for metadata check to complete if in progress if (this.#metadataPromise) { @@ -330,8 +328,6 @@ export class RemoteManagerDriver implements ManagerDriver { actorId, encoding, params, - connId, - connToken, ); } @@ -358,8 +354,6 @@ export class RemoteManagerDriver implements ManagerDriver { actorId: string, encoding: Encoding, params: unknown, - connId?: string, - connToken?: string, ): Promise { // Wait for metadata check to complete if in progress if (this.#metadataPromise) { @@ -385,8 +379,6 @@ export class RemoteManagerDriver implements ManagerDriver { this.#config, encoding, params, - connId, - connToken, ); const args = await createWebSocketProxy(c, wsGuardUrl, protocols); diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/mod.ts index e1afda50e2..a7add56dfd 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/mod.ts @@ -1 +1 @@ -export * from "../../../dist/schemas/actor-persist/v2"; +export * from "../../../dist/schemas/actor-persist/v3"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts index 5a4a63a0af..84ba566473 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts @@ -3,39 +3,89 @@ import { type MigrationFn, } from "@/common/versioned-data"; import type * as v1 from "../../../dist/schemas/actor-persist/v1"; -import * as v2 from "../../../dist/schemas/actor-persist/v2"; - -export const CURRENT_VERSION = 2; - -export type CurrentPersistedActor = v2.PersistedActor; -export type CurrentPersistedConnection = v2.PersistedConnection; -export type CurrentPersistedSubscription = v2.PersistedSubscription; -export type CurrentGenericPersistedScheduleEvent = - v2.GenericPersistedScheduleEvent; -export type CurrentPersistedScheduleEventKind = v2.PersistedScheduleEventKind; -export type CurrentPersistedScheduleEvent = v2.PersistedScheduleEvent; -export type CurrentPersistedHibernatableWebSocket = - v2.PersistedHibernatableWebSocket; - -const migrations = new Map>(); - -// Migration from v1 to v2: Add hibernatableWebSocket field -migrations.set( - 1, - (v1Data: v1.PersistedActor): v2.PersistedActor => ({ - ...v1Data, - connections: v1Data.connections.map((conn) => ({ - ...conn, - hibernatableRequestId: null, - })), - hibernatableWebSocket: [], - }), -); +import type * as v2 from "../../../dist/schemas/actor-persist/v2"; +import * as v3 from "../../../dist/schemas/actor-persist/v3"; + +export const CURRENT_VERSION = 3; + +export type CurrentPersistedActor = v3.PersistedActor; +export type CurrentPersistedHibernatableConn = v3.PersistedHibernatableConn; +export type CurrentPersistedScheduleEvent = v3.PersistedScheduleEvent; + +const migrations = new Map>([ + [ + 1, + (v1Data: v1.PersistedActor): v2.PersistedActor => ({ + ...v1Data, + connections: v1Data.connections.map((conn) => ({ + ...conn, + hibernatableRequestId: null, + })), + hibernatableWebSocket: [], + }), + ], + [ + 2, + (v2Data: v2.PersistedActor): v3.PersistedActor => { + // Merge connections and hibernatableWebSocket into hibernatableConns + const hibernatableConns: v3.PersistedHibernatableConn[] = []; + + // Convert connections with hibernatable request IDs to hibernatable conns + for (const conn of v2Data.connections) { + if (conn.hibernatableRequestId) { + // Find the matching hibernatable WebSocket + const ws = v2Data.hibernatableWebSocket.find((ws) => + Buffer.from(ws.requestId).equals( + Buffer.from(conn.hibernatableRequestId!), + ), + ); + + if (ws) { + hibernatableConns.push({ + id: conn.id, + parameters: conn.parameters, + state: conn.state, + hibernatableRequestId: conn.hibernatableRequestId, + lastSeenTimestamp: ws.lastSeenTimestamp, + msgIndex: ws.msgIndex, + }); + } + } + } + + // Transform scheduled events from nested structure to flat structure + const scheduledEvents: v3.PersistedScheduleEvent[] = + v2Data.scheduledEvents.map((event) => { + // Extract action and args from the kind wrapper + if (event.kind.tag === "GenericPersistedScheduleEvent") { + return { + eventId: event.eventId, + timestamp: event.timestamp, + action: event.kind.val.action, + args: event.kind.val.args, + }; + } + // Fallback for unknown kinds + throw new Error( + `Unknown schedule event kind: ${event.kind.tag}`, + ); + }); + + return { + input: v2Data.input, + hasInitialized: v2Data.hasInitialized, + state: v2Data.state, + hibernatableConns, + scheduledEvents, + }; + }, + ], +]); export const PERSISTED_ACTOR_VERSIONED = createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, - serializeVersion: (data) => v2.encodePersistedActor(data), - deserializeVersion: (bytes) => v2.decodePersistedActor(bytes), + serializeVersion: (data) => v3.encodePersistedActor(data), + deserializeVersion: (bytes) => v3.decodePersistedActor(bytes), }); diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts index a1841fee94..19c1c3877e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts @@ -1 +1 @@ -export * from "../../../dist/schemas/client-protocol/v1"; +export * from "../../../dist/schemas/client-protocol/v2"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts index 50408aa104..51d5da5347 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts @@ -2,52 +2,80 @@ import { createVersionedDataHandler, type MigrationFn, } from "@/common/versioned-data"; -import * as v1 from "../../../dist/schemas/client-protocol/v1"; +import type * as v1 from "../../../dist/schemas/client-protocol/v1"; +import * as v2 from "../../../dist/schemas/client-protocol/v2"; -export const CURRENT_VERSION = 1; +export const CURRENT_VERSION = 2; + +export type CurrentToServer = v2.ToServer; +export type CurrentToClient = v2.ToClient; +export type CurrentHttpActionRequest = v2.HttpActionRequest; +export type CurrentHttpActionResponse = v2.HttpActionResponse; +export type CurrentHttpResponseError = v2.HttpResponseError; +export type CurrentHttpResolveRequest = v2.HttpResolveRequest; +export type CurrentHttpResolveResponse = v2.HttpResolveResponse; const migrations = new Map>(); -export const TO_SERVER_VERSIONED = createVersionedDataHandler({ +// Migration from v1 to v2: Remove connectionToken from Init message +migrations.set(1, (v1Data: v1.ToClient): v2.ToClient => { + // Handle Init message specifically to remove connectionToken + if (v1Data.body.tag === "Init") { + const { actorId, connectionId } = v1Data.body.val as v1.Init; + return { + body: { + tag: "Init", + val: { + actorId, + connectionId, + }, + }, + }; + } + // All other messages are unchanged + return v1Data as unknown as v2.ToClient; +}); + +export const TO_SERVER_VERSIONED = createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, - serializeVersion: (data) => v1.encodeToServer(data), - deserializeVersion: (bytes) => v1.decodeToServer(bytes), + serializeVersion: (data) => v2.encodeToServer(data), + deserializeVersion: (bytes) => v2.decodeToServer(bytes), }); -export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ +export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, - serializeVersion: (data) => v1.encodeToClient(data), - deserializeVersion: (bytes) => v1.decodeToClient(bytes), + serializeVersion: (data) => v2.encodeToClient(data), + deserializeVersion: (bytes) => v2.decodeToClient(bytes), }); export const HTTP_ACTION_REQUEST_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, - serializeVersion: (data) => v1.encodeHttpActionRequest(data), - deserializeVersion: (bytes) => v1.decodeHttpActionRequest(bytes), + serializeVersion: (data) => v2.encodeHttpActionRequest(data), + deserializeVersion: (bytes) => v2.decodeHttpActionRequest(bytes), }); export const HTTP_ACTION_RESPONSE_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, - serializeVersion: (data) => v1.encodeHttpActionResponse(data), - deserializeVersion: (bytes) => v1.decodeHttpActionResponse(bytes), + serializeVersion: (data) => v2.encodeHttpActionResponse(data), + deserializeVersion: (bytes) => v2.decodeHttpActionResponse(bytes), }); export const HTTP_RESPONSE_ERROR_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, - serializeVersion: (data) => v1.encodeHttpResponseError(data), - deserializeVersion: (bytes) => v1.decodeHttpResponseError(bytes), + serializeVersion: (data) => v2.encodeHttpResponseError(data), + deserializeVersion: (bytes) => v2.decodeHttpResponseError(bytes), }); export const HTTP_RESOLVE_REQUEST_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, serializeVersion: (_) => new Uint8Array(), @@ -55,9 +83,9 @@ export const HTTP_RESOLVE_REQUEST_VERSIONED = }); export const HTTP_RESOLVE_RESPONSE_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ currentVersion: CURRENT_VERSION, migrations, - serializeVersion: (data) => v1.encodeHttpResolveResponse(data), - deserializeVersion: (bytes) => v1.decodeHttpResolveResponse(bytes), + serializeVersion: (data) => v2.encodeHttpResolveResponse(data), + deserializeVersion: (bytes) => v2.decodeHttpResolveResponse(bytes), }); diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts index 4357f986ff..598d839a87 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts @@ -8,8 +8,6 @@ runDriverTests({ // Use real timers for engine-runner tests useRealTimers: true, skip: { - // SSE is not implemented on Rivet Guard yet - sse: true, // The inline client is the same as the remote client driver on Rivet inline: true, }, diff --git a/website/src/content/docs/actors/fetch-and-websocket-handler.mdx b/website/src/content/docs/actors/fetch-and-websocket-handler.mdx index aa79b16616..15faad7810 100644 --- a/website/src/content/docs/actors/fetch-and-websocket-handler.mdx +++ b/website/src/content/docs/actors/fetch-and-websocket-handler.mdx @@ -411,7 +411,7 @@ For HTTP requests, the router expects these headers: ```typescript // Direct HTTP request to actor -const response = await fetch("http://localhost:8080/registry/actors/myActor/raw/http/api/hello", { +const response = await fetch("http://localhost:8080/registry/actors/myActor/request/api/hello", { method: "GET", headers: { "X-RivetKit-Actor-Query": JSON.stringify({ @@ -426,7 +426,7 @@ const data = await response.json(); console.log(data); // { message: "Hello from actor!" } // POST request with data -const postResponse = await fetch("http://localhost:8080/registry/actors/myActor/raw/http/api/echo", { +const postResponse = await fetch("http://localhost:8080/registry/actors/myActor/request/api/echo", { method: "POST", headers: { "X-RivetKit-Actor-Query": JSON.stringify({