Skip to content

Commit 892e9cb

Browse files
committed
wip: cf workers fix
1 parent 90b2547 commit 892e9cb

File tree

5 files changed

+218
-145
lines changed

5 files changed

+218
-145
lines changed

rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts

Lines changed: 92 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import invariant from "invariant";
22
import type {
3+
ActorKey,
4+
ActorRouter,
35
AnyActorInstance as CoreAnyActorInstance,
46
RegistryConfig,
57
RunConfig,
@@ -12,10 +14,10 @@ import type {
1214
ManagerDriver,
1315
} from "rivetkit/driver-helpers";
1416
import { promiseWithResolvers } from "rivetkit/utils";
15-
import { buildActorId, parseActorId } from "./actor-id";
16-
import { GLOBAL_KV_KEYS } from "./global_kv";
17+
import { parseActorId } from "./actor-id";
18+
import { kvDelete, kvGet, kvListPrefix, kvPut } from "./actor-kv";
19+
import { GLOBAL_KV_KEYS } from "./global-kv";
1720
import { getCloudflareAmbientEnv } from "./handler";
18-
import { kvDelete, kvGet, kvListPrefix, kvPut } from "./kv_query";
1921

2022
interface DurableObjectGlobalState {
2123
ctx: DurableObjectState;
@@ -31,8 +33,8 @@ export class CloudflareDurableObjectGlobalState {
3133
// Map of actor ID -> DO state
3234
#dos: Map<string, DurableObjectGlobalState> = new Map();
3335

34-
// Map of DO ID -> ActorHandler
35-
#actors: Map<string, ActorHandler> = new Map();
36+
// WeakMap of DO state -> ActorGlobalState for proper GC
37+
#actors: WeakMap<DurableObjectState, ActorGlobalState> = new WeakMap();
3638

3739
getDOState(doId: string): DurableObjectGlobalState {
3840
const state = this.#dos.get(doId);
@@ -47,20 +49,40 @@ export class CloudflareDurableObjectGlobalState {
4749
this.#dos.set(doId, state);
4850
}
4951

50-
get actors() {
51-
return this.#actors;
52+
getActorState(ctx: DurableObjectState): ActorGlobalState | undefined {
53+
return this.#actors.get(ctx);
54+
}
55+
56+
setActorState(ctx: DurableObjectState, actorState: ActorGlobalState): void {
57+
this.#actors.set(ctx, actorState);
5258
}
5359
}
5460

5561
export interface DriverContext {
5662
state: DurableObjectState;
5763
}
5864

59-
// Actor handler to track running instances
60-
class ActorHandler {
61-
actor?: AnyActorInstance;
62-
actorPromise?: ReturnType<typeof promiseWithResolvers<void>> =
63-
promiseWithResolvers();
65+
interface InitializedData {
66+
name: string;
67+
key: ActorKey;
68+
generation: number;
69+
}
70+
71+
interface LoadedActor {
72+
actorRouter: ActorRouter;
73+
actorDriver: ActorDriver;
74+
generation: number;
75+
}
76+
77+
// Actor global state to track running instances
78+
export class ActorGlobalState {
79+
// Initialization state
80+
initialized?: InitializedData;
81+
82+
// Loaded actor state
83+
actor?: LoadedActor;
84+
actorInstance?: AnyActorInstance;
85+
actorPromise?: ReturnType<typeof promiseWithResolvers<void>>;
6486

6587
/**
6688
* Indicates if `startDestroy` has been called.
@@ -70,6 +92,14 @@ class ActorHandler {
7092
* See the corresponding `destroyed` property in SQLite metadata.
7193
*/
7294
destroying: boolean = false;
95+
96+
reset() {
97+
this.initialized = undefined;
98+
this.actor = undefined;
99+
this.actorInstance = undefined;
100+
this.actorPromise = undefined;
101+
this.destroying = false;
102+
}
73103
}
74104

75105
export class CloudflareActorsActorDriver implements ActorDriver {
@@ -103,23 +133,36 @@ export class CloudflareActorsActorDriver implements ActorDriver {
103133
// Parse actor ID to get DO ID and generation
104134
const [doId, expectedGeneration] = parseActorId(actorId);
105135

136+
// Get the DO state
137+
const doState = this.#globalState.getDOState(doId);
138+
106139
// Check if actor is already loaded
107-
let handler = this.#globalState.actors.get(doId);
108-
if (handler) {
109-
if (handler.actorPromise) await handler.actorPromise.promise;
110-
if (!handler.actor) throw new Error("Actor should be loaded");
111-
return handler.actor;
140+
let actorState = this.#globalState.getActorState(doState.ctx);
141+
if (actorState?.actorInstance) {
142+
// Actor is already loaded, return it
143+
return actorState.actorInstance;
112144
}
113145

114-
// Create new actor handler
115-
handler = new ActorHandler();
116-
this.#globalState.actors.set(doId, handler);
146+
// Create new actor state if it doesn't exist
147+
if (!actorState) {
148+
actorState = new ActorGlobalState();
149+
actorState.actorPromise = promiseWithResolvers();
150+
this.#globalState.setActorState(doState.ctx, actorState);
151+
}
117152

118-
// Get the actor metadata from Durable Object storage
119-
const doState = this.#globalState.getDOState(doId);
120-
const sql = doState.ctx.storage.sql;
153+
// Another request is already loading this actor, wait for it
154+
if (actorState.actorPromise) {
155+
await actorState.actorPromise.promise;
156+
if (!actorState.actorInstance) {
157+
throw new Error(
158+
`Actor ${actorId} failed to load in concurrent request`,
159+
);
160+
}
161+
return actorState.actorInstance;
162+
}
121163

122-
// Load actor metadata from SQL table
164+
// Load actor metadata
165+
const sql = doState.ctx.storage.sql;
123166
const cursor = sql.exec(
124167
"SELECT name, key, destroyed, generation FROM _rivetkit_metadata LIMIT 1",
125168
);
@@ -150,10 +193,10 @@ export class CloudflareActorsActorDriver implements ActorDriver {
150193

151194
// Create actor instance
152195
const definition = lookupInRegistry(this.#registryConfig, name);
153-
handler.actor = definition.instantiate();
196+
actorState.actorInstance = definition.instantiate();
154197

155198
// Start actor
156-
await handler.actor.start(
199+
await actorState.actorInstance.start(
157200
this,
158201
this.#inlineClient,
159202
actorId,
@@ -163,10 +206,10 @@ export class CloudflareActorsActorDriver implements ActorDriver {
163206
);
164207

165208
// Finish
166-
handler.actorPromise?.resolve();
167-
handler.actorPromise = undefined;
209+
actorState.actorPromise?.resolve();
210+
actorState.actorPromise = undefined;
168211

169-
return handler.actor;
212+
return actorState.actorInstance;
170213
}
171214

172215
getContext(actorId: string): DriverContext {
@@ -231,21 +274,32 @@ export class CloudflareActorsActorDriver implements ActorDriver {
231274
// Parse actor ID to get DO ID and generation
232275
const [doId, generation] = parseActorId(actorId);
233276

234-
const handler = this.#globalState.actors.get(doId);
277+
// Get the DO state
278+
const doState = this.#globalState.getDOState(doId);
279+
const actorState = this.#globalState.getActorState(doState.ctx);
235280

236281
// Actor not loaded, nothing to destroy
237-
if (!handler || !handler.actor) {
282+
if (!actorState?.actorInstance) {
238283
return;
239284
}
240285

241286
// Check if already destroying
242-
if (handler.destroying) {
287+
if (actorState.destroying) {
243288
return;
244289
}
245-
handler.destroying = true;
290+
actorState.destroying = true;
291+
292+
// Spawn onStop in background
293+
this.#callOnStopAsync(actorId, doId, actorState.actorInstance);
294+
}
246295

296+
async #callOnStopAsync(
297+
actorId: string,
298+
doId: string,
299+
actor: CoreAnyActorInstance,
300+
) {
247301
// Stop
248-
handler.actor.onStop("destroy");
302+
await actor.onStop("destroy");
249303

250304
// Remove state
251305
const doState = this.#globalState.getDOState(doId);
@@ -254,15 +308,17 @@ export class CloudflareActorsActorDriver implements ActorDriver {
254308
sql.exec("DELETE FROM _rivetkit_kv_storage");
255309

256310
// Clear any scheduled alarms
257-
doState.ctx.storage.deleteAlarm();
311+
await doState.ctx.storage.deleteAlarm();
258312

259313
// Delete from ACTOR_KV in the background - use full actorId including generation
260314
const env = getCloudflareAmbientEnv();
261315
doState.ctx.waitUntil(
262316
env.ACTOR_KV.delete(GLOBAL_KV_KEYS.actorMetadata(actorId)),
263317
);
264318

265-
this.#globalState.actors.delete(doId);
319+
// Reset global state using the DO context
320+
const actorHandle = this.#globalState.getActorState(doState.ctx);
321+
actorHandle?.reset();
266322
}
267323
}
268324

0 commit comments

Comments
 (0)