diff --git a/examples/cloudflare-workers-hono/wrangler.json b/examples/cloudflare-workers-hono/wrangler.json index 29b055cf3f..f5b84c4ef6 100644 --- a/examples/cloudflare-workers-hono/wrangler.json +++ b/examples/cloudflare-workers-hono/wrangler.json @@ -6,7 +6,7 @@ "migrations": [ { "tag": "v1", - "new_classes": ["ActorHandler"] + "new_sqlite_classes": ["ActorHandler"] } ], "durable_objects": { diff --git a/examples/cloudflare-workers/wrangler.json b/examples/cloudflare-workers/wrangler.json index 29b055cf3f..f5b84c4ef6 100644 --- a/examples/cloudflare-workers/wrangler.json +++ b/examples/cloudflare-workers/wrangler.json @@ -6,7 +6,7 @@ "migrations": [ { "tag": "v1", - "new_classes": ["ActorHandler"] + "new_sqlite_classes": ["ActorHandler"] } ], "durable_objects": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f9ed13666f..04665f8cac 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2364,9 +2364,6 @@ importers: tsx: specifier: ^4.19.4 version: 4.20.5 - typedoc: - specifier: ^0.28.0 - version: 0.28.14(typescript@5.9.2) typescript: specifier: ^5.7.3 version: 5.9.2 diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts b/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts index 1bace9e7af..eb45792111 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts @@ -1,5 +1,7 @@ import invariant from "invariant"; import type { + ActorKey, + ActorRouter, AnyActorInstance as CoreAnyActorInstance, RegistryConfig, RunConfig, @@ -12,7 +14,10 @@ import type { ManagerDriver, } from "rivetkit/driver-helpers"; import { promiseWithResolvers } from "rivetkit/utils"; -import { KEYS } from "./actor-handler-do"; +import { parseActorId } from "./actor-id"; +import { kvDelete, kvGet, kvListPrefix, kvPut } from "./actor-kv"; +import { GLOBAL_KV_KEYS } from "./global-kv"; +import { getCloudflareAmbientEnv } from "./handler"; interface DurableObjectGlobalState { ctx: DurableObjectState; @@ -25,11 +30,14 @@ interface DurableObjectGlobalState { * This allows for storing the actor context globally and looking it up by ID in `CloudflareActorsActorDriver`. */ export class CloudflareDurableObjectGlobalState { - // Single map for all actor state + // Map of actor ID -> DO state #dos: Map = new Map(); - getDOState(actorId: string): DurableObjectGlobalState { - const state = this.#dos.get(actorId); + // WeakMap of DO state -> ActorGlobalState for proper GC + #actors: WeakMap = new WeakMap(); + + getDOState(doId: string): DurableObjectGlobalState { + const state = this.#dos.get(doId); invariant( state !== undefined, "durable object state not in global state", @@ -37,8 +45,16 @@ export class CloudflareDurableObjectGlobalState { return state; } - setDOState(actorId: string, state: DurableObjectGlobalState) { - this.#dos.set(actorId, state); + setDOState(doId: string, state: DurableObjectGlobalState) { + this.#dos.set(doId, state); + } + + getActorState(ctx: DurableObjectState): ActorGlobalState | undefined { + return this.#actors.get(ctx); + } + + setActorState(ctx: DurableObjectState, actorState: ActorGlobalState): void { + this.#actors.set(ctx, actorState); } } @@ -46,11 +62,44 @@ export interface DriverContext { state: DurableObjectState; } -// Actor handler to track running instances -class ActorHandler { - actor?: AnyActorInstance; - actorPromise?: ReturnType> = - promiseWithResolvers(); +interface InitializedData { + name: string; + key: ActorKey; + generation: number; +} + +interface LoadedActor { + actorRouter: ActorRouter; + actorDriver: ActorDriver; + generation: number; +} + +// Actor global state to track running instances +export class ActorGlobalState { + // Initialization state + initialized?: InitializedData; + + // Loaded actor state + actor?: LoadedActor; + actorInstance?: AnyActorInstance; + actorPromise?: ReturnType>; + + /** + * Indicates if `startDestroy` has been called. + * + * This is stored in memory instead of SQLite since the destroy may be cancelled. + * + * See the corresponding `destroyed` property in SQLite metadata. + */ + destroying: boolean = false; + + reset() { + this.initialized = undefined; + this.actor = undefined; + this.actorInstance = undefined; + this.actorPromise = undefined; + this.destroying = false; + } } export class CloudflareActorsActorDriver implements ActorDriver { @@ -59,7 +108,6 @@ export class CloudflareActorsActorDriver implements ActorDriver { #managerDriver: ManagerDriver; #inlineClient: Client; #globalState: CloudflareDurableObjectGlobalState; - #actors: Map = new Map(); constructor( registryConfig: RegistryConfig, @@ -76,49 +124,79 @@ export class CloudflareActorsActorDriver implements ActorDriver { } #getDOCtx(actorId: string) { - return this.#globalState.getDOState(actorId).ctx; + // Parse actor ID to get DO ID + const [doId] = parseActorId(actorId); + return this.#globalState.getDOState(doId).ctx; } async loadActor(actorId: string): Promise { + // Parse actor ID to get DO ID and generation + const [doId, expectedGeneration] = parseActorId(actorId); + + // Get the DO state + const doState = this.#globalState.getDOState(doId); + // Check if actor is already loaded - let handler = this.#actors.get(actorId); - if (handler) { - if (handler.actorPromise) await handler.actorPromise.promise; - if (!handler.actor) throw new Error("Actor should be loaded"); - return handler.actor; + let actorState = this.#globalState.getActorState(doState.ctx); + if (actorState?.actorInstance) { + // Actor is already loaded, return it + return actorState.actorInstance; } - // Create new actor handler - handler = new ActorHandler(); - this.#actors.set(actorId, handler); + // Create new actor state if it doesn't exist + if (!actorState) { + actorState = new ActorGlobalState(); + actorState.actorPromise = promiseWithResolvers(); + this.#globalState.setActorState(doState.ctx, actorState); + } - // Get the actor metadata from Durable Object storage - const doState = this.#globalState.getDOState(actorId); - const storage = doState.ctx.storage; + // Another request is already loading this actor, wait for it + if (actorState.actorPromise) { + await actorState.actorPromise.promise; + if (!actorState.actorInstance) { + throw new Error( + `Actor ${actorId} failed to load in concurrent request`, + ); + } + return actorState.actorInstance; + } // Load actor metadata - const [name, key] = await Promise.all([ - storage.get(KEYS.NAME), - storage.get(KEYS.KEY), - ]); + const sql = doState.ctx.storage.sql; + const cursor = sql.exec( + "SELECT name, key, destroyed, generation FROM _rivetkit_metadata LIMIT 1", + ); + const result = cursor.raw().next(); - if (!name) { + if (result.done || !result.value) { throw new Error( - `Actor ${actorId} is not initialized - missing name`, + `Actor ${actorId} is not initialized - missing metadata`, ); } - if (!key) { + + const name = result.value[0] as string; + const key = JSON.parse(result.value[1] as string) as string[]; + const destroyed = result.value[2] as number; + const generation = result.value[3] as number; + + // Check if actor is destroyed + if (destroyed) { + throw new Error(`Actor ${actorId} is destroyed`); + } + + // Check if generation matches + if (generation !== expectedGeneration) { throw new Error( - `Actor ${actorId} is not initialized - missing key`, + `Actor ${actorId} generation mismatch - expected ${expectedGeneration}, got ${generation}`, ); } // Create actor instance const definition = lookupInRegistry(this.#registryConfig, name); - handler.actor = definition.instantiate(); + actorState.actorInstance = definition.instantiate(); // Start actor - await handler.actor.start( + await actorState.actorInstance.start( this, this.#inlineClient, actorId, @@ -128,14 +206,16 @@ export class CloudflareActorsActorDriver implements ActorDriver { ); // Finish - handler.actorPromise?.resolve(); - handler.actorPromise = undefined; + actorState.actorPromise?.resolve(); + actorState.actorPromise = undefined; - return handler.actor; + return actorState.actorInstance; } getContext(actorId: string): DriverContext { - const state = this.#globalState.getDOState(actorId); + // Parse actor ID to get DO ID + const [doId] = parseActorId(actorId); + const state = this.#globalState.getDOState(doId); return { state: state.ctx }; } @@ -147,97 +227,98 @@ export class CloudflareActorsActorDriver implements ActorDriver { return this.#getDOCtx(actorId).storage.sql; } - // Batch KV operations - convert between Uint8Array and Cloudflare's string-based API + // Batch KV operations async kvBatchPut( actorId: string, entries: [Uint8Array, Uint8Array][], ): Promise { - const storage = this.#getDOCtx(actorId).storage; - const encoder = new TextDecoder(); + const sql = this.#getDOCtx(actorId).storage.sql; - // Convert Uint8Array entries to object for Cloudflare batch put - const storageObj: Record = {}; for (const [key, value] of entries) { - // Convert key from Uint8Array to string - const keyStr = this.#uint8ArrayToKey(key); - storageObj[keyStr] = value; + kvPut(sql, key, value); } - - await storage.put(storageObj); } async kvBatchGet( actorId: string, keys: Uint8Array[], ): Promise<(Uint8Array | null)[]> { - const storage = this.#getDOCtx(actorId).storage; + const sql = this.#getDOCtx(actorId).storage.sql; - // Convert keys to strings - const keyStrs = keys.map((k) => this.#uint8ArrayToKey(k)); - - // Get values from storage - const results = await storage.get(keyStrs); + const results: (Uint8Array | null)[] = []; + for (const key of keys) { + results.push(kvGet(sql, key)); + } - // Convert Map results to array in same order as input keys - return keyStrs.map((k) => results.get(k) ?? null); + return results; } async kvBatchDelete(actorId: string, keys: Uint8Array[]): Promise { - const storage = this.#getDOCtx(actorId).storage; + const sql = this.#getDOCtx(actorId).storage.sql; - // Convert keys to strings - const keyStrs = keys.map((k) => this.#uint8ArrayToKey(k)); - - await storage.delete(keyStrs); + for (const key of keys) { + kvDelete(sql, key); + } } async kvListPrefix( actorId: string, prefix: Uint8Array, ): Promise<[Uint8Array, Uint8Array][]> { - const storage = this.#getDOCtx(actorId).storage; + const sql = this.#getDOCtx(actorId).storage.sql; - // Convert prefix to string - const prefixStr = this.#uint8ArrayToKey(prefix); + return kvListPrefix(sql, prefix); + } - // List with prefix - const results = await storage.list({ prefix: prefixStr }); + startDestroy(actorId: string): void { + // Parse actor ID to get DO ID and generation + const [doId, generation] = parseActorId(actorId); - // Convert Map to array of [key, value] tuples - const entries: [Uint8Array, Uint8Array][] = []; - for (const [key, value] of results) { - entries.push([this.#keyToUint8Array(key), value]); - } + // Get the DO state + const doState = this.#globalState.getDOState(doId); + const actorState = this.#globalState.getActorState(doState.ctx); - return entries; - } + // Actor not loaded, nothing to destroy + if (!actorState?.actorInstance) { + return; + } - // Helper to convert Uint8Array key to string for Cloudflare storage - #uint8ArrayToKey(key: Uint8Array): string { - // Check if this is a connection key (starts with [2]) - if (key.length > 0 && key[0] === 2) { - // Connection key - extract connId - const connId = new TextDecoder().decode(key.slice(1)); - return `${KEYS.CONN_PREFIX}${connId}`; + // Check if already destroying + if (actorState.destroying) { + return; } - // Otherwise, treat as persist data key [1] - return KEYS.PERSIST_DATA; + actorState.destroying = true; + + // Spawn onStop in background + this.#callOnStopAsync(actorId, doId, actorState.actorInstance); } - // Helper to convert string key back to Uint8Array - #keyToUint8Array(key: string): Uint8Array { - if (key.startsWith(KEYS.CONN_PREFIX)) { - // Connection key - const connId = key.slice(KEYS.CONN_PREFIX.length); - const encoder = new TextEncoder(); - const connIdBytes = encoder.encode(connId); - const result = new Uint8Array(1 + connIdBytes.length); - result[0] = 2; // Connection prefix - result.set(connIdBytes, 1); - return result; - } - // Persist data key - return Uint8Array.from([1]); + async #callOnStopAsync( + actorId: string, + doId: string, + actor: CoreAnyActorInstance, + ) { + // Stop + await actor.onStop("destroy"); + + // Remove state + const doState = this.#globalState.getDOState(doId); + const sql = doState.ctx.storage.sql; + sql.exec("UPDATE _rivetkit_metadata SET destroyed = 1 WHERE 1=1"); + sql.exec("DELETE FROM _rivetkit_kv_storage"); + + // Clear any scheduled alarms + await doState.ctx.storage.deleteAlarm(); + + // Delete from ACTOR_KV in the background - use full actorId including generation + const env = getCloudflareAmbientEnv(); + doState.ctx.waitUntil( + env.ACTOR_KV.delete(GLOBAL_KV_KEYS.actorMetadata(actorId)), + ); + + // Reset global state using the DO context + const actorHandle = this.#globalState.getActorState(doState.ctx); + actorHandle?.reset(); } } diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts b/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts index 2b0479c19c..d2931bc738 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts @@ -3,55 +3,48 @@ import type { ExecutionContext } from "hono"; import invariant from "invariant"; import type { ActorKey, ActorRouter, Registry, RunConfig } from "rivetkit"; import { createActorRouter, createClientWithDriver } from "rivetkit"; -import type { ActorDriver } from "rivetkit/driver-helpers"; -import { - type ManagerDriver, - serializeEmptyPersistData, -} from "rivetkit/driver-helpers"; -import { promiseWithResolvers } from "rivetkit/utils"; +import type { ActorDriver, ManagerDriver } from "rivetkit/driver-helpers"; +import { getInitialActorKvState } from "rivetkit/driver-helpers"; +import { stringifyError } from "rivetkit/utils"; import { + ActorGlobalState, CloudflareDurableObjectGlobalState, createCloudflareActorsActorDriverBuilder, } from "./actor-driver"; +import { buildActorId, parseActorId } from "./actor-id"; +import { kvPut } from "./actor-kv"; +import { GLOBAL_KV_KEYS } from "./global-kv"; import type { Bindings } from "./handler"; +import { getCloudflareAmbientEnv } from "./handler"; import { logger } from "./log"; -export const KEYS = { - NAME: "rivetkit:name", - KEY: "rivetkit:key", - PERSIST_DATA: "rivetkit:data", - CONN_PREFIX: "rivetkit:conn:", -}; - -// Helper to create a connection key for Cloudflare -export function makeCloudflareConnKey(connId: string): string { - return `${KEYS.CONN_PREFIX}${connId}`; -} - export interface ActorHandlerInterface extends DurableObject { - initialize(req: ActorInitRequest): Promise; + create(req: ActorInitRequest): Promise; + getMetadata(): Promise< + | { + actorId: string; + name: string; + key: ActorKey; + destroying: boolean; + } + | undefined + >; } export interface ActorInitRequest { name: string; key: ActorKey; input?: unknown; + allowExisting: boolean; } - -interface InitializedData { - name: string; - key: ActorKey; -} +export type ActorInitResponse = + | { success: { actorId: string; created: boolean } } + | { error: { actorAlreadyExists: true } }; export type DurableObjectConstructor = new ( ...args: ConstructorParameters> ) => DurableObject; -interface LoadedActor { - actorRouter: ActorRouter; - actorDriver: ActorDriver; -} - export function createActorDurableObject( registry: Registry, rootRunConfig: RunConfig, @@ -71,50 +64,104 @@ export function createActorDurableObject( extends DurableObject implements ActorHandlerInterface { - #initialized?: InitializedData; - #initializedPromise?: ReturnType>; - - #actor?: LoadedActor; - - async #loadActor(): Promise { - // Wait for init - if (!this.#initialized) { - // Wait for init - if (this.#initializedPromise) { - await this.#initializedPromise.promise; - } else { - this.#initializedPromise = promiseWithResolvers(); - const res = await this.ctx.storage.get([ - KEYS.NAME, - KEYS.KEY, - KEYS.PERSIST_DATA, - ]); - if (res.get(KEYS.PERSIST_DATA)) { - const name = res.get(KEYS.NAME) as string; - if (!name) throw new Error("missing actor name"); - const key = res.get(KEYS.KEY) as ActorKey; - if (!key) throw new Error("missing actor key"); + /** + * This holds a strong reference to ActorGlobalState. + * CloudflareDurableObjectGlobalState holds a weak reference so we can + * access it elsewhere. + **/ + #state: ActorGlobalState; + + constructor( + ...args: ConstructorParameters> + ) { + super(...args); + + // Initialize SQL table for key-value storage + // + // We do this instead of using the native KV storage so we can store blob keys. The native CF KV API only supports string keys. + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS _rivetkit_kv_storage( + key BLOB PRIMARY KEY, + value BLOB + ); + `); + + // Initialize SQL table for actor metadata + // + // id always equals 1 in order to ensure that there's always exactly 1 row in this table + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS _rivetkit_metadata( + id INTEGER PRIMARY KEY CHECK (id = 1), + name TEXT NOT NULL, + key TEXT NOT NULL, + destroyed INTEGER DEFAULT 0, + generation INTEGER DEFAULT 0 + ); + `); + + // Get or create the actor state from the global WeakMap + const state = globalState.getActorState(this.ctx); + if (state) { + this.#state = state; + } else { + this.#state = new ActorGlobalState(); + globalState.setActorState(this.ctx, this.#state); + } + } + async #loadActor() { + invariant(this.#state, "State should be initialized"); + + // Check if initialized + if (!this.#state.initialized) { + // Query SQL for initialization data + const cursor = this.ctx.storage.sql.exec( + "SELECT name, key, destroyed, generation FROM _rivetkit_metadata WHERE id = 1", + ); + const result = cursor.raw().next(); + + if (!result.done && result.value) { + const name = result.value[0] as string; + const key = JSON.parse( + result.value[1] as string, + ) as ActorKey; + const destroyed = result.value[2] as number; + const generation = result.value[3] as number; + + // Only initialize if not destroyed + if (!destroyed) { logger().debug({ msg: "already initialized", name, key, + generation, }); - this.#initialized = { name, key }; - this.#initializedPromise.resolve(); + this.#state.initialized = { name, key, generation }; } else { - logger().debug("waiting to initialize"); + logger().debug("actor is destroyed, cannot load"); + throw new Error("Actor is destroyed"); } + } else { + logger().debug("not initialized"); + throw new Error("Actor is not initialized"); } } // Check if already loaded - if (this.#actor) { - return this.#actor; + if (this.#state.actor) { + // Assert that the cached actor has the correct generation + // This will catch any cases where #state.actor has a stale generation + invariant( + !this.#state.initialized || + this.#state.actor.generation === + this.#state.initialized.generation, + `Stale actor cached: actor generation ${this.#state.actor.generation} != initialized generation ${this.#state.initialized?.generation}. This should not happen.`, + ); + return this.#state.actor; } - if (!this.#initialized) throw new Error("Not initialized"); + if (!this.#state.initialized) throw new Error("Not initialized"); // Register DO with global state first // HACK: This leaks the DO context, but DO does not provide a native way @@ -155,51 +202,223 @@ export function createActorDurableObject( false, ); - // Save actor - this.#actor = { + // Save actor with generation + this.#state.actor = { actorRouter, actorDriver, + generation: this.#state.initialized.generation, }; + // Build actor ID with generation for loading + const actorIdWithGen = buildActorId( + actorId, + this.#state.initialized.generation, + ); + // Initialize the actor instance with proper metadata // This ensures the actor driver knows about this actor - await actorDriver.loadActor(actorId); + await actorDriver.loadActor(actorIdWithGen); - return this.#actor; + return this.#state.actor; } - /** RPC called by the service that creates the DO to initialize it. */ - async initialize(req: ActorInitRequest) { - // TODO: Need to add this to a core promise that needs to be resolved before start + /** RPC called to get actor metadata without creating it */ + async getMetadata(): Promise< + | { + actorId: string; + name: string; + key: ActorKey; + destroying: boolean; + } + | undefined + > { + // Query the metadata + const cursor = this.ctx.storage.sql.exec( + "SELECT name, key, destroyed, generation FROM _rivetkit_metadata WHERE id = 1", + ); + const result = cursor.raw().next(); + + if (!result.done && result.value) { + const name = result.value[0] as string; + const key = JSON.parse(result.value[1] as string) as ActorKey; + const destroyed = result.value[2] as number; + const generation = result.value[3] as number; + + // Check if destroyed + if (destroyed) { + logger().debug({ + msg: "getMetadata: actor is destroyed", + name, + key, + generation, + }); + return undefined; + } - await this.ctx.storage.put({ - [KEYS.NAME]: req.name, - [KEYS.KEY]: req.key, - [KEYS.PERSIST_DATA]: serializeEmptyPersistData(req.input), + // Build actor ID with generation + const doId = this.ctx.id.toString(); + const actorId = buildActorId(doId, generation); + const destroying = + globalState.getActorState(this.ctx)?.destroying ?? false; + + logger().debug({ + msg: "getMetadata: found actor metadata", + actorId, + name, + key, + generation, + destroying, + }); + + return { actorId, name, key, destroying }; + } + + logger().debug({ + msg: "getMetadata: no metadata found", }); - this.#initialized = { + return undefined; + } + + /** RPC called by the manager to create a DO. Can optionally allow existing actors. */ + async create(req: ActorInitRequest): Promise { + // Check if actor exists + const checkCursor = this.ctx.storage.sql.exec( + "SELECT destroyed, generation FROM _rivetkit_metadata WHERE id = 1", + ); + const checkResult = checkCursor.raw().next(); + + let created = false; + let generation = 0; + + if (!checkResult.done && checkResult.value) { + const destroyed = checkResult.value[0] as number; + generation = checkResult.value[1] as number; + + if (!destroyed) { + // Actor exists and is not destroyed + if (!req.allowExisting) { + // Fail if not allowing existing actors + logger().debug({ + msg: "create failed: actor already exists", + name: req.name, + key: req.key, + generation, + }); + return { error: { actorAlreadyExists: true } }; + } + + // Return existing actor + logger().debug({ + msg: "actor already exists", + key: req.key, + generation, + }); + const doId = this.ctx.id.toString(); + const actorId = buildActorId(doId, generation); + return { success: { actorId, created: false } }; + } + + // Actor exists but is destroyed - resurrect with incremented generation + generation = generation + 1; + created = true; + + // Clear stale actor from previous generation + // This is necessary because the DO instance may still be in memory + // with the old #state.actor field from before the destroy + if (this.#state) { + this.#state.actor = undefined; + } + + logger().debug({ + msg: "resurrecting destroyed actor", + key: req.key, + oldGeneration: generation - 1, + newGeneration: generation, + }); + } else { + // No actor exists - will create with generation 0 + generation = 0; + created = true; + logger().debug({ + msg: "creating new actor", + key: req.key, + generation, + }); + } + + // Perform upsert - either inserts new or updates destroyed actor + this.ctx.storage.sql.exec( + `INSERT INTO _rivetkit_metadata (id, name, key, destroyed, generation) + VALUES (1, ?, ?, 0, ?) + ON CONFLICT(id) DO UPDATE SET + name = excluded.name, + key = excluded.key, + destroyed = 0, + generation = excluded.generation`, + req.name, + JSON.stringify(req.key), + generation, + ); + + this.#state.initialized = { name: req.name, key: req.key, + generation, }; - logger().debug({ msg: "initialized actor", key: req.key }); + // Build actor ID with generation + const doId = this.ctx.id.toString(); + const actorId = buildActorId(doId, generation); + + // Initialize storage and update KV when created or resurrected + if (created) { + // Initialize persist data in KV storage + initializeActorKvStorage(this.ctx.storage.sql, req.input); + + // Update metadata in the background + const env = getCloudflareAmbientEnv(); + const actorData = { name: req.name, key: req.key, generation }; + this.ctx.waitUntil( + env.ACTOR_KV.put( + GLOBAL_KV_KEYS.actorMetadata(actorId), + JSON.stringify(actorData), + ), + ); + } - // Preemptively actor so the lifecycle hooks are called + // Preemptively load actor so the lifecycle hooks are called await this.#loadActor(); + + logger().debug({ + msg: created + ? "actor created/resurrected" + : "returning existing actor", + actorId, + created, + generation, + }); + + return { success: { actorId, created } }; } async fetch(request: Request): Promise { - const { actorRouter } = await this.#loadActor(); + const { actorRouter, generation } = await this.#loadActor(); + + // Build actor ID with generation + const doId = this.ctx.id.toString(); + const actorId = buildActorId(doId, generation); - const actorId = this.ctx.id.toString(); return await actorRouter.fetch(request, { actorId, }); } async alarm(): Promise { - const { actorDriver } = await this.#loadActor(); - const actorId = this.ctx.id.toString(); + const { actorDriver, generation } = await this.#loadActor(); + + // Build actor ID with generation + const doId = this.ctx.id.toString(); + const actorId = buildActorId(doId, generation); // Load the actor instance and trigger alarm const actor = await actorDriver.loadActor(actorId); @@ -207,3 +426,13 @@ export function createActorDurableObject( } }; } + +function initializeActorKvStorage( + sql: SqlStorage, + input: unknown | undefined, +): void { + const initialKvState = getInitialActorKvState(input); + for (const [key, value] of initialKvState) { + kvPut(sql, key, value); + } +} diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/actor-id.ts b/rivetkit-typescript/packages/cloudflare-workers/src/actor-id.ts new file mode 100644 index 0000000000..a4e850f48f --- /dev/null +++ b/rivetkit-typescript/packages/cloudflare-workers/src/actor-id.ts @@ -0,0 +1,38 @@ +/** + * Actor ID utilities for managing actor IDs with generation tracking. + * + * Actor IDs are formatted as: `{doId}:{generation}` + * This allows tracking actor resurrection and preventing stale references. + */ + +/** + * Build an actor ID from a Durable Object ID and generation number. + * @param doId The Durable Object ID + * @param generation The generation number (increments on resurrection) + * @returns The formatted actor ID + */ +export function buildActorId(doId: string, generation: number): string { + return `${doId}:${generation}`; +} + +/** + * Parse an actor ID into its components. + * @param actorId The actor ID to parse + * @returns A tuple of [doId, generation] + * @throws Error if the actor ID format is invalid + */ +export function parseActorId(actorId: string): [string, number] { + const parts = actorId.split(":"); + if (parts.length !== 2) { + throw new Error(`Invalid actor ID format: ${actorId}`); + } + + const [doId, generationStr] = parts; + const generation = parseInt(generationStr, 10); + + if (Number.isNaN(generation)) { + throw new Error(`Invalid generation number in actor ID: ${actorId}`); + } + + return [doId, generation]; +} diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/actor-kv.ts b/rivetkit-typescript/packages/cloudflare-workers/src/actor-kv.ts new file mode 100644 index 0000000000..2891d823c5 --- /dev/null +++ b/rivetkit-typescript/packages/cloudflare-workers/src/actor-kv.ts @@ -0,0 +1,71 @@ +export function kvGet(sql: SqlStorage, key: Uint8Array): Uint8Array | null { + const cursor = sql.exec( + "SELECT value FROM _rivetkit_kv_storage WHERE key = ?", + key, + ); + const result = cursor.raw().next(); + + if (!result.done && result.value) { + return toUint8Array(result.value[0]); + } + return null; +} + +export function kvPut( + sql: SqlStorage, + key: Uint8Array, + value: Uint8Array, +): void { + sql.exec( + "INSERT OR REPLACE INTO _rivetkit_kv_storage (key, value) VALUES (?, ?)", + key, + value, + ); +} + +export function kvDelete(sql: SqlStorage, key: Uint8Array): void { + sql.exec("DELETE FROM _rivetkit_kv_storage WHERE key = ?", key); +} + +export function kvListPrefix( + sql: SqlStorage, + prefix: Uint8Array, +): [Uint8Array, Uint8Array][] { + const cursor = sql.exec("SELECT key, value FROM _rivetkit_kv_storage"); + const entries: [Uint8Array, Uint8Array][] = []; + + for (const row of cursor.raw()) { + const key = toUint8Array(row[0]); + const value = toUint8Array(row[1]); + + // Check if key starts with prefix + if (hasPrefix(key, prefix)) { + entries.push([key, value]); + } + } + + return entries; +} + +// Helper function to convert SqlStorageValue to Uint8Array +function toUint8Array( + value: string | number | ArrayBuffer | Uint8Array | null, +): Uint8Array { + if (value instanceof Uint8Array) { + return value; + } + if (value instanceof ArrayBuffer) { + return new Uint8Array(value); + } + throw new Error( + `Unexpected SQL value type: ${typeof value} (${value?.constructor?.name})`, + ); +} + +function hasPrefix(arr: Uint8Array, prefix: Uint8Array): boolean { + if (prefix.length > arr.length) return false; + for (let i = 0; i < prefix.length; i++) { + if (arr[i] !== prefix[i]) return false; + } + return true; +} diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/global-kv.ts b/rivetkit-typescript/packages/cloudflare-workers/src/global-kv.ts new file mode 100644 index 0000000000..e9c205a2d4 --- /dev/null +++ b/rivetkit-typescript/packages/cloudflare-workers/src/global-kv.ts @@ -0,0 +1,6 @@ +/** KV keys for using Workers KV to store actor metadata globally. */ +export const GLOBAL_KV_KEYS = { + actorMetadata: (actorId: string): string => { + return `actor:${actorId}:metadata`; + }, +}; diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/handler.ts b/rivetkit-typescript/packages/cloudflare-workers/src/handler.ts index 750ec91559..2d186564ce 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/handler.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/handler.ts @@ -57,11 +57,12 @@ export function createHandler>( const ActorHandler = createActorDurableObject(registry, runConfig); // Create server - const serverOutput = registry.start(runConfig); + const serverOutputPromise = registry.start(runConfig); // Create Cloudflare handler const handler = { - fetch: (request, cfEnv, ctx) => { + fetch: async (request, cfEnv, ctx) => { + const serverOutput = await serverOutputPromise; const url = new URL(request.url); // Inject Rivet env diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts b/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts index cfdf78ebbe..8e3023983a 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts @@ -16,30 +16,17 @@ import { WS_PROTOCOL_STANDARD, WS_PROTOCOL_TARGET, } from "rivetkit/driver-helpers"; -import { ActorAlreadyExists, InternalError } from "rivetkit/errors"; +import { + ActorAlreadyExists, + ActorDestroying, + InternalError, +} from "rivetkit/errors"; +import { assertUnreachable } from "rivetkit/utils"; +import { parseActorId } from "./actor-id"; import { getCloudflareAmbientEnv } from "./handler"; import { logger } from "./log"; import type { Bindings } from "./mod"; -import { serializeKey, serializeNameAndKey } from "./util"; - -// Actor metadata structure -interface ActorData { - name: string; - key: string[]; -} - -const KEYS = { - ACTOR: { - // Combined key for actor metadata (name and key) - metadata: (actorId: string) => `actor:${actorId}:metadata`, - - // Key index function for actor lookup - keyIndex: (name: string, key: string[] = []) => { - // Use serializeKey for consistent handling of all keys - return `actor_key:${serializeKey(key)}`; - }, - }, -}; +import { serializeNameAndKey } from "./util"; const STANDARD_WEBSOCKET_HEADERS = [ "connection", @@ -57,14 +44,18 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { ): Promise { const env = getCloudflareAmbientEnv(); + // Parse actor ID to get DO ID + const [doId] = parseActorId(actorId); + logger().debug({ msg: "sending request to durable object", actorId, + doId, method: actorRequest.method, url: actorRequest.url, }); - const id = env.ACTOR_DO.idFromString(actorId); + const id = env.ACTOR_DO.idFromString(doId); const stub = env.ACTOR_DO.get(id); return await stub.fetch(actorRequest); @@ -78,14 +69,18 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { ): Promise { const env = getCloudflareAmbientEnv(); + // Parse actor ID to get DO ID + const [doId] = parseActorId(actorId); + logger().debug({ msg: "opening websocket to durable object", actorId, + doId, path, }); // Make a fetch request to the Durable Object with WebSocket upgrade - const id = env.ACTOR_DO.idFromString(actorId); + const id = env.ACTOR_DO.idFromString(doId); const stub = env.ACTOR_DO.get(id); const protocols: string[] = []; @@ -146,14 +141,18 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { actorRequest: Request, actorId: string, ): Promise { + // Parse actor ID to get DO ID + const [doId] = parseActorId(actorId); + logger().debug({ msg: "forwarding request to durable object", actorId, + doId, method: actorRequest.method, url: actorRequest.url, }); - const id = c.env.ACTOR_DO.idFromString(actorId); + const id = c.env.ACTOR_DO.idFromString(doId); const stub = c.env.ACTOR_DO.get(id); return await stub.fetch(actorRequest); @@ -218,7 +217,9 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { protocols.join(", "), ); - const id = c.env.ACTOR_DO.idFromString(actorId); + // Parse actor ID to get DO ID + const [doId] = parseActorId(actorId); + const id = c.env.ACTOR_DO.idFromString(doId); const stub = c.env.ACTOR_DO.get(id); return await stub.fetch(actorRequest); @@ -232,23 +233,42 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { > { const env = getCloudflareAmbientEnv(); - // Get actor metadata from KV (combined name and key) - const actorData = (await env.ACTOR_KV.get( - KEYS.ACTOR.metadata(actorId), - { - type: "json", - }, - )) as ActorData | null; + // Parse actor ID to get DO ID and expected generation + const [doId, expectedGeneration] = parseActorId(actorId); + + // Get the Durable Object stub + const id = env.ACTOR_DO.idFromString(doId); + const stub = env.ACTOR_DO.get(id); + + // Call the DO's getMetadata method + const result = await stub.getMetadata(); - // If the actor doesn't exist, return undefined - if (!actorData) { + if (!result) { + logger().debug({ + msg: "getForId: actor not found", + actorId, + }); + return undefined; + } + + // Check if the actor IDs match in order to check if the generation matches + if (result.actorId !== actorId) { + logger().debug({ + msg: "getForId: generation mismatch", + requestedActorId: actorId, + actualActorId: result.actorId, + }); return undefined; } + if (result.destroying) { + throw new ActorDestroying(actorId); + } + return { - actorId, - name: actorData.name, - key: actorData.key, + actorId: result.actorId, + name: result.name, + key: result.key, }; } @@ -264,43 +284,79 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { logger().debug({ msg: "getWithKey: searching for actor", name, key }); // Generate deterministic ID from the name and key - // This is aligned with how createActor generates IDs const nameKeyString = serializeNameAndKey(name, key); - const actorId = env.ACTOR_DO.idFromName(nameKeyString).toString(); + const doId = env.ACTOR_DO.idFromName(nameKeyString).toString(); - // Check if the actor metadata exists - const actorData = await env.ACTOR_KV.get(KEYS.ACTOR.metadata(actorId), { - type: "json", - }); + // Try to get the Durable Object to see if it exists + const id = env.ACTOR_DO.idFromString(doId); + const stub = env.ACTOR_DO.get(id); - if (!actorData) { + // Check if actor exists without creating it + const result = await stub.getMetadata(); + + if (result) { + logger().debug({ + msg: "getWithKey: found actor with matching name and key", + actorId: result.actorId, + name: result.name, + key: result.key, + }); + return { + actorId: result.actorId, + name: result.name, + key: result.key, + }; + } else { logger().debug({ msg: "getWithKey: no actor found with matching name and key", name, key, - actorId, + doId, }); return undefined; } + } - logger().debug({ - msg: "getWithKey: found actor with matching name and key", - actorId, + async getOrCreateWithKey({ + c, + name, + key, + input, + }: GetOrCreateWithKeyInput<{ Bindings: Bindings }>): Promise { + const env = getCloudflareAmbientEnv(); + + // Create a deterministic ID from the actor name and key + // This ensures that actors with the same name and key will have the same ID + const nameKeyString = serializeNameAndKey(name, key); + const doId = env.ACTOR_DO.idFromName(nameKeyString); + + // Get or create actor using the Durable Object's method + const actor = env.ACTOR_DO.get(doId); + const result = await actor.create({ name, key, + input, + allowExisting: true, }); - return this.#buildActorOutput(c, actorId); - } + if ("success" in result) { + const { actorId, created } = result.success; + logger().debug({ + msg: "getOrCreateWithKey result", + actorId, + name, + key, + created, + }); - async getOrCreateWithKey( - input: GetOrCreateWithKeyInput, - ): Promise { - // TODO: Prevent race condition here - const getOutput = await this.getWithKey(input); - if (getOutput) { - return getOutput; + return { + actorId, + name, + key, + }; + } else if ("error" in result) { + throw new Error(`Error: ${JSON.stringify(result.error)}`); } else { - return await this.createActor(input); + assertUnreachable(result); } } @@ -312,41 +368,38 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { }: CreateInput<{ Bindings: Bindings }>): Promise { const env = getCloudflareAmbientEnv(); - // Check if actor with the same name and key already exists - const existingActor = await this.getWithKey({ c, name, key }); - if (existingActor) { - throw new ActorAlreadyExists(name, key); - } - // Create a deterministic ID from the actor name and key // This ensures that actors with the same name and key will have the same ID const nameKeyString = serializeNameAndKey(name, key); const doId = env.ACTOR_DO.idFromName(nameKeyString); - const actorId = doId.toString(); - // Init actor + // Create actor - this will fail if it already exists const actor = env.ACTOR_DO.get(doId); - await actor.initialize({ + const result = await actor.create({ name, key, input, + allowExisting: false, }); - // Store combined actor metadata (name and key) - const actorData: ActorData = { name, key }; - await env.ACTOR_KV.put( - KEYS.ACTOR.metadata(actorId), - JSON.stringify(actorData), - ); - - // Add to key index for lookups by name and key - await env.ACTOR_KV.put(KEYS.ACTOR.keyIndex(name, key), actorId); + if ("success" in result) { + const { actorId } = result.success; + return { + actorId, + name, + key, + }; + } else if ("error" in result) { + if (result.error.actorAlreadyExists) { + throw new ActorAlreadyExists(name, key); + } - return { - actorId, - name, - key, - }; + throw new InternalError( + `Unknown error creating actor: ${JSON.stringify(result.error)}`, + ); + } else { + assertUnreachable(result); + } } async listActors({ c, name }: ListActorsInput): Promise { @@ -357,31 +410,6 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { return []; } - // Helper method to build actor output from an ID - async #buildActorOutput( - c: any, - actorId: string, - ): Promise { - const env = getCloudflareAmbientEnv(); - - const actorData = (await env.ACTOR_KV.get( - KEYS.ACTOR.metadata(actorId), - { - type: "json", - }, - )) as ActorData | null; - - if (!actorData) { - return undefined; - } - - return { - actorId, - name: actorData.name, - key: actorData.key, - }; - } - displayInformation(): ManagerDisplayInformation { return { name: "Cloudflare Workers", diff --git a/rivetkit-typescript/packages/cloudflare-workers/tests/driver-tests.test.ts b/rivetkit-typescript/packages/cloudflare-workers/tests/driver-tests.test.ts index 11012dca63..106baacbdd 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/tests/driver-tests.test.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/tests/driver-tests.test.ts @@ -180,7 +180,7 @@ async function setupProject(projectPath: string) { ], migrations: [ { - new_classes: ["ActorHandler"], + new_sqlite_classes: ["ActorHandler"], tag: "v1", }, ], diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts new file mode 100644 index 0000000000..141160a015 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts @@ -0,0 +1,43 @@ +import { actor } from "rivetkit"; +import type { registry } from "./registry"; + +export const destroyObserver = actor({ + state: { destroyedActors: [] as string[] }, + actions: { + notifyDestroyed: (c, actorKey: string) => { + c.state.destroyedActors.push(actorKey); + }, + wasDestroyed: (c, actorKey: string) => { + return c.state.destroyedActors.includes(actorKey); + }, + reset: (c) => { + c.state.destroyedActors = []; + }, + }, +}); + +export const destroyActor = actor({ + state: { value: 0, key: "" }, + onWake: (c) => { + // Store the actor key so we can reference it in onDestroy + c.state.key = c.key.join("/"); + }, + onDestroy: async (c) => { + const client = c.client(); + const observer = client.destroyObserver.getOrCreate(["observer"]); + await observer.notifyDestroyed(c.state.key); + }, + actions: { + setValue: async (c, newValue: number) => { + c.state.value = newValue; + await c.saveState({ immediate: true }); + return c.state.value; + }, + getValue: (c) => { + return c.state.value; + }, + destroy: (c) => { + c.destroy(); + }, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts index 154decc40b..87132bf166 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -18,6 +18,7 @@ import { connStateActor } from "./conn-state"; // Import actors from individual files import { counter } from "./counter"; import { counterConn } from "./counter-conn"; +import { destroyActor, destroyObserver } from "./destroy"; import { customTimeoutActor, errorHandlingActor } from "./error-handling"; import { inlineClientActor } from "./inline-client"; import { counterWithLifecycle } from "./lifecycle"; @@ -106,5 +107,8 @@ export const registry = setup({ requestAccessActor, // From actor-onstatechange.ts onStateChangeActor, + // From destroy.ts + destroyActor, + destroyObserver, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/scripts/dump-openapi.ts b/rivetkit-typescript/packages/rivetkit/scripts/dump-openapi.ts index 005fc4e210..4595527475 100644 --- a/rivetkit-typescript/packages/rivetkit/scripts/dump-openapi.ts +++ b/rivetkit-typescript/packages/rivetkit/scripts/dump-openapi.ts @@ -2,7 +2,11 @@ import * as fs from "node:fs/promises"; import { resolve } from "node:path"; import { ClientConfigSchema } from "@/client/config"; import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod"; -import type { ManagerDriver } from "@/manager/driver"; +import type { + ActorOutput, + ListActorsInput, + ManagerDriver, +} from "@/manager/driver"; import { createManagerRouter } from "@/manager/router"; import { createClientWithDriver, @@ -32,13 +36,13 @@ async function main() { getWithKey: unimplemented, getOrCreateWithKey: unimplemented, createActor: unimplemented, + listActors: unimplemented, sendRequest: unimplemented, openWebSocket: unimplemented, proxyRequest: unimplemented, proxyWebSocket: unimplemented, displayInformation: unimplemented, getOrCreateInspectorAccessToken: unimplemented, - listActors: unimplemented, }; const client = createClientWithDriver( diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 332ceba3cc..a8df7b4057 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -43,6 +43,7 @@ export interface ActorTypes< export const ActorConfigSchema = z .object({ onCreate: z.function().optional(), + onDestroy: z.function().optional(), onWake: z.function().optional(), onSleep: z.function().optional(), onStateChange: z.function().optional(), @@ -66,7 +67,8 @@ export const ActorConfigSchema = z createConnStateTimeout: z.number().positive().default(5000), onConnectTimeout: z.number().positive().default(5000), // This must be less than ACTOR_STOP_THRESHOLD_MS - onStopTimeout: z.number().positive().default(5000), + onSleepTimeout: z.number().positive().default(5000), + onDestroyTimeout: z.number().positive().default(5000), stateSaveInterval: z.number().positive().default(10_000), actionTimeout: z.number().positive().default(60_000), // Max time to wait for waitUntil background promises during shutdown @@ -242,6 +244,20 @@ interface BaseActorConfig< input: TInput, ) => void | Promise; + /** + * Called when the actor is destroyed. + */ + onDestroy?: ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + >, + ) => void | Promise; + /** * Called when the actor is started and ready to receive connections and action. * @@ -459,6 +475,7 @@ export type ActorConfig< z.infer, | "actions" | "onCreate" + | "onDestroy" | "onWake" | "onStateChange" | "onBeforeConnect" @@ -518,6 +535,7 @@ export type ActorConfigInput< z.input, | "actions" | "onCreate" + | "onDestroy" | "onWake" | "onSleep" | "onStateChange" diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/actor.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/actor.ts index dde24c45bd..442df7f952 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/actor.ts @@ -165,4 +165,15 @@ export class ActorContext< sleep() { this.#actor.startSleep(); } + + /** + * Forces the actor to destroy. + * + * This will return immediately, then call `onStop` and `onDestroy`. + * + * @experimental + */ + destroy() { + this.#actor.startDestroy(); + } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts index 58ea192c7e..c2fa43726b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts @@ -55,10 +55,17 @@ export interface ActorDriver { /** * Requests the actor to go to sleep. * - * This will call `_stop` independently. + * This will call `ActorInstance.onStop` independently. */ startSleep?(actorId: string): void; + /** + * Destroys the actor and its associated data. + * + * This will call `ActorInstance.onStop` independently. + */ + startDestroy(actorId: string): void; + /** * Shuts down the actor runner. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts b/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts index 8b7b9bfd05..826448bf3a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts @@ -250,6 +250,17 @@ export class ActorAlreadyExists extends ActorError { } } +export class ActorDestroying extends ActorError { + constructor(identifier?: string) { + super( + "actor", + "destroying", + identifier ? `Actor destroying: ${identifier}` : "Actor destroying", + { public: true }, + ); + } +} + export class ProxyError extends ActorError { constructor(operation: string, error?: unknown) { super( diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 76be9d2bfd..5f825075d0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -34,7 +34,12 @@ import { serializeActorKey } from "../keys"; import { processMessage } from "../protocol/old"; import { CachedSerializer } from "../protocol/serde"; import { Schedule } from "../schedule"; -import { DeadlineError, deadline, generateSecureToken } from "../utils"; +import { + assertUnreachable, + DeadlineError, + deadline, + generateSecureToken, +} from "../utils"; import { ConnectionManager } from "./connection-manager"; import { EventManager } from "./event-manager"; import { KEYS } from "./kv"; @@ -97,6 +102,7 @@ export class ActorInstance { // MARK: - Lifecycle State #ready = false; #sleepCalled = false; + #destroyCalled = false; #stopCalled = false; #sleepTimeout?: NodeJS.Timeout; #abortController = new AbortController(); @@ -369,7 +375,7 @@ export class ActorInstance { } // MARK: - Stop - async onStop() { + async onStop(mode: "sleep" | "destroy") { if (this.#stopCalled) { this.#rLog.warn({ msg: "already stopping actor" }); return; @@ -389,7 +395,13 @@ export class ActorInstance { } catch {} // Call onStop lifecycle - await this.#callOnStop(); + if (mode === "sleep") { + await this.#callOnSleep(); + } else if (mode === "destroy") { + await this.#callOnDestroy(); + } else { + assertUnreachable(mode); + } // Disconnect non-hibernatable connections await this.#disconnectConnections(); @@ -410,7 +422,7 @@ export class ActorInstance { // MARK: - Sleep startSleep() { - if (this.#stopCalled) { + if (this.#stopCalled || this.#destroyCalled) { this.#rLog.debug({ msg: "cannot call startSleep if actor already stopping", }); @@ -431,11 +443,42 @@ export class ActorInstance { this.#rLog.info({ msg: "actor sleeping" }); + // Start sleep on next tick so call site of startSleep can exit setImmediate(() => { sleep(); }); } + // MARK: - Destroy + startDestroy() { + if (this.#stopCalled || this.#sleepCalled) { + this.#rLog.debug({ + msg: "cannot call startDestroy if actor already stopping or sleeping", + }); + return; + } + + if (this.#destroyCalled) { + this.#rLog.warn({ + msg: "cannot call startDestroy twice, actor already destroying", + }); + return; + } + this.#destroyCalled = true; + + const destroy = this.driver.startDestroy.bind( + this.driver, + this.#actorId, + ); + + this.#rLog.info({ msg: "actor destroying" }); + + // Start destroy on next tick so call site of startDestroy can exit + setImmediate(() => { + destroy(); + }); + } + // MARK: - HTTP Request Tracking beginHonoHttpRequest() { this.#activeHonoHttpRequests++; @@ -855,21 +898,46 @@ export class ActorInstance { } } - async #callOnStop() { + async #callOnSleep() { if (this.#config.onSleep) { try { - this.#rLog.debug({ msg: "calling onStop" }); + this.#rLog.debug({ msg: "calling onSleep" }); const result = this.#config.onSleep(this.actorContext); if (result instanceof Promise) { - await deadline(result, this.#config.options.onStopTimeout); + await deadline(result, this.#config.options.onSleepTimeout); + } + this.#rLog.debug({ msg: "onSleep completed" }); + } catch (error) { + if (error instanceof DeadlineError) { + this.#rLog.error({ msg: "onSleep timed out" }); + } else { + this.#rLog.error({ + msg: "error in onSleep", + error: stringifyError(error), + }); + } + } + } + } + + async #callOnDestroy() { + if (this.#config.onDestroy) { + try { + this.#rLog.debug({ msg: "calling onDestroy" }); + const result = this.#config.onDestroy(this.actorContext); + if (result instanceof Promise) { + await deadline( + result, + this.#config.options.onDestroyTimeout, + ); } - this.#rLog.debug({ msg: "onStop completed" }); + this.#rLog.debug({ msg: "onDestroy completed" }); } catch (error) { if (error instanceof DeadlineError) { - this.#rLog.error({ msg: "onStop timed out" }); + this.#rLog.error({ msg: "onDestroy timed out" }); } else { this.#rLog.error({ - msg: "error in onStop", + msg: "error in onDestroy", error: stringifyError(error), }); } diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts index d6ad658acc..6c618c1fc1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/mod.ts @@ -32,4 +32,4 @@ export { DriverConfigSchema, RunnerConfigSchema as RunConfigSchema, } from "@/registry/run-config"; -export { serializeEmptyPersistData } from "./utils"; +export { getInitialActorKvState } from "./utils"; diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts index 0936f816b2..de71ea202e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts @@ -1,11 +1,11 @@ import * as cbor from "cbor-x"; +import { KEYS } from "@/actor/instance/kv"; import type * as persistSchema from "@/schemas/actor-persist/mod"; import { ACTOR_VERSIONED } from "@/schemas/actor-persist/versioned"; import { bufferToArrayBuffer } from "@/utils"; +import type { ActorDriver } from "./mod"; -export function serializeEmptyPersistData( - input: unknown | undefined, -): Uint8Array { +function serializeEmptyPersistData(input: unknown | undefined): Uint8Array { const persistData: persistSchema.Actor = { input: input !== undefined @@ -18,3 +18,14 @@ export function serializeEmptyPersistData( }; return ACTOR_VERSIONED.serializeWithEmbeddedVersion(persistData); } + +/** + * Returns the initial KV state for a new actor. This is ued by the drivers to + * write the initial state in to KV storage before starting the actor. + */ +export function getInitialActorKvState( + input: unknown | undefined, +): [Uint8Array, Uint8Array][] { + const persistData = serializeEmptyPersistData(input); + return [[KEYS.PERSIST_DATA, persistData]]; +} diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index 57f18acc28..09362aa171 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -19,6 +19,7 @@ import { logger } from "./log"; import { runActionFeaturesTests } from "./tests/action-features"; import { runActorConnTests } from "./tests/actor-conn"; import { runActorConnStateTests } from "./tests/actor-conn-state"; +import { runActorDestroyTests } from "./tests/actor-destroy"; import { runActorDriverTests } from "./tests/actor-driver"; import { runActorErrorHandlingTests } from "./tests/actor-error-handling"; import { runActorHandleTests } from "./tests/actor-handle"; @@ -103,6 +104,8 @@ export function runDriverTests( runActorConnStateTests(driverTestConfig); + runActorDestroyTests(driverTestConfig); + runRequestAccessTests(driverTestConfig); runActorHandleTests(driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-destroy.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-destroy.ts new file mode 100644 index 0000000000..0d944a4504 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-destroy.ts @@ -0,0 +1,186 @@ +import { describe, expect, test, vi } from "vitest"; +import type { ActorError } from "@/client/mod"; +import type { DriverTestConfig } from "../mod"; +import { setupDriverTest } from "../utils"; + +export function runActorDestroyTests(driverTestConfig: DriverTestConfig) { + describe("Actor Destroy Tests", () => { + test("actor destroy clears state (without connect)", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = "test-destroy-without-connect"; + + // Get destroy observer + const observer = client.destroyObserver.getOrCreate(["observer"]); + await observer.reset(); + + // Create actor + const destroyActor = client.destroyActor.getOrCreate([actorKey]); + + // Update state and save immediately + await destroyActor.setValue(42); + + // Verify state was saved + const value = await destroyActor.getValue(); + expect(value).toBe(42); + + // Get actor ID before destroying + const actorId = await destroyActor.resolve(); + + // Destroy the actor + await destroyActor.destroy(); + + // Wait until the observer confirms the actor was destroyed + await vi.waitFor(async () => { + const wasDestroyed = await observer.wasDestroyed(actorKey); + expect(wasDestroyed, "actor onDestroy not called").toBeTruthy(); + }); + + // Wait until the actor is fully cleaned up (getForId returns error) + await vi.waitFor(async () => { + let actorRunning = false; + try { + await client.destroyActor.getForId(actorId).getValue(); + actorRunning = true; + } catch (err) { + expect((err as ActorError).group).toBe("actor"); + expect((err as ActorError).code).toBe("not_found"); + } + + expect(actorRunning, "actor still running").toBeFalsy(); + }); + + // Verify actor no longer exists via getForId + let existsById = false; + try { + await client.destroyActor.getForId(actorId).getValue(); + existsById = true; + } catch (err) { + expect((err as ActorError).group).toBe("actor"); + expect((err as ActorError).code).toBe("not_found"); + } + expect( + existsById, + "actor should not exist after destroy", + ).toBeFalsy(); + + // Verify actor no longer exists via get + let existsByKey = false; + try { + await client.destroyActor + .get(["test-destroy-without-connect"]) + .resolve(); + existsByKey = true; + } catch (err) { + expect((err as ActorError).group).toBe("actor"); + expect((err as ActorError).code).toBe("not_found"); + } + expect( + existsByKey, + "actor should not exist after destroy", + ).toBeFalsy(); + + // Create new actor with same key using getOrCreate + const newActor = client.destroyActor.getOrCreate([ + "test-destroy-without-connect", + ]); + + // Verify state is fresh (default value) + const newValue = await newActor.getValue(); + expect(newValue).toBe(0); + }); + + test("actor destroy clears state (with connect)", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + const actorKey = "test-destroy-with-connect"; + + // Get destroy observer + const observer = client.destroyObserver.getOrCreate(["observer"]); + await observer.reset(); + + // Create actor handle + const destroyActorHandle = client.destroyActor.getOrCreate([ + actorKey, + ]); + + // Get actor ID before destroying + const actorId = await destroyActorHandle.resolve(); + + // Create persistent connection + const destroyActor = destroyActorHandle.connect(); + + // Update state and save immediately + await destroyActor.setValue(99); + + // Verify state was saved + const value = await destroyActor.getValue(); + expect(value).toBe(99); + + // Destroy the actor + await destroyActor.destroy(); + + // Dispose the connection + await destroyActor.dispose(); + + // Wait until the observer confirms the actor was destroyed + await vi.waitFor(async () => { + const wasDestroyed = await observer.wasDestroyed(actorKey); + expect(wasDestroyed, "actor onDestroy not called").toBeTruthy(); + }); + + // Wait until the actor is fully cleaned up (getForId returns error) + await vi.waitFor(async () => { + let actorRunning = false; + try { + await client.destroyActor.getForId(actorId).getValue(); + actorRunning = true; + } catch (err) { + expect((err as ActorError).group).toBe("actor"); + expect((err as ActorError).code).toBe("not_found"); + } + + expect(actorRunning, "actor still running").toBeFalsy(); + }); + + // Verify actor no longer exists via getForId + let existsById = false; + try { + await client.destroyActor.getForId(actorId).getValue(); + existsById = true; + } catch (err) { + expect((err as ActorError).group).toBe("actor"); + expect((err as ActorError).code).toBe("not_found"); + } + expect( + existsById, + "actor should not exist after destroy", + ).toBeFalsy(); + + // Verify actor no longer exists via get + let existsByKey = false; + try { + await client.destroyActor + .get(["test-destroy-with-connect"]) + .resolve(); + existsByKey = true; + } catch (err) { + expect((err as ActorError).group).toBe("actor"); + expect((err as ActorError).code).toBe("not_found"); + } + expect( + existsByKey, + "actor should not exist after destroy", + ).toBeFalsy(); + + // Create new actor with same key using getOrCreate + const newActor = client.destroyActor.getOrCreate([ + "test-destroy-with-connect", + ]); + + // Verify state is fresh (default value) + const newValue = await newActor.getValue(); + expect(newValue).toBe(0); + }); + }); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 320487c83c..f97148e901 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -33,8 +33,8 @@ import type { import { type ActorDriver, type AnyActorInstance, + getInitialActorKvState, type ManagerDriver, - serializeEmptyPersistData, } from "@/driver-helpers/mod"; import { buildActorNames, type RegistryConfig } from "@/registry/config"; import type { RunnerConfig } from "@/registry/run-config"; @@ -73,6 +73,11 @@ export class EngineActorDriver implements ActorDriver { #runnerStopped: PromiseWithResolvers = promiseWithResolvers(); #isRunnerStopped: boolean = false; + // HACK: Track actor stop intent locally since the runner protocol doesn't + // pass the stop reason to onActorStop. This will be fixed when the runner + // protocol is updated to send the intent directly (see RVT-5284) + #actorStopIntent: Map = new Map(); + // WebSocket message acknowledgment debouncing for hibernatable websockets #hibernatableWebSocketAckQueue: Map< string, @@ -461,6 +466,25 @@ export class EngineActorDriver implements ActorDriver { invariant(actorConfig.key, "actor should have a key"); const key = deserializeActorKey(actorConfig.key); + // Initialize storage + const [persistDataBuffer] = await this.#runner.kvGet(actorId, [ + KEYS.PERSIST_DATA, + ]); + if (persistDataBuffer === null) { + const initialKvState = getInitialActorKvState(input); + await this.#runner.kvPut(actorId, initialKvState); + logger().debug({ + msg: "initialized persist data for new actor", + actorId, + }); + } else { + logger().debug({ + msg: "found existing persist data for actor", + actorId, + dataSize: persistDataBuffer.byteLength, + }); + } + // Create actor instance const definition = lookupInRegistry( this.#registryConfig, @@ -491,10 +515,20 @@ export class EngineActorDriver implements ActorDriver { ): Promise { logger().debug({ msg: "runner actor stopping", actorId, generation }); + // HACK: Retrieve the stop intent we tracked locally (see RVT-5284) + // Default to "sleep" if no intent was recorded (e.g., if the runner + // initiated the stop) + // + // TODO: This will not work if the actor is destroyed from the API + // correctly. Currently, it will use the sleep intent, but it's + // actually a destroy intent. + const reason = this.#actorStopIntent.get(actorId) ?? "sleep"; + this.#actorStopIntent.delete(actorId); + const handler = this.#actors.get(actorId); if (handler?.actor) { try { - await handler.actor.onStop(); + await handler.actor.onStop(reason); } catch (err) { logger().error({ msg: "error in onStop, proceeding with removing actor", @@ -504,7 +538,7 @@ export class EngineActorDriver implements ActorDriver { this.#actors.delete(actorId); } - logger().debug({ msg: "runner actor stopped", actorId }); + logger().debug({ msg: "runner actor stopped", actorId, reason }); } async #runnerFetch( @@ -777,9 +811,17 @@ export class EngineActorDriver implements ActorDriver { } startSleep(actorId: string) { + // HACK: Track intent for onActorStop (see RVT-5284) + this.#actorStopIntent.set(actorId, "sleep"); this.#runner.sleepActor(actorId); } + startDestroy(actorId: string) { + // HACK: Track intent for onActorStop (see RVT-5284) + this.#actorStopIntent.set(actorId, "destroy"); + this.#runner.stopActor(actorId); + } + async shutdownRunner(immediate: boolean): Promise { logger().info({ msg: "stopping engine actor driver", immediate }); @@ -816,7 +858,7 @@ export class EngineActorDriver implements ActorDriver { for (const [_actorId, handler] of this.#actors.entries()) { if (handler.actor) { stopPromises.push( - handler.actor.onStop().catch((err) => { + handler.actor.onStop("sleep").catch((err) => { handler.actor?.rLog.error({ msg: "onStop errored", error: stringifyError(err), diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/mod.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/mod.ts index 2437df6f72..9fa1ef0095 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/mod.ts @@ -4,7 +4,6 @@ import type { RegistryConfig } from "@/registry/config"; import type { DriverConfig, RunnerConfig } from "@/registry/run-config"; import { RemoteManagerDriver } from "@/remote-manager-driver/mod"; import { EngineActorDriver } from "./actor-driver"; -import { type EngineConfigInput, EngingConfigSchema } from "./config"; export { EngineActorDriver } from "./actor-driver"; export { diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts index 37989f220b..f78ceeadbc 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts @@ -91,4 +91,8 @@ export class FileSystemActorDriver implements ActorDriver { // Spawns the sleepActor promise this.#state.sleepActor(actorId); } + + async startDestroy(actorId: string): Promise { + await this.#state.destroyActor(actorId); + } } diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts index 98578aecf1..c47cb4db8f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts @@ -5,10 +5,7 @@ import type { AnyActorInstance } from "@/actor/instance/mod"; import type { ActorKey } from "@/actor/mod"; import { generateRandomString } from "@/actor/utils"; import type { AnyClient } from "@/client/client"; -import { - type ActorDriver, - serializeEmptyPersistData, -} from "@/driver-helpers/mod"; +import { type ActorDriver, getInitialActorKvState } from "@/driver-helpers/mod"; import type { RegistryConfig } from "@/registry/config"; import type { RunnerConfig } from "@/registry/run-config"; import type * as schema from "@/schemas/file-system-driver/mod"; @@ -43,6 +40,7 @@ interface ActorEntry { id: string; state?: schema.ActorState; + /** Promise for loading the actor state. */ loadPromise?: Promise; @@ -57,8 +55,13 @@ interface ActorEntry { /** Resolver for pending write operations that need to be notified when any write completes */ pendingWriteResolver?: PromiseWithResolvers; - /** If the actor has been removed by destroy or sleep. */ - removed: boolean; + /** If the actor is being destroyed. */ + destroying: boolean; + + // TODO: This might make sense to move in to actorstate, but we have a + // single reader/writer so it's not an issue + /** Generation of this actor when creating/destroying. */ + generation: string; } /** @@ -71,7 +74,12 @@ export class FileSystemGlobalState { #alarmsDir: string; #persist: boolean; + + // IMPORTANT: Never delete from this map. Doing so will result in race + // conditions since the actor generation will cease to be tracked + // correctly. Always increment generation if a new actor is created. #actors = new Map(); + #actorCountOnStartup: number = 0; #runnerParams?: { @@ -197,7 +205,8 @@ export class FileSystemGlobalState { entry = { id: actorId, - removed: false, + destroying: false, + generation: crypto.randomUUID(), }; this.#actors.set(actorId, entry); return entry; @@ -220,15 +229,17 @@ export class FileSystemGlobalState { const entry = this.#upsertEntry(actorId); - // Initialize kvStorage with the initial persist data + // Initialize storage const kvStorage: schema.ActorKvEntry[] = []; - const persistData = serializeEmptyPersistData(input); - // Store under key [1] - kvStorage.push({ - key: bufferToArrayBuffer(new Uint8Array([1])), - value: bufferToArrayBuffer(persistData), - }); + const initialKvState = getInitialActorKvState(input); + for (const [key, value] of initialKvState) { + kvStorage.push({ + key: bufferToArrayBuffer(key), + value: bufferToArrayBuffer(value), + }); + } + // Initialize metadata entry.state = { actorId, name, @@ -236,7 +247,11 @@ export class FileSystemGlobalState { createdAt: BigInt(Date.now()), kvStorage, }; - await this.writeActor(actorId, entry.state); + entry.destroying = false; + entry.generation = crypto.randomUUID(); + + await this.writeActor(actorId, entry.generation, entry.state); + return entry; } @@ -264,6 +279,7 @@ export class FileSystemGlobalState { // Start loading state entry.loadPromise = this.loadActorState(entry); + entry.loadPromise.then((res) => {}); return entry.loadPromise; } @@ -307,14 +323,19 @@ export class FileSystemGlobalState { // If no state for this actor, then create & write state if (!entry.state) { + if (entry.destroying) { + throw new Error(`Actor ${actorId} destroying`); + } + // Initialize kvStorage with the initial persist data const kvStorage: schema.ActorKvEntry[] = []; - const persistData = serializeEmptyPersistData(input); - // Store under key [1] - kvStorage.push({ - key: bufferToArrayBuffer(new Uint8Array([1])), - value: bufferToArrayBuffer(persistData), - }); + const initialKvState = getInitialActorKvState(input); + for (const [key, value] of initialKvState) { + kvStorage.push({ + key: bufferToArrayBuffer(key), + value: bufferToArrayBuffer(value), + }); + } entry.state = { actorId, @@ -323,7 +344,7 @@ export class FileSystemGlobalState { createdAt: BigInt(Date.now()), kvStorage, }; - await this.writeActor(actorId, entry.state); + await this.writeActor(actorId, entry.generation, entry.state); } return entry; } @@ -334,29 +355,128 @@ export class FileSystemGlobalState { "cannot sleep actor with memory driver, must use file system driver", ); - const actor = this.#actors.get(actorId); + // Get the actor. We upsert it even though we're about to destroy it so we have a lock on flagging `destroying` as true. + const actor = this.#upsertEntry(actorId); invariant(actor, `tried to sleep ${actorId}, does not exist`); + // Check if already destroying + if (actor.destroying) { + return; + } + actor.destroying = true; + // Wait for actor to fully start before stopping it to avoid race conditions if (actor.loadPromise) await actor.loadPromise.catch(); if (actor.startPromise?.promise) await actor.startPromise.promise.catch(); - // Mark as removed - actor.removed = true; - // Stop actor invariant(actor.actor, "actor should be loaded"); - await actor.actor.onStop(); + await actor.actor.onStop("sleep"); // Remove from map after stop is complete this.#actors.delete(actorId); } + async destroyActor(actorId: string) { + // Get the actor. We upsert it even though we're about to destroy it so we have a lock on flagging `destroying` as true. + const actor = this.#upsertEntry(actorId); + + // If actor is loaded, stop it first + // Check if already destroying + if (actor.destroying) { + return; + } + actor.destroying = true; + + // Wait for actor to fully start before stopping it to avoid race conditions + if (actor.loadPromise) await actor.loadPromise.catch(); + if (actor.startPromise?.promise) + await actor.startPromise.promise.catch(); + + // Stop actor if it's running + if (actor.actor) { + await actor.actor.onStop("destroy"); + } + + // Clear alarm timeout if exists + if (actor.alarmTimeout) { + actor.alarmTimeout.abort(); + } + + // Delete persisted files if using file system driver + if (this.#persist) { + const fs = getNodeFs(); + + // Delete all actor files in parallel + await Promise.all([ + // Delete actor state file + (async () => { + try { + await fs.unlink(this.getActorStatePath(actorId)); + } catch (err: any) { + if (err?.code !== "ENOENT") { + logger().error({ + msg: "failed to delete actor state file", + actorId, + error: stringifyError(err), + }); + } + } + })(), + // Delete actor database file + (async () => { + try { + await fs.unlink(this.getActorDbPath(actorId)); + } catch (err: any) { + if (err?.code !== "ENOENT") { + logger().error({ + msg: "failed to delete actor database file", + actorId, + error: stringifyError(err), + }); + } + } + })(), + // Delete actor alarm file + (async () => { + try { + await fs.unlink(this.getActorAlarmPath(actorId)); + } catch (err: any) { + if (err?.code !== "ENOENT") { + logger().error({ + msg: "failed to delete actor alarm file", + actorId, + error: stringifyError(err), + }); + } + } + })(), + ]); + } + + // Reset the entry + // + // Do not remove entry in order to avoid race condition with + // destroying. Next actor creation will increment the generation. + actor.state = undefined; + actor.loadPromise = undefined; + actor.actor = undefined; + actor.startPromise = undefined; + actor.alarmTimeout = undefined; + actor.alarmTimeout = undefined; + actor.pendingWriteResolver = undefined; + actor.destroying = false; + } + /** * Save actor state to disk. */ - async writeActor(actorId: string, state: schema.ActorState): Promise { + async writeActor( + actorId: string, + generation: string, + state: schema.ActorState, + ): Promise { if (!this.#persist) { return; } @@ -364,13 +484,26 @@ export class FileSystemGlobalState { const entry = this.#actors.get(actorId); invariant(entry, "actor entry does not exist"); - await this.#performWrite(actorId, state); + await this.#performWrite(actorId, generation, state); + } + + isGenerationCurrent(actorId: string, generation: string): boolean { + const entry = this.#upsertEntry(actorId); + if (!entry) return false; + return !entry.destroying && entry.generation === generation; } async setActorAlarm(actorId: string, timestamp: number) { const entry = this.#actors.get(actorId); invariant(entry, "actor entry does not exist"); + // Track generation of the actor when the write started to detect + // destroy/create race condition + const writeGeneration = entry.generation; + if (entry.destroying) { + logger().info("skipping set alarm since actor destroying"); + } + // Persist alarm to disk if (this.#persist) { const alarmPath = this.getActorAlarmPath(actorId); @@ -389,6 +522,14 @@ export class FileSystemGlobalState { ); const fs = getNodeFs(); await fs.writeFile(tempPath, data); + + if (this.isGenerationCurrent(actorId, writeGeneration)) { + logger().debug( + "skipping writing alarm since actor destroying or new generation", + ); + return; + } + await fs.rename(tempPath, alarmPath); } catch (error) { try { @@ -413,6 +554,7 @@ export class FileSystemGlobalState { */ async #performWrite( actorId: string, + generation: string, state: schema.ActorState, ): Promise { const dataPath = this.getActorStatePath(actorId); @@ -439,6 +581,14 @@ export class FileSystemGlobalState { ACTOR_STATE_VERSIONED.serializeWithEmbeddedVersion(bareState); const fs = getNodeFs(); await fs.writeFile(tempPath, serializedState); + + if (this.isGenerationCurrent(actorId, generation)) { + logger().debug( + "skipping writing alarm since actor destroying or new generation", + ); + return; + } + await fs.rename(tempPath, dataPath); } catch (error) { // Cleanup temp file on error @@ -763,7 +913,11 @@ export class FileSystemGlobalState { ): Promise { const entry = await this.loadActor(actorId); if (!entry.state) { - throw new Error(`Actor ${actorId} state not loaded`); + if (entry.destroying) { + return; + } else { + throw new Error(`Actor ${actorId} state not loaded`); + } } // Create a mutable copy of kvStorage @@ -798,7 +952,7 @@ export class FileSystemGlobalState { }; // Save state to disk - await this.writeActor(actorId, entry.state); + await this.writeActor(actorId, entry.generation, entry.state); } /** @@ -810,7 +964,11 @@ export class FileSystemGlobalState { ): Promise<(Uint8Array | null)[]> { const entry = await this.loadActor(actorId); if (!entry.state) { - throw new Error(`Actor ${actorId} state not loaded`); + if (entry.destroying) { + throw new Error(`Actor ${actorId} is destroying`); + } else { + throw new Error(`Actor ${actorId} state not loaded`); + } } const results: (Uint8Array | null)[] = []; @@ -835,7 +993,11 @@ export class FileSystemGlobalState { async kvBatchDelete(actorId: string, keys: Uint8Array[]): Promise { const entry = await this.loadActor(actorId); if (!entry.state) { - throw new Error(`Actor ${actorId} state not loaded`); + if (entry.destroying) { + return; + } else { + throw new Error(`Actor ${actorId} state not loaded`); + } } // Create a mutable copy of kvStorage @@ -859,7 +1021,7 @@ export class FileSystemGlobalState { }; // Save state to disk - await this.writeActor(actorId, entry.state); + await this.writeActor(actorId, entry.generation, entry.state); } /** @@ -871,7 +1033,11 @@ export class FileSystemGlobalState { ): Promise<[Uint8Array, Uint8Array][]> { const entry = await this.loadActor(actorId); if (!entry.state) { - throw new Error(`Actor ${actorId} state not loaded`); + if (entry.destroying) { + throw new Error(`Actor ${actorId} is destroying`); + } else { + throw new Error(`Actor ${actorId} state not loaded`); + } } const results: [Uint8Array, Uint8Array][] = []; diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts index abf5e2a12e..6216d5ff7e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/manager.ts @@ -1,6 +1,7 @@ import type { Context as HonoContext } from "hono"; import invariant from "invariant"; import { generateConnRequestId } from "@/actor/conn/mod"; +import { ActorDestroying } from "@/actor/errors"; import { type ActorRouter, createActorRouter } from "@/actor/router"; import { handleRawWebSocket, @@ -263,6 +264,9 @@ export class FileSystemManagerDriver implements ManagerDriver { if (!actor.state) { return undefined; } + if (actor.destroying) { + throw new ActorDestroying(actorId); + } try { // Load actor state diff --git a/website/src/content/docs/actors/quickstart/cloudflare-workers.mdx b/website/src/content/docs/actors/quickstart/cloudflare-workers.mdx index 9f8081bbdf..aa0a25efb7 100644 --- a/website/src/content/docs/actors/quickstart/cloudflare-workers.mdx +++ b/website/src/content/docs/actors/quickstart/cloudflare-workers.mdx @@ -114,7 +114,7 @@ Configure your `wrangler.json` for Cloudflare Workers: "migrations": [ { "tag": "v1", - "new_classes": ["ActorHandler"] + "new_sqlite_classes": ["ActorHandler"] } ], "durable_objects": { diff --git a/website/src/content/docs/integrations/cloudflare-workers.mdx b/website/src/content/docs/integrations/cloudflare-workers.mdx index f7ffc32e3a..b5a8a71cea 100644 --- a/website/src/content/docs/integrations/cloudflare-workers.mdx +++ b/website/src/content/docs/integrations/cloudflare-workers.mdx @@ -87,7 +87,7 @@ Update your `wrangler.json` configuration to support `ACTOR_DO` and `ACTOR_KV` b "migrations": [ { "tag": "v1", - "new_classes": ["ActorHandler"] + "new_sqlite_classes": ["ActorHandler"] } ], "durable_objects": {