Skip to content

Commit eb0e6f2

Browse files
committed
chore(rivetkit): move conns to separate persisted kv keys
1 parent c0334fa commit eb0e6f2

File tree

20 files changed

+1416
-966
lines changed

20 files changed

+1416
-966
lines changed

rivetkit-typescript/packages/cloudflare-workers/src/actor-driver.ts

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -139,21 +139,106 @@ export class CloudflareActorsActorDriver implements ActorDriver {
139139
return { state: state.ctx };
140140
}
141141

142-
async readPersistedData(actorId: string): Promise<Uint8Array | undefined> {
143-
return await this.#getDOCtx(actorId).storage.get(KEYS.PERSIST_DATA);
144-
}
145-
146-
async writePersistedData(actorId: string, data: Uint8Array): Promise<void> {
147-
await this.#getDOCtx(actorId).storage.put(KEYS.PERSIST_DATA, data);
148-
}
149-
150142
async setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void> {
151143
await this.#getDOCtx(actor.id).storage.setAlarm(timestamp);
152144
}
153145

154146
async getDatabase(actorId: string): Promise<unknown | undefined> {
155147
return this.#getDOCtx(actorId).storage.sql;
156148
}
149+
150+
// Batch KV operations - convert between Uint8Array and Cloudflare's string-based API
151+
async kvBatchPut(
152+
actorId: string,
153+
entries: [Uint8Array, Uint8Array][],
154+
): Promise<void> {
155+
const storage = this.#getDOCtx(actorId).storage;
156+
const encoder = new TextDecoder();
157+
158+
// Convert Uint8Array entries to object for Cloudflare batch put
159+
const storageObj: Record<string, Uint8Array> = {};
160+
for (const [key, value] of entries) {
161+
// Convert key from Uint8Array to string
162+
const keyStr = this.#uint8ArrayToKey(key);
163+
storageObj[keyStr] = value;
164+
}
165+
166+
await storage.put(storageObj);
167+
}
168+
169+
async kvBatchGet(
170+
actorId: string,
171+
keys: Uint8Array[],
172+
): Promise<(Uint8Array | null)[]> {
173+
const storage = this.#getDOCtx(actorId).storage;
174+
175+
// Convert keys to strings
176+
const keyStrs = keys.map((k) => this.#uint8ArrayToKey(k));
177+
178+
// Get values from storage
179+
const results = await storage.get<Uint8Array>(keyStrs);
180+
181+
// Convert Map results to array in same order as input keys
182+
return keyStrs.map((k) => results.get(k) ?? null);
183+
}
184+
185+
async kvBatchDelete(actorId: string, keys: Uint8Array[]): Promise<void> {
186+
const storage = this.#getDOCtx(actorId).storage;
187+
188+
// Convert keys to strings
189+
const keyStrs = keys.map((k) => this.#uint8ArrayToKey(k));
190+
191+
await storage.delete(keyStrs);
192+
}
193+
194+
async kvListPrefix(
195+
actorId: string,
196+
prefix: Uint8Array,
197+
): Promise<[Uint8Array, Uint8Array][]> {
198+
const storage = this.#getDOCtx(actorId).storage;
199+
200+
// Convert prefix to string
201+
const prefixStr = this.#uint8ArrayToKey(prefix);
202+
203+
// List with prefix
204+
const results = await storage.list<Uint8Array>({ prefix: prefixStr });
205+
206+
// Convert Map to array of [key, value] tuples
207+
const entries: [Uint8Array, Uint8Array][] = [];
208+
for (const [key, value] of results) {
209+
entries.push([this.#keyToUint8Array(key), value]);
210+
}
211+
212+
return entries;
213+
}
214+
215+
// Helper to convert Uint8Array key to string for Cloudflare storage
216+
#uint8ArrayToKey(key: Uint8Array): string {
217+
// Check if this is a connection key (starts with [2])
218+
if (key.length > 0 && key[0] === 2) {
219+
// Connection key - extract connId
220+
const connId = new TextDecoder().decode(key.slice(1));
221+
return `${KEYS.CONN_PREFIX}${connId}`;
222+
}
223+
// Otherwise, treat as persist data key [1]
224+
return KEYS.PERSIST_DATA;
225+
}
226+
227+
// Helper to convert string key back to Uint8Array
228+
#keyToUint8Array(key: string): Uint8Array {
229+
if (key.startsWith(KEYS.CONN_PREFIX)) {
230+
// Connection key
231+
const connId = key.slice(KEYS.CONN_PREFIX.length);
232+
const encoder = new TextEncoder();
233+
const connIdBytes = encoder.encode(connId);
234+
const result = new Uint8Array(1 + connIdBytes.length);
235+
result[0] = 2; // Connection prefix
236+
result.set(connIdBytes, 1);
237+
return result;
238+
}
239+
// Persist data key
240+
return Uint8Array.from([1]);
241+
}
157242
}
158243

159244
export function createCloudflareActorsActorDriverBuilder(

rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,14 @@ export const KEYS = {
2020
NAME: "rivetkit:name",
2121
KEY: "rivetkit:key",
2222
PERSIST_DATA: "rivetkit:data",
23+
CONN_PREFIX: "rivetkit:conn:",
2324
};
2425

26+
// Helper to create a connection key for Cloudflare
27+
export function makeCloudflareConnKey(connId: string): string {
28+
return `${KEYS.CONN_PREFIX}${connId}`;
29+
}
30+
2531
export interface ActorHandlerInterface extends DurableObject {
2632
initialize(req: ActorInitRequest): Promise<void>;
2733
}

rivetkit-typescript/packages/rivetkit/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@
153153
],
154154
"scripts": {
155155
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts",
156-
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts",
156+
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts",
157157
"check-types": "tsc --noEmit",
158158
"test": "vitest run",
159159
"test:watch": "vitest",
Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
# MARK: Connection
2+
type Subscription struct {
3+
eventName: str
4+
}
5+
26
# Connection associated with hibernatable WebSocket that should persist across lifecycles.
3-
type PersistedHibernatableConn struct {
7+
type HibernatableConn struct {
48
# Connection ID generated by RivetKit
59
id: str
610
parameters: data
711
state: data
12+
subscriptions: list<Subscription>
813

914
# Request ID of the hibernatable WebSocket
1015
hibernatableRequestId: data
@@ -15,19 +20,19 @@ type PersistedHibernatableConn struct {
1520
}
1621

1722
# MARK: Schedule Event
18-
type PersistedScheduleEvent struct {
23+
type ScheduleEvent struct {
1924
eventId: str
2025
timestamp: i64
2126
action: str
2227
args: optional<data>
2328
}
2429

2530
# MARK: Actor
26-
type PersistedActor struct {
31+
type Actor struct {
2732
# Input data passed to the actor on initialization
2833
input: optional<data>
2934
hasInitialized: bool
3035
state: data
31-
hibernatableConns: list<PersistedHibernatableConn>
32-
scheduledEvents: list<PersistedScheduleEvent>
36+
hibernatableConns: list<HibernatableConn>
37+
scheduledEvents: list<ScheduleEvent>
3338
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# File System Driver Schema (v2)
2+
3+
# MARK: Actor State
4+
type ActorKvEntry struct {
5+
key: data
6+
value: data
7+
}
8+
9+
type ActorState struct {
10+
actorId: str
11+
name: str
12+
key: list<str>
13+
# KV storage map for actor and connection data
14+
# Keys are strings (base64 encoded), values are byte arrays
15+
kvStorage: list<ActorKvEntry>
16+
createdAt: u64
17+
}
18+
19+
# MARK: Actor Alarm
20+
type ActorAlarm struct {
21+
actorId: str
22+
timestamp: uint
23+
}

rivetkit-typescript/packages/rivetkit/src/actor/conn.ts

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import * as cbor from "cbor-x";
2+
import onChange from "on-change";
3+
import { isCborSerializable } from "@/common/utils";
24
import type * as protocol from "@/schemas/client-protocol/mod";
35
import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned";
46
import { arrayBuffersEqual, bufferToArrayBuffer } from "@/utils";
@@ -44,7 +46,13 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
4446
* This will only be persisted if using hibernatable WebSockets. If not,
4547
* this is just used to hole state.
4648
*/
47-
__persist: PersistedConn<CP, CS>;
49+
__persist!: PersistedConn<CP, CS>;
50+
51+
/** Raw persist object without the proxy wrapper */
52+
#persistRaw: PersistedConn<CP, CS>;
53+
54+
/** Track if this connection's state has changed */
55+
#changed = false;
4856

4957
get __driverState(): ConnDriverState | undefined {
5058
return this.__socket?.driverState;
@@ -103,9 +111,9 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
103111
return false;
104112
}
105113
return (
106-
this.#actor[PERSIST_SYMBOL].hibernatableWebSocket.findIndex((x) =>
114+
this.#actor[PERSIST_SYMBOL].hibernatableConns.findIndex((conn) =>
107115
arrayBuffersEqual(
108-
x.requestId,
116+
conn.hibernatableRequestId,
109117
this.__persist.hibernatableRequestId!,
110118
),
111119
) > -1
@@ -131,7 +139,80 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
131139
persist: PersistedConn<CP, CS>,
132140
) {
133141
this.#actor = actor;
134-
this.__persist = persist;
142+
this.#persistRaw = persist;
143+
this.#setupPersistProxy(persist);
144+
}
145+
146+
/**
147+
* Sets up the proxy for connection persistence with change tracking
148+
*/
149+
#setupPersistProxy(persist: PersistedConn<CP, CS>) {
150+
// If this can't be proxied, return raw value
151+
if (persist === null || typeof persist !== "object") {
152+
this.__persist = persist;
153+
return;
154+
}
155+
156+
// Listen for changes to the object
157+
this.__persist = onChange(
158+
persist,
159+
(
160+
path: string,
161+
value: any,
162+
_previousValue: any,
163+
_applyData: any,
164+
) => {
165+
// Validate CBOR serializability for state changes
166+
if (path.startsWith("state")) {
167+
let invalidPath = "";
168+
if (
169+
!isCborSerializable(
170+
value,
171+
(invalidPathPart: string) => {
172+
invalidPath = invalidPathPart;
173+
},
174+
"",
175+
)
176+
) {
177+
throw new errors.InvalidStateType({
178+
path: path + (invalidPath ? `.${invalidPath}` : ""),
179+
});
180+
}
181+
}
182+
183+
this.#changed = true;
184+
this.#actor.rLog.debug({
185+
msg: "conn onChange triggered",
186+
connId: this.id,
187+
path,
188+
});
189+
190+
// Notify actor that this connection has changed
191+
this.#actor.__markConnChanged(this);
192+
},
193+
{ ignoreDetached: true },
194+
);
195+
}
196+
197+
/**
198+
* Returns whether this connection has unsaved changes
199+
*/
200+
get hasChanges(): boolean {
201+
return this.#changed;
202+
}
203+
204+
/**
205+
* Marks changes as saved
206+
*/
207+
markSaved() {
208+
this.#changed = false;
209+
}
210+
211+
/**
212+
* Gets the raw persist data for serialization
213+
*/
214+
get persistRaw(): PersistedConn<CP, CS> {
215+
return this.#persistRaw;
135216
}
136217

137218
#validateStateEnabled() {

rivetkit-typescript/packages/rivetkit/src/actor/driver.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,27 @@ export interface ActorDriver {
1919

2020
getContext(actorId: string): unknown;
2121

22-
readPersistedData(actorId: string): Promise<Uint8Array | undefined>;
22+
// Batch KV operations
23+
/** Batch write multiple key-value pairs. Keys and values are Uint8Arrays. */
24+
kvBatchPut(
25+
actorId: string,
26+
entries: [Uint8Array, Uint8Array][],
27+
): Promise<void>;
2328

24-
/** ActorInstance ensure that only one instance of writePersistedData is called in parallel at a time. */
25-
writePersistedData(actorId: string, data: Uint8Array): Promise<void>;
29+
/** Batch read multiple keys. Returns null for keys that don't exist. */
30+
kvBatchGet(
31+
actorId: string,
32+
keys: Uint8Array[],
33+
): Promise<(Uint8Array | null)[]>;
34+
35+
/** Batch delete multiple keys. */
36+
kvBatchDelete(actorId: string, keys: Uint8Array[]): Promise<void>;
37+
38+
/** List all keys with a given prefix. */
39+
kvListPrefix(
40+
actorId: string,
41+
prefix: Uint8Array,
42+
): Promise<[Uint8Array, Uint8Array][]>;
2643

2744
// Schedule
2845
/** ActorInstance ensure that only one instance of setAlarm is called in parallel at a time. */

0 commit comments

Comments
 (0)