Skip to content

Commit 63562bb

Browse files
committed
feat(rivetkit): add ability to destroy actors
1 parent 3777832 commit 63562bb

File tree

30 files changed

+1257
-283
lines changed

30 files changed

+1257
-283
lines changed

examples/cloudflare-workers-hono/wrangler.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/cloudflare-workers/wrangler.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pnpm-lock.yaml

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts

Lines changed: 106 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ import type {
1212
ManagerDriver,
1313
} from "rivetkit/driver-helpers";
1414
import { promiseWithResolvers } from "rivetkit/utils";
15-
import { KEYS } from "./actor-handler-do";
15+
import { buildActorId, parseActorId } from "./actor-id";
16+
import { GLOBAL_KV_KEYS } from "./global_kv";
17+
import { getCloudflareAmbientEnv } from "./handler";
18+
import { kvDelete, kvGet, kvListPrefix, kvPut } from "./kv_query";
1619

1720
interface DurableObjectGlobalState {
1821
ctx: DurableObjectState;
@@ -25,20 +28,27 @@ interface DurableObjectGlobalState {
2528
* This allows for storing the actor context globally and looking it up by ID in `CloudflareActorsActorDriver`.
2629
*/
2730
export class CloudflareDurableObjectGlobalState {
28-
// Single map for all actor state
31+
// Map of actor ID -> DO state
2932
#dos: Map<string, DurableObjectGlobalState> = new Map();
3033

31-
getDOState(actorId: string): DurableObjectGlobalState {
32-
const state = this.#dos.get(actorId);
34+
// Map of DO ID -> ActorHandler
35+
#actors: Map<string, ActorHandler> = new Map();
36+
37+
getDOState(doId: string): DurableObjectGlobalState {
38+
const state = this.#dos.get(doId);
3339
invariant(
3440
state !== undefined,
3541
"durable object state not in global state",
3642
);
3743
return state;
3844
}
3945

40-
setDOState(actorId: string, state: DurableObjectGlobalState) {
41-
this.#dos.set(actorId, state);
46+
setDOState(doId: string, state: DurableObjectGlobalState) {
47+
this.#dos.set(doId, state);
48+
}
49+
50+
get actors() {
51+
return this.#actors;
4252
}
4353
}
4454

@@ -51,6 +61,15 @@ class ActorHandler {
5161
actor?: AnyActorInstance;
5262
actorPromise?: ReturnType<typeof promiseWithResolvers<void>> =
5363
promiseWithResolvers();
64+
65+
/**
66+
* Indicates if `startDestroy` has been called.
67+
*
68+
* This is stored in memory instead of SQLite since the destroy may be cancelled.
69+
*
70+
* See the corresponding `destroyed` property in SQLite metadata.
71+
*/
72+
destroying: boolean = false;
5473
}
5574

5675
export class CloudflareActorsActorDriver implements ActorDriver {
@@ -59,7 +78,6 @@ export class CloudflareActorsActorDriver implements ActorDriver {
5978
#managerDriver: ManagerDriver;
6079
#inlineClient: Client<any>;
6180
#globalState: CloudflareDurableObjectGlobalState;
62-
#actors: Map<string, ActorHandler> = new Map();
6381

6482
constructor(
6583
registryConfig: RegistryConfig,
@@ -76,12 +94,17 @@ export class CloudflareActorsActorDriver implements ActorDriver {
7694
}
7795

7896
#getDOCtx(actorId: string) {
79-
return this.#globalState.getDOState(actorId).ctx;
97+
// Parse actor ID to get DO ID
98+
const [doId] = parseActorId(actorId);
99+
return this.#globalState.getDOState(doId).ctx;
80100
}
81101

82102
async loadActor(actorId: string): Promise<AnyActorInstance> {
103+
// Parse actor ID to get DO ID and generation
104+
const [doId, expectedGeneration] = parseActorId(actorId);
105+
83106
// Check if actor is already loaded
84-
let handler = this.#actors.get(actorId);
107+
let handler = this.#globalState.actors.get(doId);
85108
if (handler) {
86109
if (handler.actorPromise) await handler.actorPromise.promise;
87110
if (!handler.actor) throw new Error("Actor should be loaded");
@@ -90,26 +113,38 @@ export class CloudflareActorsActorDriver implements ActorDriver {
90113

91114
// Create new actor handler
92115
handler = new ActorHandler();
93-
this.#actors.set(actorId, handler);
116+
this.#globalState.actors.set(doId, handler);
94117

95118
// Get the actor metadata from Durable Object storage
96-
const doState = this.#globalState.getDOState(actorId);
97-
const storage = doState.ctx.storage;
119+
const doState = this.#globalState.getDOState(doId);
120+
const sql = doState.ctx.storage.sql;
98121

99-
// Load actor metadata
100-
const [name, key] = await Promise.all([
101-
storage.get<string>(KEYS.NAME),
102-
storage.get<string[]>(KEYS.KEY),
103-
]);
122+
// Load actor metadata from SQL table
123+
const cursor = sql.exec(
124+
"SELECT name, key, destroyed, generation FROM _rivetkit_metadata LIMIT 1",
125+
);
126+
const result = cursor.raw().next();
104127

105-
if (!name) {
128+
if (result.done || !result.value) {
106129
throw new Error(
107-
`Actor ${actorId} is not initialized - missing name`,
130+
`Actor ${actorId} is not initialized - missing metadata`,
108131
);
109132
}
110-
if (!key) {
133+
134+
const name = result.value[0] as string;
135+
const key = JSON.parse(result.value[1] as string) as string[];
136+
const destroyed = result.value[2] as number;
137+
const generation = result.value[3] as number;
138+
139+
// Check if actor is destroyed
140+
if (destroyed) {
141+
throw new Error(`Actor ${actorId} is destroyed`);
142+
}
143+
144+
// Check if generation matches
145+
if (generation !== expectedGeneration) {
111146
throw new Error(
112-
`Actor ${actorId} is not initialized - missing key`,
147+
`Actor ${actorId} generation mismatch - expected ${expectedGeneration}, got ${generation}`,
113148
);
114149
}
115150

@@ -135,7 +170,9 @@ export class CloudflareActorsActorDriver implements ActorDriver {
135170
}
136171

137172
getContext(actorId: string): DriverContext {
138-
const state = this.#globalState.getDOState(actorId);
173+
// Parse actor ID to get DO ID
174+
const [doId] = parseActorId(actorId);
175+
const state = this.#globalState.getDOState(doId);
139176
return { state: state.ctx };
140177
}
141178

@@ -147,97 +184,85 @@ export class CloudflareActorsActorDriver implements ActorDriver {
147184
return this.#getDOCtx(actorId).storage.sql;
148185
}
149186

150-
// Batch KV operations - convert between Uint8Array and Cloudflare's string-based API
187+
// Batch KV operations
151188
async kvBatchPut(
152189
actorId: string,
153190
entries: [Uint8Array, Uint8Array][],
154191
): Promise<void> {
155-
const storage = this.#getDOCtx(actorId).storage;
156-
const encoder = new TextDecoder();
192+
const sql = this.#getDOCtx(actorId).storage.sql;
157193

158-
// Convert Uint8Array entries to object for Cloudflare batch put
159-
const storageObj: Record<string, Uint8Array> = {};
160194
for (const [key, value] of entries) {
161-
// Convert key from Uint8Array to string
162-
const keyStr = this.#uint8ArrayToKey(key);
163-
storageObj[keyStr] = value;
195+
kvPut(sql, key, value);
164196
}
165-
166-
await storage.put(storageObj);
167197
}
168198

169199
async kvBatchGet(
170200
actorId: string,
171201
keys: Uint8Array[],
172202
): Promise<(Uint8Array | null)[]> {
173-
const storage = this.#getDOCtx(actorId).storage;
174-
175-
// Convert keys to strings
176-
const keyStrs = keys.map((k) => this.#uint8ArrayToKey(k));
203+
const sql = this.#getDOCtx(actorId).storage.sql;
177204

178-
// Get values from storage
179-
const results = await storage.get<Uint8Array>(keyStrs);
205+
const results: (Uint8Array | null)[] = [];
206+
for (const key of keys) {
207+
results.push(kvGet(sql, key));
208+
}
180209

181-
// Convert Map results to array in same order as input keys
182-
return keyStrs.map((k) => results.get(k) ?? null);
210+
return results;
183211
}
184212

185213
async kvBatchDelete(actorId: string, keys: Uint8Array[]): Promise<void> {
186-
const storage = this.#getDOCtx(actorId).storage;
187-
188-
// Convert keys to strings
189-
const keyStrs = keys.map((k) => this.#uint8ArrayToKey(k));
214+
const sql = this.#getDOCtx(actorId).storage.sql;
190215

191-
await storage.delete(keyStrs);
216+
for (const key of keys) {
217+
kvDelete(sql, key);
218+
}
192219
}
193220

194221
async kvListPrefix(
195222
actorId: string,
196223
prefix: Uint8Array,
197224
): Promise<[Uint8Array, Uint8Array][]> {
198-
const storage = this.#getDOCtx(actorId).storage;
199-
200-
// Convert prefix to string
201-
const prefixStr = this.#uint8ArrayToKey(prefix);
225+
const sql = this.#getDOCtx(actorId).storage.sql;
202226

203-
// List with prefix
204-
const results = await storage.list<Uint8Array>({ prefix: prefixStr });
227+
return kvListPrefix(sql, prefix);
228+
}
205229

206-
// Convert Map to array of [key, value] tuples
207-
const entries: [Uint8Array, Uint8Array][] = [];
208-
for (const [key, value] of results) {
209-
entries.push([this.#keyToUint8Array(key), value]);
210-
}
230+
startDestroy(actorId: string): void {
231+
// Parse actor ID to get DO ID and generation
232+
const [doId, generation] = parseActorId(actorId);
211233

212-
return entries;
213-
}
234+
const handler = this.#globalState.actors.get(doId);
214235

215-
// Helper to convert Uint8Array key to string for Cloudflare storage
216-
#uint8ArrayToKey(key: Uint8Array): string {
217-
// Check if this is a connection key (starts with [2])
218-
if (key.length > 0 && key[0] === 2) {
219-
// Connection key - extract connId
220-
const connId = new TextDecoder().decode(key.slice(1));
221-
return `${KEYS.CONN_PREFIX}${connId}`;
236+
// Actor not loaded, nothing to destroy
237+
if (!handler || !handler.actor) {
238+
return;
222239
}
223-
// Otherwise, treat as persist data key [1]
224-
return KEYS.PERSIST_DATA;
225-
}
226240

227-
// Helper to convert string key back to Uint8Array
228-
#keyToUint8Array(key: string): Uint8Array {
229-
if (key.startsWith(KEYS.CONN_PREFIX)) {
230-
// Connection key
231-
const connId = key.slice(KEYS.CONN_PREFIX.length);
232-
const encoder = new TextEncoder();
233-
const connIdBytes = encoder.encode(connId);
234-
const result = new Uint8Array(1 + connIdBytes.length);
235-
result[0] = 2; // Connection prefix
236-
result.set(connIdBytes, 1);
237-
return result;
241+
// Check if already destroying
242+
if (handler.destroying) {
243+
return;
238244
}
239-
// Persist data key
240-
return Uint8Array.from([1]);
245+
handler.destroying = true;
246+
247+
// Stop
248+
handler.actor.onStop("destroy");
249+
250+
// Remove state
251+
const doState = this.#globalState.getDOState(doId);
252+
const sql = doState.ctx.storage.sql;
253+
sql.exec("UPDATE _rivetkit_metadata SET destroyed = 1 WHERE 1=1");
254+
sql.exec("DELETE FROM _rivetkit_kv_storage");
255+
256+
// Clear any scheduled alarms
257+
doState.ctx.storage.deleteAlarm();
258+
259+
// Delete from ACTOR_KV in the background - use full actorId including generation
260+
const env = getCloudflareAmbientEnv();
261+
doState.ctx.waitUntil(
262+
env.ACTOR_KV.delete(GLOBAL_KV_KEYS.actorMetadata(actorId)),
263+
);
264+
265+
this.#globalState.actors.delete(doId);
241266
}
242267
}
243268

0 commit comments

Comments
 (0)