diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts b/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts index 9278862d87..bcc2b5ca6b 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts @@ -1,5 +1,7 @@ import invariant from "invariant"; import type { + ActorKey, + ActorRouter, AnyActorInstance as CoreAnyActorInstance, RegistryConfig, RunConfig, @@ -31,8 +33,8 @@ export class CloudflareDurableObjectGlobalState { // Map of actor ID -> DO state #dos: Map = new Map(); - // Map of DO ID -> ActorHandler - #actors: Map = new Map(); + // WeakMap of DO state -> ActorGlobalState for proper GC + #actors: WeakMap = new WeakMap(); getDOState(doId: string): DurableObjectGlobalState { const state = this.#dos.get(doId); @@ -47,8 +49,12 @@ export class CloudflareDurableObjectGlobalState { this.#dos.set(doId, state); } - get actors() { - return this.#actors; + getActorState(ctx: DurableObjectState): ActorGlobalState | undefined { + return this.#actors.get(ctx); + } + + setActorState(ctx: DurableObjectState, actorState: ActorGlobalState): void { + this.#actors.set(ctx, actorState); } } @@ -56,11 +62,27 @@ export interface DriverContext { state: DurableObjectState; } -// Actor handler to track running instances -class ActorHandler { - actor?: AnyActorInstance; - actorPromise?: ReturnType> = - promiseWithResolvers(); +interface InitializedData { + name: string; + key: ActorKey; + generation: number; +} + +interface LoadedActor { + actorRouter: ActorRouter; + actorDriver: ActorDriver; + generation: number; +} + +// Actor global state to track running instances +export class ActorGlobalState { + // Initialization state + initialized?: InitializedData; + + // Loaded actor state + actor?: LoadedActor; + actorInstance?: AnyActorInstance; + actorPromise?: ReturnType>; /** * Indicates if `startDestroy` has been called. @@ -70,6 +92,14 @@ class ActorHandler { * See the corresponding `destroyed` property in SQLite metadata. */ destroying: boolean = false; + + reset() { + this.initialized = undefined; + this.actor = undefined; + this.actorInstance = undefined; + this.actorPromise = undefined; + this.destroying = false; + } } export class CloudflareActorsActorDriver implements ActorDriver { @@ -103,20 +133,24 @@ export class CloudflareActorsActorDriver implements ActorDriver { // Parse actor ID to get DO ID and generation const [doId, expectedGeneration] = parseActorId(actorId); + // Get the DO state + const doState = this.#globalState.getDOState(doId); + // Check if actor is already loaded - let handler = this.#globalState.actors.get(doId); - if (handler) { - if (handler.actorPromise) await handler.actorPromise.promise; - if (!handler.actor) throw new Error("Actor should be loaded"); - return handler.actor; + let handler = this.#globalState.getActorState(doState.ctx); + if (handler && handler.actorInstance) { + // Actor is already loaded, return it + return handler.actorInstance; } - // Create new actor handler - handler = new ActorHandler(); - this.#globalState.actors.set(doId, handler); + // Create new actor handler if it doesn't exist + if (!handler) { + handler = new ActorGlobalState(); + handler.actorPromise = promiseWithResolvers(); + this.#globalState.setActorState(doState.ctx, handler); + } // Get the actor metadata from Durable Object storage - const doState = this.#globalState.getDOState(doId); const sql = doState.ctx.storage.sql; // Load actor metadata from SQL table @@ -150,10 +184,10 @@ export class CloudflareActorsActorDriver implements ActorDriver { // Create actor instance const definition = lookupInRegistry(this.#registryConfig, name); - handler.actor = definition.instantiate(); + handler.actorInstance = definition.instantiate(); // Start actor - await handler.actor.start( + await handler.actorInstance.start( this, this.#inlineClient, actorId, @@ -166,7 +200,7 @@ export class CloudflareActorsActorDriver implements ActorDriver { handler.actorPromise?.resolve(); handler.actorPromise = undefined; - return handler.actor; + return handler.actorInstance; } getContext(actorId: string): DriverContext { @@ -231,10 +265,12 @@ export class CloudflareActorsActorDriver implements ActorDriver { // Parse actor ID to get DO ID and generation const [doId, generation] = parseActorId(actorId); - const handler = this.#globalState.actors.get(doId); + // Get the DO state + const doState = this.#globalState.getDOState(doId); + const handler = this.#globalState.getActorState(doState.ctx); // Actor not loaded, nothing to destroy - if (!handler || !handler.actor) { + if (!handler || !handler.actorInstance) { return; } @@ -244,8 +280,17 @@ export class CloudflareActorsActorDriver implements ActorDriver { } handler.destroying = true; + // Spawn onStop + this.#callOnStopAsync(actorId, doId, handler.actorInstance); + } + + async #callOnStopAsync( + actorId: string, + doId: string, + actor: CoreAnyActorInstance, + ) { // Stop - handler.actor.onStop("destroy"); + await actor.onStop("destroy"); // Remove state const doState = this.#globalState.getDOState(doId); @@ -254,7 +299,7 @@ export class CloudflareActorsActorDriver implements ActorDriver { sql.exec("DELETE FROM _rivetkit_kv_storage"); // Clear any scheduled alarms - doState.ctx.storage.deleteAlarm(); + await doState.ctx.storage.deleteAlarm(); // Delete from ACTOR_KV in the background - use full actorId including generation const env = getCloudflareAmbientEnv(); @@ -262,7 +307,9 @@ export class CloudflareActorsActorDriver implements ActorDriver { env.ACTOR_KV.delete(GLOBAL_KV_KEYS.actorMetadata(actorId)), ); - this.#globalState.actors.delete(doId); + // Reset global state using the DO context + const actorHandle = this.#globalState.getActorState(doState.ctx); + actorHandle?.reset(); } } diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts b/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts index 7aceea8926..dce9271e4f 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts @@ -5,8 +5,9 @@ import type { ActorKey, ActorRouter, Registry, RunConfig } from "rivetkit"; import { createActorRouter, createClientWithDriver } from "rivetkit"; import type { ActorDriver, ManagerDriver } from "rivetkit/driver-helpers"; import { serializeEmptyPersistData } from "rivetkit/driver-helpers"; -import { promiseWithResolvers, stringifyError } from "rivetkit/utils"; +import { stringifyError } from "rivetkit/utils"; import { + ActorGlobalState, CloudflareDurableObjectGlobalState, createCloudflareActorsActorDriverBuilder, } from "./actor-driver"; @@ -23,12 +24,7 @@ const KEYS = { }; export interface ActorHandlerInterface extends DurableObject { - create( - req: ActorInitRequest & { allowExisting?: boolean }, - ): Promise< - | { success: { actorId: string; created: boolean } } - | { error: { actorAlreadyExists: true } } - >; + create(req: ActorInitRequest): Promise; getMetadata(): Promise< | { actorId: string; @@ -44,24 +40,16 @@ export interface ActorInitRequest { name: string; key: ActorKey; input?: unknown; + allowExisting: boolean; } - -interface InitializedData { - name: string; - key: ActorKey; - generation: number; -} +export type ActorInitResponse = + | { success: { actorId: string; created: boolean } } + | { error: { actorAlreadyExists: true } }; export type DurableObjectConstructor = new ( ...args: ConstructorParameters> ) => DurableObject; -interface LoadedActor { - actorRouter: ActorRouter; - actorDriver: ActorDriver; - generation: number; -} - export function createActorDurableObject( registry: Registry, rootRunConfig: RunConfig, @@ -81,10 +69,12 @@ export function createActorDurableObject( extends DurableObject implements ActorHandlerInterface { - #initialized?: InitializedData; - #initializedPromise?: ReturnType>; - - #actor?: LoadedActor; + /** + * This holds a strong reference to ActorGlobalState. + * CloudflareDurableObjectGlobalState holds a weak reference so we can + * access it elsewhere. + **/ + #state?: ActorGlobalState; constructor( ...args: ConstructorParameters> @@ -113,59 +103,68 @@ export function createActorDurableObject( generation INTEGER DEFAULT 0 ); `); + + // Get or create the actor state from the global WeakMap + this.#state = globalState.getActorState(this.ctx); + if (!this.#state) { + this.#state = new ActorGlobalState(); + globalState.setActorState(this.ctx, this.#state); + } } - async #loadActor(): Promise { - // Wait for init - if (!this.#initialized) { - // Wait for init - if (this.#initializedPromise) { - await this.#initializedPromise.promise; - } else { - this.#initializedPromise = promiseWithResolvers(); - - // Query SQL for initialization data - const cursor = this.ctx.storage.sql.exec( - "SELECT name, key, destroyed, generation FROM _rivetkit_metadata WHERE id = 1", - ); - const result = cursor.raw().next(); - - if (!result.done && result.value) { - const name = result.value[0] as string; - const key = JSON.parse( - result.value[1] as string, - ) as ActorKey; - const destroyed = result.value[2] as number; - const generation = result.value[3] as number; - - // Only initialize if not destroyed - if (!destroyed) { - logger().debug({ - msg: "already initialized", - name, - key, - generation, - }); - - this.#initialized = { name, key, generation }; - this.#initializedPromise.resolve(); - } else { - logger().debug( - "actor is destroyed, waiting to re-initialize", - ); - } + async #loadActor() { + invariant(this.#state, "State should be initialized"); + + // Check if initialized + if (!this.#state.initialized) { + // Query SQL for initialization data + const cursor = this.ctx.storage.sql.exec( + "SELECT name, key, destroyed, generation FROM _rivetkit_metadata WHERE id = 1", + ); + const result = cursor.raw().next(); + + if (!result.done && result.value) { + const name = result.value[0] as string; + const key = JSON.parse( + result.value[1] as string, + ) as ActorKey; + const destroyed = result.value[2] as number; + const generation = result.value[3] as number; + + // Only initialize if not destroyed + if (!destroyed) { + logger().debug({ + msg: "already initialized", + name, + key, + generation, + }); + + this.#state.initialized = { name, key, generation }; } else { - logger().debug("waiting to initialize"); + logger().debug("actor is destroyed, cannot load"); + throw new Error("Actor is destroyed"); } + } else { + logger().debug("not initialized"); + throw new Error("Actor is not initialized"); } } // Check if already loaded - if (this.#actor) { - return this.#actor; + if (this.#state.actor) { + // Assert that the cached actor has the correct generation + // This will catch any cases where #state.actor has a stale generation + invariant( + !this.#state.initialized || + this.#state.actor.generation === + this.#state.initialized.generation, + `Stale actor cached: actor generation ${this.#state.actor.generation} != initialized generation ${this.#state.initialized?.generation}. This should not happen.`, + ); + return this.#state.actor; } - if (!this.#initialized) throw new Error("Not initialized"); + if (!this.#state.initialized) throw new Error("Not initialized"); // Register DO with global state first // HACK: This leaks the DO context, but DO does not provide a native way @@ -207,23 +206,23 @@ export function createActorDurableObject( ); // Save actor with generation - this.#actor = { + this.#state.actor = { actorRouter, actorDriver, - generation: this.#initialized.generation, + generation: this.#state.initialized.generation, }; // Build actor ID with generation for loading const actorIdWithGen = buildActorId( actorId, - this.#initialized.generation, + this.#state.initialized.generation, ); // Initialize the actor instance with proper metadata // This ensures the actor driver knows about this actor await actorDriver.loadActor(actorIdWithGen); - return this.#actor; + return this.#state.actor; } /** RPC called to get actor metadata without creating it */ @@ -263,7 +262,7 @@ export function createActorDurableObject( const doId = this.ctx.id.toString(); const actorId = buildActorId(doId, generation); const destroying = - globalState.actors.get(doId)?.destroying ?? false; + globalState.getActorState(this.ctx)?.destroying ?? false; logger().debug({ msg: "getMetadata: found actor metadata", @@ -284,12 +283,7 @@ export function createActorDurableObject( } /** RPC called by the manager to create a DO. Can optionally allow existing actors. */ - async create( - req: ActorInitRequest & { allowExisting?: boolean }, - ): Promise< - | { success: { actorId: string; created: boolean } } - | { error: { actorAlreadyExists: true } } - > { + async create(req: ActorInitRequest): Promise { // Check if actor exists const checkCursor = this.ctx.storage.sql.exec( "SELECT destroyed, generation FROM _rivetkit_metadata WHERE id = 1", @@ -330,6 +324,14 @@ export function createActorDurableObject( // Actor exists but is destroyed - resurrect with incremented generation generation = generation + 1; created = true; + + // Clear stale actor from previous generation + // This is necessary because the DO instance may still be in memory + // with the old #state.actor field from before the destroy + if (this.#state) { + this.#state.actor = undefined; + } + logger().debug({ msg: "resurrecting destroyed actor", key: req.key, @@ -361,7 +363,13 @@ export function createActorDurableObject( generation, ); - this.#initialized = { + // Ensure state is initialized + if (!this.#state) { + this.#state = new ActorGlobalState(); + globalState.setActorState(this.ctx, this.#state); + } + + this.#state.initialized = { name: req.name, key: req.key, generation, @@ -391,7 +399,9 @@ export function createActorDurableObject( await this.#loadActor(); logger().debug({ - msg: created ? "actor created/resurrected" : "returning existing actor", + msg: created + ? "actor created/resurrected" + : "returning existing actor", actorId, created, generation, diff --git a/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts b/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts index 7d50c521f3..5a871815fe 100644 --- a/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts +++ b/rivetkit-typescript/packages/cloudflare-workers/src/manager-driver.ts @@ -21,7 +21,7 @@ import { ActorDestroying, InternalError, } from "rivetkit/errors"; -import { stringifyError } from "rivetkit/utils"; +import { assertUnreachable, stringifyError } from "rivetkit/utils"; import { buildActorId, parseActorId } from "./actor-id"; import { GLOBAL_KV_KEYS } from "./global_kv"; import { getCloudflareAmbientEnv } from "./handler"; @@ -333,25 +333,32 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { // Get or create actor using the Durable Object's method const actor = env.ACTOR_DO.get(doId); - const result = await actor.getOrCreate({ + const result = await actor.create({ name, key, input, + allowExisting: true, }); + if ("success" in result) { + const { actorId, created } = result.success; + logger().debug({ + msg: "getOrCreateWithKey result", + actorId, + name, + key, + created, + }); - logger().debug({ - msg: "getOrCreateWithKey result", - actorId: result.actorId, - name, - key, - created: result.created, - }); - - return { - actorId: result.actorId, - name, - key, - }; + return { + actorId, + name, + key, + }; + } else if ("error" in result) { + throw new Error(`Error: ${JSON.stringify(result.error)}`); + } else { + assertUnreachable(result); + } } async createActor({ @@ -373,22 +380,27 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { name, key, input, + allowExisting: false, }); - // Check if there was an error - if ("error" in result) { + if ("success" in result) { + const { actorId } = result.success; + return { + actorId, + name, + key, + }; + } else if ("error" in result) { if (result.error.actorAlreadyExists) { throw new ActorAlreadyExists(name, key); } - // Should never happen, but handle unknown errors - throw new InternalError("Unknown error creating actor"); - } - return { - actorId: result.success.actorId, - name: result.success.name, - key: result.success.key, - }; + throw new InternalError( + `Unknown error creating actor: ${JSON.stringify(result.error)}`, + ); + } else { + assertUnreachable(result); + } } async listActors({ c, name }: ListActorsInput): Promise {