Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit f2f76c3

Browse files
committed
chore: migrate persisted data storage to cbor (#1011)
1 parent 5523057 commit f2f76c3

File tree

20 files changed

+114
-126
lines changed

20 files changed

+114
-126
lines changed

examples/rivet/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"typescript": "^5.5.2"
1515
},
1616
"dependencies": {
17-
"@rivetkit/actor": "https://pkg.pr.new/rivet-gg/rivetkit/@rivetkit/actor@cb1e6d4"
17+
"@rivetkit/actor": "https://pkg.pr.new/rivet-gg/rivetkit/@rivetkit/actor@7e018f2"
1818
},
1919
"stableVersion": "0.8.0"
2020
}

examples/rivet/src/registry.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,6 @@ export const counter = actor({
1414
});
1515

1616
export const registry = setup({
17-
actors: { counter },
17+
use: { counter },
1818
});
1919

packages/core/src/actor/driver.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ export interface ActorDriver {
99
//load(): Promise<LoadOutput>;
1010
getContext(actorId: string): unknown;
1111

12-
readInput(actorId: string): Promise<unknown | undefined>;
13-
14-
readPersistedData(actorId: string): Promise<unknown | undefined>;
15-
writePersistedData(actorId: string, unknown: unknown): Promise<void>;
12+
readPersistedData(actorId: string): Promise<Uint8Array | undefined>;
13+
writePersistedData(actorId: string, data: Uint8Array): Promise<void>;
1614

1715
// Schedule
1816
setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void>;

packages/core/src/actor/instance.ts

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { processMessage } from "./protocol/message/mod";
2424
import { CachedSerializer } from "./protocol/serde";
2525
import { Schedule } from "./schedule";
2626
import { DeadlineError, Lock, deadline } from "./utils";
27+
import * as cbor from "cbor-x";
2728

2829
/**
2930
* Options for the `_saveState` method.
@@ -122,10 +123,10 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
122123
*
123124
* Any data that should be stored indefinitely should be held within this object.
124125
*/
125-
#persist!: PersistedActor<S, CP, CS>;
126+
#persist!: PersistedActor<S, CP, CS, I>;
126127

127128
/** Raw state without the proxy wrapper */
128-
#persistRaw!: PersistedActor<S, CP, CS>;
129+
#persistRaw!: PersistedActor<S, CP, CS, I>;
129130

130131
#writePersistLock = new Lock<void>(void 0);
131132

@@ -426,7 +427,7 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
426427
// Write to KV
427428
await this.#actorDriver.writePersistedData(
428429
this.#actorId,
429-
this.#persistRaw,
430+
cbor.encode(this.#persistRaw),
430431
);
431432

432433
logger().debug("persist saved");
@@ -443,7 +444,7 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
443444
/**
444445
* Creates proxy for `#persist` that handles automatically flagging when state needs to be updated.
445446
*/
446-
#setPersist(target: PersistedActor<S, CP, CS>) {
447+
#setPersist(target: PersistedActor<S, CP, CS, I>) {
447448
// Set raw persist object
448449
this.#persistRaw = target;
449450

@@ -514,11 +515,21 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
514515

515516
async #initialize() {
516517
// Read initial state
517-
const persistData = (await this.#actorDriver.readPersistedData(
518+
const persistDataBuffer = await this.#actorDriver.readPersistedData(
518519
this.#actorId,
519-
)) as PersistedActor<S, CP, CS>;
520-
521-
if (persistData !== undefined) {
520+
);
521+
invariant(
522+
persistDataBuffer !== undefined,
523+
"persist data has not been set, it should be set when initialized",
524+
);
525+
const persistData = cbor.decode(persistDataBuffer) as PersistedActor<
526+
S,
527+
CP,
528+
CS,
529+
I
530+
>;
531+
532+
if (persistData.hi) {
522533
logger().info("actor restoring", {
523534
connections: persistData.c.length,
524535
});
@@ -546,8 +557,6 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
546557
} else {
547558
logger().info("actor creating");
548559

549-
const input = (await this.#actorDriver.readInput(this.#actorId)) as I;
550-
551560
// Initialize actor state
552561
let stateData: unknown = undefined;
553562
if (this.stateEnabled) {
@@ -567,7 +576,7 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
567576
undefined,
568577
undefined
569578
>,
570-
{ input },
579+
{ input: persistData.i },
571580
);
572581
} else if ("state" in this.#config) {
573582
stateData = structuredClone(this.#config.state);
@@ -578,21 +587,24 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
578587
logger().debug("state not enabled");
579588
}
580589

581-
const persist: PersistedActor<S, CP, CS> = {
582-
s: stateData as S,
583-
c: [],
584-
e: [],
585-
};
590+
// Save state and mark as initialized
591+
persistData.s = stateData as S;
592+
persistData.hi = true;
586593

587594
// Update state
588595
logger().debug("writing state");
589-
await this.#actorDriver.writePersistedData(this.#actorId, persist);
596+
await this.#actorDriver.writePersistedData(
597+
this.#actorId,
598+
cbor.encode(persistData),
599+
);
590600

591-
this.#setPersist(persist);
601+
this.#setPersist(persistData);
592602

593603
// Notify creation
594604
if (this.#config.onCreate) {
595-
await this.#config.onCreate(this.actorContext, { input });
605+
await this.#config.onCreate(this.actorContext, {
606+
input: persistData.i,
607+
});
596608
}
597609
}
598610
}

packages/core/src/actor/persisted.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
/** State object that gets automatically persisted to storage. */
2-
export interface PersistedActor<S, CP, CS> {
2+
export interface PersistedActor<S, CP, CS, I> {
3+
// Input
4+
i?: I,
5+
// Has initialized
6+
hi: boolean,
37
// State
48
s: S;
59
// Connections

packages/core/src/driver-helpers/mod.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { PersistedActor } from "@/actor/persisted";
2+
13
export type { ActorInstance, AnyActorInstance } from "@/actor/instance";
24
export type {
35
AttemptAcquireLease,
@@ -27,3 +29,17 @@ export {
2729
HEADER_CONN_TOKEN,
2830
} from "@/actor/router-endpoints";
2931
export { RunConfigSchema, DriverConfigSchema } from "@/registry/run-config";
32+
import * as cbor from "cbor-x";
33+
34+
export function serializeEmptyPersistData(
35+
input: unknown | undefined,
36+
): Uint8Array {
37+
const persistData: PersistedActor<any, any, any, any> = {
38+
i: input,
39+
hi: false,
40+
s: undefined,
41+
c: [],
42+
e: [],
43+
};
44+
return cbor.encode(persistData);
45+
}

packages/core/src/drivers/memory/actor.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,11 @@ export class MemoryActorDriver implements ActorDriver {
1414
return {};
1515
}
1616

17-
async readInput(actorId: string): Promise<unknown | undefined> {
18-
return this.#state.readInput(actorId);
19-
}
20-
21-
async readPersistedData(actorId: string): Promise<unknown | undefined> {
17+
async readPersistedData(actorId: string): Promise<Uint8Array | undefined> {
2218
return this.#state.readPersistedData(actorId);
2319
}
2420

25-
async writePersistedData(actorId: string, data: unknown): Promise<void> {
21+
async writePersistedData(actorId: string, data: Uint8Array): Promise<void> {
2622
this.#state.writePersistedData(actorId, data);
2723
}
2824

packages/core/src/drivers/memory/global-state.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import type { ActorKey } from "@/actor/mod";
2+
import { serializeEmptyPersistData } from "@/driver-helpers/mod";
23

34
export interface ActorState {
45
id: string;
56
name: string;
67
key: ActorKey;
7-
persistedData: unknown;
8-
input?: unknown;
8+
persistedData: Uint8Array;
99
}
1010

1111
export class MemoryGlobalState {
@@ -19,32 +19,27 @@ export class MemoryGlobalState {
1919
return actor;
2020
}
2121

22-
readInput(actorId: string): unknown | undefined {
23-
return this.#getActor(actorId).input;
24-
}
25-
26-
readPersistedData(actorId: string): unknown | undefined {
22+
readPersistedData(actorId: string): Uint8Array | undefined {
2723
return this.#getActor(actorId).persistedData;
2824
}
2925

30-
writePersistedData(actorId: string, data: unknown) {
26+
writePersistedData(actorId: string, data: Uint8Array) {
3127
this.#getActor(actorId).persistedData = data;
3228
}
3329

3430
createActor(
3531
actorId: string,
3632
name: string,
3733
key: ActorKey,
38-
input?: unknown,
34+
input: unknown | undefined,
3935
): void {
4036
// Create actor state if it doesn't exist
4137
if (!this.#actors.has(actorId)) {
4238
this.#actors.set(actorId, {
4339
id: actorId,
4440
name,
4541
key,
46-
persistedData: undefined,
47-
input,
42+
persistedData: serializeEmptyPersistData(input),
4843
});
4944
} else {
5045
throw new Error(`Actor already exists for ID: ${actorId}`);

packages/core/src/drivers/rivet/actor-driver.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,18 @@ export class RivetActorDriver implements ActorDriver {
1616
return { ctx: this.#ctx };
1717
}
1818

19-
async readInput(_actorId: string): Promise<unknown | undefined> {
20-
// Read input
21-
//
22-
// We need to have a separate exists flag in order to represent `undefined`
23-
const entries = await this.#ctx.kv.getBatch([
24-
["rivetkit", "input", "exists"],
25-
["rivetkit", "input", "data"],
26-
]);
27-
28-
if (entries.get(["rivetkit", "input", "exists"]) === true) {
29-
return await entries.get(["rivetkit", "input", "data"]);
30-
} else {
31-
return undefined;
32-
}
33-
}
34-
35-
async readPersistedData(_actorId: string): Promise<unknown | undefined> {
36-
let data = await this.#ctx.kv.get(["rivetkit", "data"]);
19+
async readPersistedData(_actorId: string): Promise<Uint8Array | undefined> {
20+
let data = (await this.#ctx.kv.get(["rivetkit", "data"])) as
21+
| Uint8Array
22+
| undefined;
3723

3824
// HACK: Modify to be undefined if null. This will be fixed in Actors v2.
3925
if (data === null) data = undefined;
4026

4127
return data;
4228
}
4329

44-
async writePersistedData(_actorId: string, data: unknown): Promise<void> {
30+
async writePersistedData(_actorId: string, data: Uint8Array): Promise<void> {
4531
// Use "state" as the key for persisted data
4632
await this.#ctx.kv.put(["rivetkit", "data"], data);
4733
}

packages/core/src/drivers/rivet/actor.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { logger } from "./log";
1010
import { RivetManagerDriver } from "./manager-driver";
1111
import { type RivetClientConfig, getRivetClientConfig } from "./rivet-client";
1212
import { type RivetHandler, deserializeKeyFromTag } from "./util";
13+
import * as cbor from "cbor-x";
1314

1415
export function createActorHandler(
1516
registry: Registry<any>,
@@ -131,7 +132,9 @@ async function startActor(
131132
// TODO: This needs to assert this has only been called once
132133
// Initialize with data
133134
router.post("/initialize", async (c) => {
134-
const body = await c.req.json();
135+
const bodyBlob = await c.req.blob();
136+
const bodyBytes = await bodyBlob.bytes();
137+
const body = cbor.decode(bodyBytes);
135138

136139
logger().debug("received initialize request", {
137140
hasInput: !!body.input,
@@ -150,7 +153,7 @@ async function startActor(
150153
// Finish initialization
151154
initializedPromise.resolve(undefined);
152155

153-
return c.json({}, 200);
156+
return c.body(cbor.encode({}), 200);
154157
});
155158

156159
// Start server

0 commit comments

Comments
 (0)