diff --git a/examples/cursors-raw-websocket/src/backend/registry.ts b/examples/cursors-raw-websocket/src/backend/registry.ts index 01ee63eabe..a7c6a1954a 100644 --- a/examples/cursors-raw-websocket/src/backend/registry.ts +++ b/examples/cursors-raw-websocket/src/backend/registry.ts @@ -56,7 +56,7 @@ export const cursorRoom = actor({ }, // Handle WebSocket connections - onWebSocket: async (c, websocket: UniversalWebSocket, { request }) => { + onWebSocket: async (c, websocket: UniversalWebSocket) => { const url = new URL(request.url); const sessionId = url.searchParams.get("sessionId"); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-params.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-params.ts index 422b508877..4116a4432c 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-params.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-params.ts @@ -2,7 +2,7 @@ import { actor } from "rivetkit"; export const counterWithParams = actor({ state: { count: 0, initializers: [] as string[] }, - createConnState: (c, opts, params: { name?: string }) => { + createConnState: (c, params: { name?: string }) => { return { name: params.name || "anonymous", }; diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-state.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-state.ts index 2dbb1c172d..92a93963b5 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-state.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-state.ts @@ -16,7 +16,6 @@ export const connStateActor = actor({ // Define connection state createConnState: ( c, - opts, params: { username?: string; role?: string; noCount?: boolean }, ): ConnState => { return { diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/lifecycle.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/lifecycle.ts index 292ca8da91..2fb790e734 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/lifecycle.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/lifecycle.ts @@ -7,13 +7,13 @@ export const counterWithLifecycle = actor({ count: 0, events: [] as string[], }, - createConnState: (c, opts, params: ConnParams) => ({ + createConnState: (c, params: ConnParams) => ({ joinTime: Date.now(), }), onWake: (c) => { c.state.events.push("onWake"); }, - onBeforeConnect: (c, opts, params: ConnParams) => { + onBeforeConnect: (c, params: ConnParams) => { if (params?.trackLifecycle) c.state.events.push("onBeforeConnect"); }, onConnect: (c, conn) => { diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts index 58bd5ce752..1c89fd1f20 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts @@ -5,7 +5,7 @@ export const rawWebSocketActor = actor({ connectionCount: 0, messageCount: 0, }, - onWebSocket(ctx, websocket, opts) { + onWebSocket(ctx, websocket) { ctx.state.connectionCount = ctx.state.connectionCount + 1; console.log( `[ACTOR] New connection, count: ${ctx.state.connectionCount}`, @@ -51,15 +51,16 @@ export const rawWebSocketActor = actor({ }), ); } else if (parsed.type === "getRequestInfo") { + throw "TODO"; // Send back the request URL info - websocket.send( - JSON.stringify({ - type: "requestInfo", - url: opts.request.url, - pathname: new URL(opts.request.url).pathname, - search: new URL(opts.request.url).search, - }), - ); + // websocket.send( + // JSON.stringify({ + // type: "requestInfo", + // url: opts.request.url, + // pathname: new URL(opts.request.url).pathname, + // search: new URL(opts.request.url).search, + // }), + // ); } else { // Echo back websocket.send(data); @@ -93,7 +94,7 @@ export const rawWebSocketActor = actor({ }); export const rawWebSocketBinaryActor = actor({ - onWebSocket(ctx, websocket, opts) { + onWebSocket(ctx, websocket) { // Handle binary data websocket.addEventListener("message", (event: any) => { const data = event.data; diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/request-access.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/request-access.ts index 5caf7dd150..2825b4faf8 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/request-access.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/request-access.ts @@ -31,19 +31,19 @@ export const requestAccessActor = actor({ requestHeaders: {} as Record, }, }, - createConnState: (c, { request }, params: { trackRequest?: boolean }) => { + createConnState: (c, params: { trackRequest?: boolean }) => { // In createConnState, the state isn't available yet. return { trackRequest: params?.trackRequest || false, requestInfo: - params?.trackRequest && request + params?.trackRequest && c.request ? { hasRequest: true, - requestUrl: request.url, - requestMethod: request.method, + requestUrl: c.request.url, + requestMethod: c.request.method, requestHeaders: Object.fromEntries( - request.headers.entries(), + c.request.headers.entries(), ), } : null, @@ -55,16 +55,16 @@ export const requestAccessActor = actor({ c.state.createConnStateRequest = conn.state.requestInfo; } }, - onBeforeConnect: (c, { request }, params) => { + onBeforeConnect: (c, params) => { if (params?.trackRequest) { - if (request) { + if (c.request) { c.state.onBeforeConnectRequest.hasRequest = true; - c.state.onBeforeConnectRequest.requestUrl = request.url; - c.state.onBeforeConnectRequest.requestMethod = request.method; + c.state.onBeforeConnectRequest.requestUrl = c.request.url; + c.state.onBeforeConnectRequest.requestMethod = c.request.method; // Store select headers const headers: Record = {}; - request.headers.forEach((value, key) => { + c.request.headers.forEach((value, key) => { headers[key] = value; }); c.state.onBeforeConnectRequest.requestHeaders = headers; @@ -101,15 +101,16 @@ export const requestAccessActor = actor({ }, ); }, - onWebSocket: (c, websocket, { request }) => { + onWebSocket: (c, websocket) => { + if (!c.request) throw "Missing request"; // Store request info c.state.onWebSocketRequest.hasRequest = true; - c.state.onWebSocketRequest.requestUrl = request.url; - c.state.onWebSocketRequest.requestMethod = request.method; + c.state.onWebSocketRequest.requestUrl = c.request.url; + c.state.onWebSocketRequest.requestMethod = c.request.method; // Store select headers const headers: Record = {}; - request.headers.forEach((value, key) => { + c.request.headers.forEach((value, key) => { headers[key] = value; }); c.state.onWebSocketRequest.requestHeaders = headers; @@ -118,8 +119,8 @@ export const requestAccessActor = actor({ websocket.send( JSON.stringify({ hasRequest: true, - requestUrl: request.url, - requestMethod: request.method, + requestUrl: c.request.url, + requestMethod: c.request.method, requestHeaders: headers, }), ); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep.ts index 1856c3fe87..49d963786b 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep.ts @@ -112,7 +112,7 @@ export const sleepWithRawWebSocket = actor({ onSleep: (c) => { c.state.sleepCount += 1; }, - onWebSocket: (c, websocket: UniversalWebSocket, opts) => { + onWebSocket: (c, websocket: UniversalWebSocket) => { c.state.connectionCount += 1; c.log.info({ msg: "websocket connected", diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 9625cb229c..8be141b4ad 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -3,6 +3,9 @@ import type { UniversalWebSocket } from "@/common/websocket-interface"; import type { Conn } from "./conn/mod"; import type { ActionContext } from "./contexts/action"; import type { ActorContext } from "./contexts/actor"; +import type { CreateConnStateContext } from "./contexts/create-conn-state"; +import type { OnBeforeConnectContext } from "./contexts/on-before-connect"; +import type { OnConnectContext } from "./contexts/on-connect"; import type { RequestContext } from "./contexts/request"; import type { WebSocketContext } from "./contexts/websocket"; import type { AnyDatabaseProvider } from "./database"; @@ -113,15 +116,6 @@ export const ActorConfigSchema = z }, ); -export interface OnConnectOptions { - /** - * The request object associated with the connection. - * - * @experimental - */ - request?: Request; -} - // Creates state config // // This must have only one or the other or else TState will not be able to be inferred @@ -148,13 +142,12 @@ type CreateConnState< TConnState, TVars, TInput, - TDatabase, + TDatabase extends AnyDatabaseProvider, > = | { connState: TConnState } | { createConnState: ( - c: InitContext, - opts: OnConnectOptions, + c: CreateConnStateContext, params: TConnParams, ) => TConnState | Promise; } @@ -323,15 +316,7 @@ interface BaseActorConfig< * @throws Throw an error to reject the connection */ onBeforeConnect?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - opts: OnConnectOptions, + c: OnBeforeConnectContext, params: TConnParams, ) => void | Promise; @@ -345,14 +330,7 @@ interface BaseActorConfig< * @returns Void or a Promise that resolves when connection handling is complete */ onConnect?: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, + c: OnConnectContext, conn: Conn, ) => void | Promise; @@ -446,7 +424,6 @@ interface BaseActorConfig< TDatabase >, websocket: UniversalWebSocket, - opts: { request: Request }, ) => void | Promise; actions: TActions; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/websocket.ts b/rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/websocket.ts index 2a6f0ab2ad..614d7f3cff 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/websocket.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/websocket.ts @@ -5,7 +5,7 @@ import type { CachedSerializer, Encoding } from "@/actor/protocol/serde"; import type * as protocol from "@/schemas/client-protocol/mod"; import { type ConnDriver, DriverReadyState } from "../driver"; -export type ConnDriverWebSocketState = {}; +export type ConnDriverWebSocketState = Record; export function createWebSocketSocket( requestId: string, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/action.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/action.ts index a8953d883e..9e99862a69 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/action.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/action.ts @@ -1,17 +1,10 @@ -import type { ActorKey } from "@/actor/mod"; -import type { Client } from "@/client/client"; -import type { Logger } from "@/common/log"; -import type { Registry } from "@/registry/mod"; -import type { Conn, ConnId } from "../conn/mod"; -import type { AnyDatabaseProvider, InferDatabaseClient } from "../database"; -import type { SaveStateOptions } from "../instance/state-manager"; -import type { Schedule } from "../schedule"; -import type { ActorContext } from "./actor"; +import type { Conn } from "../conn/mod"; +import type { AnyDatabaseProvider } from "../database"; +import type { ActorInstance } from "../instance/mod"; +import { ConnContext } from "./conn"; /** * Context for a remote procedure call. - * - * @typeParam A Actor this action belongs to */ export class ActionContext< TState, @@ -20,159 +13,11 @@ export class ActionContext< TVars, TInput, TDatabase extends AnyDatabaseProvider, -> { - #actorContext: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >; - - /** - * Should not be called directly. - * - * @param actorContext - The actor context - * @param conn - The connection associated with the action - */ - constructor( - actorContext: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - public readonly conn: Conn< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ) { - this.#actorContext = actorContext; - } - - /** - * Get the actor state - */ - get state(): TState { - return this.#actorContext.state; - } - - /** - * Get the actor variables - */ - get vars(): TVars { - return this.#actorContext.vars; - } - - /** - * Broadcasts an event to all connected clients. - */ - broadcast(name: string, ...args: any[]): void { - this.#actorContext.broadcast(name, ...args); - } - - /** - * Gets the logger instance. - */ - get log(): Logger { - return this.#actorContext.log; - } - - /** - * Gets actor ID. - */ - get actorId(): string { - return this.#actorContext.actorId; - } - - /** - * Gets the actor name. - */ - get name(): string { - return this.#actorContext.name; - } - - /** - * Gets the actor key. - */ - get key(): ActorKey { - return this.#actorContext.key; - } - - /** - * Gets the region. - */ - get region(): string { - return this.#actorContext.region; - } - - /** - * Gets the scheduler. - */ - get schedule(): Schedule { - return this.#actorContext.schedule; - } - - /** - * Gets the map of connections. - */ - get conns(): Map< - ConnId, - Conn - > { - return this.#actorContext.conns; - } - - /** - * Returns the client for the given registry. - */ - client>(): Client { - return this.#actorContext.client(); - } - - /** - * @experimental - */ - get db(): InferDatabaseClient { - return this.#actorContext.db; - } - - /** - * Forces the state to get saved. - */ - async saveState(opts: SaveStateOptions): Promise { - return this.#actorContext.saveState(opts); - } - - /** - * Prevents the actor from sleeping until promise is complete. - */ - waitUntil(promise: Promise): void { - this.#actorContext.waitUntil(promise); - } - - /** - * AbortSignal that fires when the actor is stopping. - */ - get abortSignal(): AbortSignal { - return this.#actorContext.abortSignal; - } - - /** - * Forces the actor to sleep. - * - * Not supported on all drivers. - * - * @experimental - */ - sleep() { - this.#actorContext.sleep(); - } -} +> extends ConnContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase +> {} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/conn-init.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/conn-init.ts new file mode 100644 index 0000000000..801bd86046 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/conn-init.ts @@ -0,0 +1,31 @@ +import type { AnyDatabaseProvider } from "../database"; +import type { ActorInstance } from "../instance/mod"; +import { ActorContext } from "./actor"; + +/** + * Base context for connection initialization handlers. + * Extends ActorContext with request-specific functionality for connection lifecycle events. + */ +export abstract class ConnInitContext< + TState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> extends ActorContext { + /** + * The incoming request that initiated the connection. + * May be undefined for connections initiated without a direct HTTP request. + */ + public readonly request: Request | undefined; + + /** + * @internal + */ + constructor( + actor: ActorInstance, + request: Request | undefined, + ) { + super(actor); + this.request = request; + } +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/conn.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/conn.ts new file mode 100644 index 0000000000..725d40b418 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/conn.ts @@ -0,0 +1,48 @@ +import type { Conn } from "../conn/mod"; +import type { AnyDatabaseProvider } from "../database"; +import type { ActorInstance } from "../instance/mod"; +import { ActorContext } from "./actor"; + +/** + * Base context for connection-based handlers. + * Extends ActorContext with connection-specific functionality. + */ +export abstract class ConnContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> extends ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase +> { + /** + * @internal + */ + constructor( + actor: ActorInstance< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + public readonly conn: Conn< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) { + super(actor); + } +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/create-conn-state.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/create-conn-state.ts new file mode 100644 index 0000000000..78e9c78e6c --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/create-conn-state.ts @@ -0,0 +1,13 @@ +import type { AnyDatabaseProvider } from "../database"; +import { ConnInitContext } from "./conn-init"; + +/** + * Context for the createConnState lifecycle hook. + * Called to initialize connection-specific state when a connection is created. + */ +export class CreateConnStateContext< + TState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> extends ConnInitContext {} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/on-before-connect.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/on-before-connect.ts new file mode 100644 index 0000000000..759c210052 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/on-before-connect.ts @@ -0,0 +1,13 @@ +import type { AnyDatabaseProvider } from "../database"; +import { ConnInitContext } from "./conn-init"; + +/** + * Context for the onBeforeConnect lifecycle hook. + * Called before a connection is established, allowing for validation and early rejection. + */ +export class OnBeforeConnectContext< + TState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> extends ConnInitContext {} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/on-connect.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/on-connect.ts new file mode 100644 index 0000000000..ae203cfa69 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/on-connect.ts @@ -0,0 +1,13 @@ +import type { AnyDatabaseProvider } from "../database"; +import { ConnInitContext } from "./conn-init"; + +/** + * Context for the onConnect lifecycle hook. + * Called when a connection is successfully established. + */ +export class OnConnectContext< + TState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, +> extends ConnInitContext {} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/request.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/request.ts index 1aa07b9afc..1615687f61 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/request.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/request.ts @@ -1,22 +1,10 @@ -import type { ActorKey } from "@/actor/mod"; -import type { Client } from "@/client/client"; -import type { Logger } from "@/common/log"; -import type { Registry } from "@/registry/mod"; -import type { Conn, ConnId } from "../conn/mod"; -import type { AnyDatabaseProvider, InferDatabaseClient } from "../database"; -import type { SaveStateOptions } from "../instance/state-manager"; -import type { Schedule } from "../schedule"; -import type { ActorContext } from "./actor"; +import type { Conn } from "../conn/mod"; +import type { AnyDatabaseProvider } from "../database"; +import type { ActorInstance } from "../instance/mod"; +import { ConnContext } from "./conn"; /** * Context for raw HTTP request handlers (onRequest). - * - * @typeParam TState - The actor state type - * @typeParam TConnParams - The connection parameters type - * @typeParam TConnState - The connection state type - * @typeParam TVars - The actor variables type - * @typeParam TInput - The actor input type - * @typeParam TDatabase - The database provider type */ export class RequestContext< TState, @@ -25,32 +13,25 @@ export class RequestContext< TVars, TInput, TDatabase extends AnyDatabaseProvider, +> extends ConnContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase > { - #actorContext: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >; + /** + * The incoming HTTP request. + * May be undefined for request contexts initiated without a direct HTTP request. + */ + public readonly request: Request | undefined; /** - * Should not be called directly. - * - * @param actorContext - The actor context - * @param conn - The connection associated with the request + * @internal */ constructor( - actorContext: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - public readonly conn: Conn< + actor: ActorInstance< TState, TConnParams, TConnState, @@ -58,126 +39,10 @@ export class RequestContext< TInput, TDatabase >, + conn: Conn, + request?: Request, ) { - this.#actorContext = actorContext; - } - - /** - * Get the actor state - */ - get state(): TState { - return this.#actorContext.state; - } - - /** - * Get the actor variables - */ - get vars(): TVars { - return this.#actorContext.vars; - } - - /** - * Broadcasts an event to all connected clients. - */ - broadcast(name: string, ...args: any[]): void { - this.#actorContext.broadcast(name, ...args); - } - - /** - * Gets the logger instance. - */ - get log(): Logger { - return this.#actorContext.log; - } - - /** - * Gets actor ID. - */ - get actorId(): string { - return this.#actorContext.actorId; - } - - /** - * Gets the actor name. - */ - get name(): string { - return this.#actorContext.name; - } - - /** - * Gets the actor key. - */ - get key(): ActorKey { - return this.#actorContext.key; - } - - /** - * Gets the region. - */ - get region(): string { - return this.#actorContext.region; - } - - /** - * Gets the scheduler. - */ - get schedule(): Schedule { - return this.#actorContext.schedule; - } - - /** - * Gets the map of connections. - */ - get conns(): Map< - ConnId, - Conn - > { - return this.#actorContext.conns; - } - - /** - * Returns the client for the given registry. - */ - client>(): Client { - return this.#actorContext.client(); - } - - /** - * @experimental - */ - get db(): InferDatabaseClient { - return this.#actorContext.db; - } - - /** - * Forces the state to get saved. - */ - async saveState(opts: SaveStateOptions): Promise { - return this.#actorContext.saveState(opts); - } - - /** - * Prevents the actor from sleeping until promise is complete. - */ - waitUntil(promise: Promise): void { - this.#actorContext.waitUntil(promise); - } - - /** - * AbortSignal that fires when the actor is stopping. - */ - get abortSignal(): AbortSignal { - return this.#actorContext.abortSignal; - } - - /** - * Forces the actor to sleep. - * - * Not supported on all drivers. - * - * @experimental - */ - sleep() { - this.#actorContext.sleep(); + super(actor, conn); + this.request = request; } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/websocket.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/websocket.ts index 8403432531..f368330971 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/websocket.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/websocket.ts @@ -1,22 +1,10 @@ -import type { ActorKey } from "@/actor/mod"; -import type { Client } from "@/client/client"; -import type { Logger } from "@/common/log"; -import type { Registry } from "@/registry/mod"; -import type { Conn, ConnId } from "../conn/mod"; -import type { AnyDatabaseProvider, InferDatabaseClient } from "../database"; -import type { SaveStateOptions } from "../instance/state-manager"; -import type { Schedule } from "../schedule"; -import type { ActorContext } from "./actor"; +import type { Conn } from "../conn/mod"; +import type { AnyDatabaseProvider } from "../database"; +import type { ActorInstance } from "../instance/mod"; +import { ConnContext } from "./conn"; /** * Context for raw WebSocket handlers (onWebSocket). - * - * @typeParam TState - The actor state type - * @typeParam TConnParams - The connection parameters type - * @typeParam TConnState - The connection state type - * @typeParam TVars - The actor variables type - * @typeParam TInput - The actor input type - * @typeParam TDatabase - The database provider type */ export class WebSocketContext< TState, @@ -25,32 +13,25 @@ export class WebSocketContext< TVars, TInput, TDatabase extends AnyDatabaseProvider, +> extends ConnContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase > { - #actorContext: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >; + /** + * The incoming HTTP request that initiated the WebSocket upgrade. + * May be undefined for WebSocket connections initiated without a direct HTTP request. + */ + public readonly request: Request | undefined; /** - * Should not be called directly. - * - * @param actorContext - The actor context - * @param conn - The connection associated with the WebSocket + * @internal */ constructor( - actorContext: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - public readonly conn: Conn< + actor: ActorInstance< TState, TConnParams, TConnState, @@ -58,126 +39,10 @@ export class WebSocketContext< TInput, TDatabase >, + conn: Conn, + request?: Request, ) { - this.#actorContext = actorContext; - } - - /** - * Get the actor state - */ - get state(): TState { - return this.#actorContext.state; - } - - /** - * Get the actor variables - */ - get vars(): TVars { - return this.#actorContext.vars; - } - - /** - * Broadcasts an event to all connected clients. - */ - broadcast(name: string, ...args: any[]): void { - this.#actorContext.broadcast(name, ...args); - } - - /** - * Gets the logger instance. - */ - get log(): Logger { - return this.#actorContext.log; - } - - /** - * Gets actor ID. - */ - get actorId(): string { - return this.#actorContext.actorId; - } - - /** - * Gets the actor name. - */ - get name(): string { - return this.#actorContext.name; - } - - /** - * Gets the actor key. - */ - get key(): ActorKey { - return this.#actorContext.key; - } - - /** - * Gets the region. - */ - get region(): string { - return this.#actorContext.region; - } - - /** - * Gets the scheduler. - */ - get schedule(): Schedule { - return this.#actorContext.schedule; - } - - /** - * Gets the map of connections. - */ - get conns(): Map< - ConnId, - Conn - > { - return this.#actorContext.conns; - } - - /** - * Returns the client for the given registry. - */ - client>(): Client { - return this.#actorContext.client(); - } - - /** - * @experimental - */ - get db(): InferDatabaseClient { - return this.#actorContext.db; - } - - /** - * Forces the state to get saved. - */ - async saveState(opts: SaveStateOptions): Promise { - return this.#actorContext.saveState(opts); - } - - /** - * Prevents the actor from sleeping until promise is complete. - */ - waitUntil(promise: Promise): void { - this.#actorContext.waitUntil(promise); - } - - /** - * AbortSignal that fires when the actor is stopping. - */ - get abortSignal(): AbortSignal { - return this.#actorContext.abortSignal; - } - - /** - * Forces the actor to sleep. - * - * Not supported on all drivers. - * - * @experimental - */ - sleep() { - this.#actorContext.sleep(); + super(actor, conn); + this.request = request; } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts index b888bd039a..0749b57808 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts @@ -1,6 +1,5 @@ import * as cbor from "cbor-x"; import { arrayBuffersEqual, idToStr, stringifyError } from "@/utils"; -import type { OnConnectOptions } from "../config"; import type { ConnDriver } from "../conn/driver"; import { CONN_DRIVER_SYMBOL, @@ -11,6 +10,9 @@ import { Conn, type ConnId, } from "../conn/mod"; +import { CreateConnStateContext } from "../contexts/create-conn-state"; +import { OnBeforeConnectContext } from "../contexts/on-before-connect"; +import { OnConnectContext } from "../contexts/on-connect"; import type { AnyDatabaseProvider } from "../database"; import type { ActorDriver } from "../driver"; import { deadline } from "../utils"; @@ -73,7 +75,7 @@ export class ConnectionManager< async createConn( driver: ConnDriver, params: CP, - request?: Request, + request: Request | undefined, ): Promise> { // Check for hibernatable websocket reconnection if (driver.requestIdBuf && driver.hibernatable) { @@ -274,26 +276,15 @@ export class ConnectionManager< // Prepare connection state let connState: CS | undefined; - const onBeforeConnectOpts = { - request, - } satisfies OnConnectOptions; - // Call onBeforeConnect hook if (config.onBeforeConnect) { - await config.onBeforeConnect( - this.#actor.actorContext, - onBeforeConnectOpts, - params, - ); + const ctx = new OnBeforeConnectContext(this.#actor, request); + await config.onBeforeConnect(ctx, params); } // Create connection state if enabled if ((this.#actor as any).connStateEnabled) { - connState = await this.#createConnState( - config, - onBeforeConnectOpts, - params, - ); + connState = await this.#createConnState(config, params, request); } // Create connection persist data @@ -326,7 +317,7 @@ export class ConnectionManager< // Call onConnect lifecycle hook if (config.onConnect) { - this.#callOnConnect(config, conn); + this.#callOnConnect(config, conn, request); } this.#actor.inspector.emitter.emit("connectionUpdated"); @@ -336,15 +327,12 @@ export class ConnectionManager< async #createConnState( config: any, - opts: OnConnectOptions, params: CP, + request: Request | undefined, ): Promise { if ("createConnState" in config) { - const dataOrPromise = config.createConnState( - this.#actor.actorContext, - opts, - params, - ); + const ctx = new CreateConnStateContext(this.#actor, request); + const dataOrPromise = config.createConnState(ctx, params); if (dataOrPromise instanceof Promise) { return await deadline( dataOrPromise, @@ -369,9 +357,14 @@ export class ConnectionManager< ); } - #callOnConnect(config: any, conn: Conn) { + #callOnConnect( + config: any, + conn: Conn, + request: Request | undefined, + ) { try { - const result = config.onConnect(this.#actor.actorContext, conn); + const ctx = new OnConnectContext(this.#actor, request); + const result = config.onConnect(ctx, conn); if (result instanceof Promise) { deadline(result, config.options.onConnectTimeout).catch( (error: any) => { diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index ac8e6fe117..4170993b98 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -175,7 +175,7 @@ export class ActorInstance { try { return await this.executeAction( - new ActionContext(this.actorContext, conn), + new ActionContext(this, conn), name, params || [], ); @@ -493,7 +493,7 @@ export class ActorInstance { async createConn( driver: ConnDriver, params: any, - request?: Request, + request: Request | undefined, ): Promise> { this.#assertReady(); @@ -658,7 +658,7 @@ export class ActorInstance { } try { - const ctx = new RequestContext(this.actorContext, conn); + const ctx = new RequestContext(this, conn, request); const response = await this.#config.onRequest(ctx, request); if (!response) { throw new errors.InvalidRequestHandlerResponse(); @@ -678,7 +678,7 @@ export class ActorInstance { async handleRawWebSocket( conn: Conn, websocket: UniversalWebSocket, - opts: { request: Request }, + request?: Request, ): Promise { this.#assertReady(); @@ -693,8 +693,8 @@ export class ActorInstance { this.#resetSleepTimer(); // Handle WebSocket - const ctx = new WebSocketContext(this.actorContext, conn); - await this.#config.onWebSocket(ctx, websocket, opts); + const ctx = new WebSocketContext(this, conn, request); + await this.#config.onWebSocket(ctx, websocket); // Save state if changed if (this.#stateManager.persistChanged && !stateBeforeHandler) { diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts index 3a329b243a..686a1a275e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts @@ -76,6 +76,10 @@ export type * from "./config"; export type { Conn } from "./conn/mod"; export type { ActionContext } from "./contexts/action"; export type { ActorContext } from "./contexts/actor"; +export type { ConnInitContext } from "./contexts/conn-init"; +export type { CreateConnStateContext } from "./contexts/create-conn-state"; +export type { OnBeforeConnectContext } from "./contexts/on-before-connect"; +export type { OnConnectContext } from "./contexts/on-connect"; export type { RequestContext } from "./contexts/request"; export type { WebSocketContext } from "./contexts/websocket"; export type { diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts b/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts index ec41345e14..feadf2d244 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/protocol/old.ts @@ -142,10 +142,7 @@ export async function processMessage< actionName: name, }); - const ctx = new ActionContext( - actor.actorContext, - conn, - ); + const ctx = new ActionContext(actor, conn); // Process the action request and wait for the result // This will wait for async actions to complete diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts index 131f413225..4a6cd77a46 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts @@ -352,7 +352,7 @@ export async function handleAction( ); // Call action - const ctx = new ActionContext(actor.actorContext!, conn!); + const ctx = new ActionContext(actor, conn!); output = await actor.executeAction(ctx, actionName, actionArgs); } finally { if (conn) { @@ -474,9 +474,7 @@ export async function handleRawWebSocket( createdConn = conn; // Call the actor's onWebSocket handler with the adapted WebSocket - actor.handleRawWebSocket(conn, adapter, { - request: newRequest, - }); + actor.handleRawWebSocket(conn, adapter, newRequest); } catch (error) { actor.rLog.error({ msg: "failed to create raw WebSocket connection", diff --git a/website/public/llms-full.txt b/website/public/llms-full.txt index 26bdf66b13..c1c784cffc 100644 --- a/website/public/llms-full.txt +++ b/website/public/llms-full.txt @@ -1713,7 +1713,7 @@ For HTTP requests, the router expects these headers: ```typescript // Direct HTTP request to actor -const response = await fetch("http://localhost:8080/registry/actors/myActor/raw/http/api/hello", +const response = await fetch("http://localhost:8080/registry/actors/myActor/request/api/hello", }), "X-RivetKit-Encoding": "json", "X-RivetKit-Conn-Params": JSON.stringify() @@ -1724,7 +1724,7 @@ const data = await response.json(); console.log(data); // // POST request with data -const postResponse = await fetch("http://localhost:8080/registry/actors/myActor/raw/http/api/echo", +const postResponse = await fetch("http://localhost:8080/registry/actors/myActor/request/api/echo", }), "X-RivetKit-Encoding": "json", "X-RivetKit-Conn-Params": JSON.stringify(), @@ -6341,6 +6341,11 @@ kubectl -n rivet-engine wait --for=condition=ready pod -l app=postgres --timeout ### 3. Deploy Rivet Engine +The Rivet Engine deployment consists of two components: + +- **Main Engine Deployment**: Runs all services except singleton services. Configured with Horizontal Pod Autoscaling (HPA) to automatically scale between 2-10 replicas based on CPU and memory utilization. +- **Singleton Engine Deployment**: Runs singleton services that must have exactly 1 replica (e.g., schedulers, coordinators). + Save as `rivet-engine.yaml`: ```yaml @@ -6382,7 +6387,7 @@ metadata: name: rivet-engine namespace: rivet-engine spec: - replicas: 1 + replicas: 2 selector: matchLabels: app: rivet-engine @@ -6396,6 +6401,8 @@ spec: image: rivetkit/engine:latest args: - start + - --except-services + - singleton env: - name: RIVET_CONFIG_PATH value: /etc/rivet/config.jsonc @@ -6410,16 +6417,107 @@ spec: readOnly: true resources: requests: - cpu: 500m - memory: 1Gi + cpu: 2000m + memory: 4Gi limits: + cpu: 4000m + memory: 8Gi + startupProbe: + httpGet: + path: /health + port: 6421 + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /health + port: 6421 + periodSeconds: 5 + failureThreshold: 2 + livenessProbe: + httpGet: + path: /health + port: 6421 + periodSeconds: 10 + failureThreshold: 3 + volumes: + - name: config + configMap: + name: engine-config +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: rivet-engine + namespace: rivet-engine +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: rivet-engine + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 60 + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 80 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rivet-engine-singleton + namespace: rivet-engine +spec: + replicas: 1 + selector: + matchLabels: + app: rivet-engine-singleton + template: + metadata: + labels: + app: rivet-engine-singleton + spec: + containers: + - name: rivet-engine + image: rivetkit/engine:latest + args: + - start + - --services + - singleton + - --services + - api-peer + env: + - name: RIVET_CONFIG_PATH + value: /etc/rivet/config.jsonc + ports: + - containerPort: 6421 + name: api-peer + volumeMounts: + - name: config + mountPath: /etc/rivet + readOnly: true + resources: + requests: cpu: 2000m memory: 4Gi + limits: + cpu: 4000m + memory: 8Gi startupProbe: httpGet: path: /health port: 6421 - initialDelaySeconds: 10 + initialDelaySeconds: 30 periodSeconds: 10 failureThreshold: 30 readinessProbe: @@ -6427,11 +6525,13 @@ spec: path: /health port: 6421 periodSeconds: 5 + failureThreshold: 2 livenessProbe: httpGet: path: /health port: 6421 periodSeconds: 10 + failureThreshold: 3 volumes: - name: config configMap: @@ -6443,9 +6543,21 @@ Apply and wait for the engine to be ready: ```bash kubectl apply -f rivet-engine.yaml kubectl -n rivet-engine wait --for=condition=ready pod -l app=rivet-engine --timeout=300s +kubectl -n rivet-engine wait --for=condition=ready pod -l app=rivet-engine-singleton --timeout=300s ``` -### 4. Access the Engine +**Note**: The HPA requires a metrics server to be running in your cluster. Most Kubernetes distributions (including k3d, GKE, EKS, AKS) include this by default. + +### 4. Verify Deployment + +Check that all pods are running (you should see 2+ engine pods and 1 singleton pod): + +```bash +kubectl -n rivet-engine get pods +kubectl -n rivet-engine get hpa +``` + +### 5. Access the Engine Get the service URL: @@ -6528,14 +6640,44 @@ k3d cluster delete rivet ### Scaling -For horizontal scaling, update the deployment: +The engine is configured with Horizontal Pod Autoscaling (HPA) by default, automatically scaling between 2-10 replicas based on CPU (60%) and memory (80%) utilization. + +To adjust the scaling parameters, modify the HPA configuration: ```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: rivet-engine + namespace: rivet-engine spec: - replicas: 3 # Multiple replicas -``` + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: rivet-engine + minReplicas: 2 # Adjust minimum replicas + maxReplicas: 20 # Adjust maximum replicas + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 # Adjust CPU threshold + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 80 # Adjust memory threshold +``` + +Monitor HPA status: -See our [HPA set up on github](https://github.com/rivet-gg/rivet/tree/main/k8s/engines/05-rivet-engine-hpa.yaml) for info on configuring automatic horizontal scaling. +```bash +kubectl -n rivet-engine get hpa +kubectl -n rivet-engine describe hpa rivet-engine +``` ## Next Steps diff --git a/website/public/llms.txt b/website/public/llms.txt index b2d6fa1cf4..92b2c848d1 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -24,8 +24,11 @@ https://rivet.dev/blog/2025-10-01-railway-selfhost https://rivet.dev/blog/2025-10-05-weekly-updates https://rivet.dev/blog/2025-10-09-rivet-cloud-launch https://rivet.dev/blog/2025-10-17-rivet-actors-vercel +https://rivet.dev/blog/2025-10-19-weekly-updates https://rivet.dev/blog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/blog/2025-10-20-weekly-updates +https://rivet.dev/blog/2025-10-24-weekly-updates +https://rivet.dev/blog/2025-11-02-weekly-updates https://rivet.dev/blog/godot-multiplayer-compared-to-unity https://rivet.dev/changelog https://rivet.dev/changelog.json @@ -50,8 +53,11 @@ https://rivet.dev/changelog/2025-10-01-railway-selfhost https://rivet.dev/changelog/2025-10-05-weekly-updates https://rivet.dev/changelog/2025-10-09-rivet-cloud-launch https://rivet.dev/changelog/2025-10-17-rivet-actors-vercel +https://rivet.dev/changelog/2025-10-19-weekly-updates https://rivet.dev/changelog/2025-10-20-how-we-built-websocket-servers-for-vercel-functions https://rivet.dev/changelog/2025-10-20-weekly-updates +https://rivet.dev/changelog/2025-10-24-weekly-updates +https://rivet.dev/changelog/2025-11-02-weekly-updates https://rivet.dev/changelog/godot-multiplayer-compared-to-unity https://rivet.dev/cloud https://rivet.dev/docs/actors