diff --git a/packages/core/package.json b/packages/core/package.json index 77fa87ae6..c428d344c 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -150,7 +150,7 @@ "scripts": { "dev": "pnpm build --watch", "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts", - "build:schema": "node ../../packages/misc/bare-compiler/dist/cli.js compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && node ../../packages/misc/bare-compiler/dist/cli.js compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts ", + "build:schema": "node ../../packages/misc/bare-compiler/dist/cli.js compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && node ../../packages/misc/bare-compiler/dist/cli.js compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && node ../../packages/misc/bare-compiler/dist/cli.js compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts", "check-types": "tsc --noEmit", "test": "vitest run", "test:watch": "vitest", diff --git a/packages/core/schemas/actor-persist/v1.bare b/packages/core/schemas/actor-persist/v1.bare new file mode 100644 index 000000000..3c50f950a --- /dev/null +++ b/packages/core/schemas/actor-persist/v1.bare @@ -0,0 +1,69 @@ +# MARK: Connection +# Represents an event subscription. +type PersistedSubscription struct { + # Event name + eventName: str +} + +# Represents a persisted connection to an actor. +type PersistedConnection struct { + # Connection ID + id: str + # Connection token + token: str + # Connection driver type + driver: str + # Connection driver state + driverState: data + # Connection parameters + parameters: data + # Connection state + state: data + # Authentication data + auth: optional + # Active subscriptions + subscriptions: list + # Last seen timestamp + lastSeen: u64 +} + +# MARK: Schedule Event +# Represents a generic scheduled event. +type GenericPersistedScheduleEvent struct { + # Action name + action: str + # Arguments for the action + # + # CBOR array + args: optional +} + +# Event kind union +type PersistedScheduleEventKind union { + GenericPersistedScheduleEvent +} + +# Scheduled event with metadata +type PersistedScheduleEvent struct { + # Event ID + eventId: str + # Timestamp when the event should fire + timestamp: u64 + # Event kind + kind: PersistedScheduleEventKind +} + +# MARK: Actor +# Represents the persisted state of an actor. +type PersistedActor struct { + # Input data passed to the actor on initialization + input: optional + # Whether the actor has been initialized + hasInitialized: bool + # Actor's state + state: data + # Active connections + connections: list + # Scheduled events + scheduledEvents: list +} diff --git a/packages/core/schemas/file-system-driver/v1.bare b/packages/core/schemas/file-system-driver/v1.bare index 46c382667..0a2ffb724 100644 --- a/packages/core/schemas/file-system-driver/v1.bare +++ b/packages/core/schemas/file-system-driver/v1.bare @@ -2,19 +2,17 @@ # MARK: Actor State # Represents the persisted state for an actor on disk. -# Note: createdAt is not persisted; it is derived from the file's birthtime. type ActorState struct { - id: str - name: str - key: list - persistedData: data + actorId: str + name: str + key: list + persistedData: data + createdAt: u64 } # MARK: Actor Alarm -# Represents a scheduled alarm for an actor. -# Stored per-actor; the actor id is implied by the filename. -# The timestamp is milliseconds since epoch. type ActorAlarm struct { - timestamp: uint + actorId: str + timestamp: uint } diff --git a/packages/core/src/actor/connection.ts b/packages/core/src/actor/connection.ts index 11779c311..9525dfe8b 100644 --- a/packages/core/src/actor/connection.ts +++ b/packages/core/src/actor/connection.ts @@ -68,15 +68,15 @@ export class Conn { #driver: ConnDriver; public get params(): CP { - return this.__persist.p; + return this.__persist.params; } public get auth(): AD { - return this.__persist.a as AD; + return this.__persist.authData as AD; } public get driver(): ConnectionDriver { - return this.__persist.d as ConnectionDriver; + return this.__persist.connDriver as ConnectionDriver; } public get _stateEnabled() { @@ -90,8 +90,8 @@ export class Conn { */ public get state(): CS { this.#validateStateEnabled(); - if (!this.__persist.s) throw new Error("state should exists"); - return this.__persist.s; + if (!this.__persist.state) throw new Error("state should exists"); + return this.__persist.state; } /** @@ -101,21 +101,21 @@ export class Conn { */ public set state(value: CS) { this.#validateStateEnabled(); - this.__persist.s = value; + this.__persist.state = value; } /** * Unique identifier for the connection. */ public get id(): ConnId { - return this.__persist.i; + return this.__persist.connId; } /** * Token used to authenticate this request. */ public get _token(): string { - return this.__persist.t; + return this.__persist.token; } /** @@ -129,7 +129,7 @@ export class Conn { * Timestamp of the last time the connection was seen, i.e. the last time the connection was active and checked for liveness. */ public get lastSeen(): number { - return this.__persist.l; + return this.__persist.lastSeen; } /** @@ -165,7 +165,12 @@ export class Conn { * @protected */ public _sendMessage(message: CachedSerializer) { - this.#driver.sendMessage?.(this.#actor, this, this.__persist.ds, message); + this.#driver.sendMessage?.( + this.#actor, + this, + this.__persist.connDriverState, + message, + ); } /** @@ -205,7 +210,12 @@ export class Conn { */ public async disconnect(reason?: string) { this.#status = "reconnecting"; - await this.#driver.disconnect(this.#actor, this, this.__persist.ds, reason); + await this.#driver.disconnect( + this.#actor, + this, + this.__persist.connDriverState, + reason, + ); } /** @@ -233,18 +243,18 @@ export class Conn { status: this.#status, newStatus, - lastSeen: this.__persist.l, + lastSeen: this.__persist.lastSeen, currentTs: newLastSeen, }); if (!isConnectionClosed) { - this.__persist.l = newLastSeen; + this.__persist.lastSeen = newLastSeen; } this.#status = newStatus; return { status: this.#status, - lastSeen: this.__persist.l, + lastSeen: this.__persist.lastSeen, }; } } diff --git a/packages/core/src/actor/instance.ts b/packages/core/src/actor/instance.ts index 387cb7002..c29395304 100644 --- a/packages/core/src/actor/instance.ts +++ b/packages/core/src/actor/instance.ts @@ -8,6 +8,8 @@ import { isCborSerializable, stringifyError } from "@/common/utils"; import type { UniversalWebSocket } from "@/common/websocket-interface"; import { ActorInspector } from "@/inspector/actor"; import type { Registry } from "@/mod"; +import type * as bareSchema from "@/schemas/actor-persist/mod"; +import { PERSISTED_ACTOR_VERSIONED } from "@/schemas/actor-persist/versioned"; import type * as protocol from "@/schemas/client-protocol/mod"; import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; import { bufferToArrayBuffer, SinglePromiseQueue } from "@/utils"; @@ -199,7 +201,7 @@ export class ActorInstance< this.#validateStateEnabled(); // Must return from `#persistRaw` in order to not return the `onchange` proxy - return this.#persistRaw.s as Record as unknown; + return this.#persistRaw.state as Record as unknown; }, getRpcs: async () => { return Object.keys(this.#config.actions); @@ -221,7 +223,7 @@ export class ActorInstance< // We have to use `...` so `on-change` recognizes the changes to `state` (i.e. set #persistChanged` to true). This is because: // 1. In `getState`, we returned the value from `persistRaw`, which does not have the Proxy to monitor state changes // 2. If we were to assign `state` to `#persist.s`, `on-change` would assume nothing changed since `state` is still === `#persist.s` since we returned a reference in `getState` - this.#persist.s = { ...(state as S) }; + this.#persist.state = { ...(state as S) }; await this.saveState({ immediate: true }); }, }; @@ -331,8 +333,8 @@ export class ActorInstance< } // Set alarm for next scheduled event if any exist after finishing initiation sequence - if (this.#persist.e.length > 0) { - await this.#queueSetAlarm(this.#persist.e[0].t); + if (this.#persist.scheduledEvents.length > 0) { + await this.#queueSetAlarm(this.#persist.scheduledEvents[0].timestamp); } logger().info("actor ready"); @@ -364,33 +366,25 @@ export class ActorInstance< async #scheduleEventInner(newEvent: PersistedScheduleEvent) { this.actorContext.log.info("scheduling event", newEvent); - // remove old ccl event - if ("ccl" in newEvent) { - const existingIndex = this.#persist.e.findIndex( - (event) => "ccl" in event, - ); - if (existingIndex !== -1) { - this.#persist.e.splice(existingIndex, 1); - } - } - // Insert event in to index - const insertIndex = this.#persist.e.findIndex((x) => x.t > newEvent.t); + const insertIndex = this.#persist.scheduledEvents.findIndex( + (x) => x.timestamp > newEvent.timestamp, + ); if (insertIndex === -1) { - this.#persist.e.push(newEvent); + this.#persist.scheduledEvents.push(newEvent); } else { - this.#persist.e.splice(insertIndex, 0, newEvent); + this.#persist.scheduledEvents.splice(insertIndex, 0, newEvent); } // Update alarm if: // - this is the newest event (i.e. at beginning of array) or // - this is the only event (i.e. the only event in the array) - if (insertIndex === 0 || this.#persist.e.length === 1) { + if (insertIndex === 0 || this.#persist.scheduledEvents.length === 1) { this.actorContext.log.info("setting alarm", { - timestamp: newEvent.t, - eventCount: this.#persist.e.length, + timestamp: newEvent.timestamp, + eventCount: this.#persist.scheduledEvents.length, }); - await this.#queueSetAlarm(newEvent.t); + await this.#queueSetAlarm(newEvent.timestamp); } } @@ -398,7 +392,7 @@ export class ActorInstance< const now = Date.now(); this.actorContext.log.debug("alarm triggered", { now, - events: this.#persist.e.length, + events: this.#persist.scheduledEvents.length, }); // Update sleep @@ -407,13 +401,15 @@ export class ActorInstance< this.#resetSleepTimer(); // Remove events from schedule that we're about to run - const runIndex = this.#persist.e.findIndex((x) => x.t <= now); + const runIndex = this.#persist.scheduledEvents.findIndex( + (x) => x.timestamp <= now, + ); if (runIndex === -1) { // No events are due yet. This will happen if timers fire slightly early. // Ensure we reschedule the alarm for the next upcoming event to avoid losing it. logger().warn("no events are due yet, time may have broken"); - if (this.#persist.e.length > 0) { - const nextTs = this.#persist.e[0].t; + if (this.#persist.scheduledEvents.length > 0) { + const nextTs = this.#persist.scheduledEvents[0].timestamp; this.actorContext.log.warn( "alarm fired early, rescheduling for next event", { @@ -427,17 +423,20 @@ export class ActorInstance< this.actorContext.log.debug("no events to run", { now }); return; } - const scheduleEvents = this.#persist.e.splice(0, runIndex + 1); + const scheduleEvents = this.#persist.scheduledEvents.splice( + 0, + runIndex + 1, + ); this.actorContext.log.debug("running events", { count: scheduleEvents.length, }); // Set alarm for next event - if (this.#persist.e.length > 0) { - const nextTs = this.#persist.e[0].t; + if (this.#persist.scheduledEvents.length > 0) { + const nextTs = this.#persist.scheduledEvents[0].timestamp; this.actorContext.log.info("setting next alarm", { nextTs, - remainingEvents: this.#persist.e.length, + remainingEvents: this.#persist.scheduledEvents.length, }); await this.#queueSetAlarm(nextTs); } @@ -445,41 +444,37 @@ export class ActorInstance< // Iterate by event key in order to ensure we call the events in order for (const event of scheduleEvents) { try { - if ("ccl" in event) { - this.#checkConnectionsLiveness(); - } else { - this.actorContext.log.info("running action for event", { - event: event.e, - timestamp: event.t, - action: event.k.g.a, - args: event.k.g.ar, - }); + this.actorContext.log.info("running action for event", { + event: event.eventId, + timestamp: event.timestamp, + action: event.kind.generic.actionName, + }); - // Look up function - const fn: unknown = this.#config.actions[event.k.g.a]; + // Look up function + const fn: unknown = this.#config.actions[event.kind.generic.actionName]; - if (!fn) throw new Error(`Missing action for alarm ${event.k.g.a}`); - if (typeof fn !== "function") - throw new Error( - `Alarm function lookup for ${event.k.g.a} returned ${typeof fn}`, - ); + if (!fn) + throw new Error( + `Missing action for alarm ${event.kind.generic.actionName}`, + ); + if (typeof fn !== "function") + throw new Error( + `Alarm function lookup for ${event.kind.generic.actionName} returned ${typeof fn}`, + ); - // Call function - try { - await fn.call( - undefined, - this.actorContext, - ...(event.k.g.ar || []), - ); - } catch (error) { - this.actorContext.log.error("error while running event", { - error: stringifyError(error), - event: event.e, - timestamp: event.t, - action: event.k.g.a, - args: event.k.g.ar, - }); - } + // Call function + try { + const args = event.kind.generic.args + ? cbor.decode(new Uint8Array(event.kind.generic.args)) + : []; + await fn.call(undefined, this.actorContext, ...args); + } catch (error) { + this.actorContext.log.error("error while running event", { + error: stringifyError(error), + event: event.eventId, + timestamp: event.timestamp, + action: event.kind.generic.actionName, + }); } } catch (error) { this.actorContext.log.error("internal error while running event", { @@ -496,9 +491,14 @@ export class ActorInstance< args: unknown[], ): Promise { return this.#scheduleEventInner({ - e: crypto.randomUUID(), - t: timestamp, - k: { g: { a: action, ar: args } }, + eventId: crypto.randomUUID(), + timestamp, + kind: { + generic: { + actionName: action, + args: bufferToArrayBuffer(cbor.encode(args)), + }, + }, }); } @@ -562,10 +562,11 @@ export class ActorInstance< // before writing to KV in order to avoid a race condition. this.#persistChanged = false; - // Write to KV + // Convert to BARE types and write to KV + const bareData = this.#convertToBarePersisted(this.#persistRaw); await this.#actorDriver.writePersistedData( this.#actorId, - cbor.encode(this.#persistRaw), + PERSISTED_ACTOR_VERSIONED.serializeWithEmbeddedVersion(bareData), ); logger().debug("persist saved"); @@ -640,12 +641,15 @@ export class ActorInstance< this.#persistChanged = true; // Inform the inspector about state changes - this.inspector.emitter.emit("stateUpdated", this.#persist.s); + this.inspector.emitter.emit("stateUpdated", this.#persist.state); // Call onStateChange if it exists if (this.#config.onStateChange && this.#ready) { try { - this.#config.onStateChange(this.actorContext, this.#persistRaw.s); + this.#config.onStateChange( + this.actorContext, + this.#persistRaw.state, + ); } catch (error) { logger().error("error in `_onStateChange`", { error: stringifyError(error), @@ -668,25 +672,24 @@ export class ActorInstance< persistDataBuffer !== undefined, "persist data has not been set, it should be set when initialized", ); - const persistData = cbor.decode(persistDataBuffer) as PersistedActor< - S, - CP, - CS, - I - >; - - if (persistData.hi) { + const bareData = + PERSISTED_ACTOR_VERSIONED.deserializeWithEmbeddedVersion( + persistDataBuffer, + ); + const persistData = this.#convertFromBarePersisted(bareData); + + if (persistData.hasInitiated) { logger().info("actor restoring", { - connections: persistData.c.length, + connections: persistData.connections.length, }); // Set initial state this.#setPersist(persistData); // Load connections - for (const connPersist of this.#persist.c) { + for (const connPersist of this.#persist.connections) { // Create connections - const driver = this.__getConnDriver(connPersist.d); + const driver = this.__getConnDriver(connPersist.connDriver); const conn = new Conn( this, connPersist, @@ -696,8 +699,8 @@ export class ActorInstance< this.#connections.set(conn.id, conn); // Register event subscriptions - for (const sub of connPersist.su) { - this.#addSubscription(sub.n, conn, true); + for (const sub of connPersist.subscriptions) { + this.#addSubscription(sub.eventName, conn, true); } } } else { @@ -722,7 +725,7 @@ export class ActorInstance< undefined, undefined >, - persistData.i!, + persistData.input!, ); } else if ("state" in this.#config) { stateData = structuredClone(this.#config.state); @@ -734,21 +737,22 @@ export class ActorInstance< } // Save state and mark as initialized - persistData.s = stateData as S; - persistData.hi = true; + persistData.state = stateData as S; + persistData.hasInitiated = true; // Update state logger().debug("writing state"); + const bareData = this.#convertToBarePersisted(persistData); await this.#actorDriver.writePersistedData( this.#actorId, - cbor.encode(persistData), + PERSISTED_ACTOR_VERSIONED.serializeWithEmbeddedVersion(bareData), ); this.#setPersist(persistData); // Notify creation if (this.#config.onCreate) { - await this.#config.onCreate(this.actorContext, persistData.i!); + await this.#config.onCreate(this.actorContext, persistData.input!); } } } @@ -767,9 +771,11 @@ export class ActorInstance< } // Remove from persist & save immediately - const connIdx = this.#persist.c.findIndex((c) => c.i === conn.id); + const connIdx = this.#persist.connections.findIndex( + (c) => c.connId === conn.id, + ); if (connIdx !== -1) { - this.#persist.c.splice(connIdx, 1); + this.#persist.connections.splice(connIdx, 1); this.saveState({ immediate: true, allowStoppingState: true }); } else { logger().warn("could not find persisted connection to remove", { @@ -891,15 +897,15 @@ export class ActorInstance< // Create connection const driver = this.__getConnDriver(driverId); const persist: PersistedConn = { - i: connectionId, - t: connectionToken, - d: driverId, - ds: driverState, - p: params, - s: state, - a: authData, - l: Date.now(), - su: [], + connId: connectionId, + token: connectionToken, + connDriver: driverId, + connDriverState: driverState, + params: params, + state: state, + authData: authData, + lastSeen: Date.now(), + subscriptions: [], }; const conn = new Conn( this, @@ -915,7 +921,7 @@ export class ActorInstance< this.#resetSleepTimer(); // Add to persistence & save immediately - this.#persist.c.push(persist); + this.#persist.connections.push(persist); this.saveState({ immediate: true }); // Handle connection @@ -1011,7 +1017,7 @@ export class ActorInstance< // // Don't update persistence if already restoring from persistence if (!fromPersist) { - connection.__persist.su.push({ n: eventName }); + connection.__persist.subscriptions.push({ eventName: eventName }); this.saveState({ immediate: true }); } @@ -1043,11 +1049,11 @@ export class ActorInstance< if (!fromRemoveConn) { connection.subscriptions.delete(eventName); - const subIdx = connection.__persist.su.findIndex( - (s) => s.n === eventName, + const subIdx = connection.__persist.subscriptions.findIndex( + (s) => s.eventName === eventName, ); if (subIdx !== -1) { - connection.__persist.su.splice(subIdx, 1); + connection.__persist.subscriptions.splice(subIdx, 1); } else { logger().warn("subscription does not exist with name", { eventName }); } @@ -1382,7 +1388,7 @@ export class ActorInstance< */ get state(): S { this.#validateStateEnabled(); - return this.#persist.s; + return this.#persist.state; } /** @@ -1404,7 +1410,7 @@ export class ActorInstance< */ set state(value: S) { this.#validateStateEnabled(); - this.#persist.s = value; + this.#persist.state = value; } get vars(): V { @@ -1698,4 +1704,84 @@ export class ActorInstance< logger().debug("background promises finished"); } } + + // MARK: BARE Conversion Helpers + #convertToBarePersisted( + persist: PersistedActor, + ): bareSchema.PersistedActor { + return { + input: + persist.input !== undefined + ? bufferToArrayBuffer(cbor.encode(persist.input)) + : null, + hasInitialized: persist.hasInitiated, + state: bufferToArrayBuffer(cbor.encode(persist.state)), + connections: persist.connections.map((conn) => ({ + id: conn.connId, + token: conn.token, + driver: conn.connDriver as string, + driverState: bufferToArrayBuffer( + cbor.encode(conn.connDriverState || {}), + ), + parameters: bufferToArrayBuffer(cbor.encode(conn.params || {})), + state: bufferToArrayBuffer(cbor.encode(conn.state || {})), + auth: + conn.authData !== undefined + ? bufferToArrayBuffer(cbor.encode(conn.authData)) + : null, + subscriptions: conn.subscriptions.map((sub) => ({ + eventName: sub.eventName, + })), + lastSeen: BigInt(conn.lastSeen), + })), + scheduledEvents: persist.scheduledEvents.map((event) => ({ + eventId: event.eventId, + timestamp: BigInt(event.timestamp), + kind: { + tag: "GenericPersistedScheduleEvent" as const, + val: { + action: event.kind.generic.actionName, + args: event.kind.generic.args ?? null, + }, + }, + })), + }; + } + + #convertFromBarePersisted( + bareData: bareSchema.PersistedActor, + ): PersistedActor { + return { + input: bareData.input + ? cbor.decode(new Uint8Array(bareData.input)) + : undefined, + hasInitiated: bareData.hasInitialized, + state: cbor.decode(new Uint8Array(bareData.state)), + connections: bareData.connections.map((conn) => ({ + connId: conn.id, + token: conn.token, + connDriver: conn.driver as ConnectionDriver, + connDriverState: cbor.decode(new Uint8Array(conn.driverState)), + params: cbor.decode(new Uint8Array(conn.parameters)), + state: cbor.decode(new Uint8Array(conn.state)), + authData: conn.auth + ? cbor.decode(new Uint8Array(conn.auth)) + : undefined, + subscriptions: conn.subscriptions.map((sub) => ({ + eventName: sub.eventName, + })), + lastSeen: Number(conn.lastSeen), + })), + scheduledEvents: bareData.scheduledEvents.map((event) => ({ + eventId: event.eventId, + timestamp: Number(event.timestamp), + kind: { + generic: { + actionName: event.kind.val.action, + args: event.kind.val.args, + }, + }, + })), + }; + } } diff --git a/packages/core/src/actor/persisted.ts b/packages/core/src/actor/persisted.ts index 688d6ac51..b3287b41f 100644 --- a/packages/core/src/actor/persisted.ts +++ b/packages/core/src/actor/persisted.ts @@ -2,58 +2,41 @@ import type { ConnectionDriver } from "./connection"; /** State object that gets automatically persisted to storage. */ export interface PersistedActor { - // Input - i?: I; - // Has initialized - hi: boolean; - // State - s: S; - // Connections - c: PersistedConn[]; - // Scheduled events - e: PersistedScheduleEvent[]; + input?: I; + hasInitiated: boolean; + state: S; + connections: PersistedConn[]; + scheduledEvents: PersistedScheduleEvent[]; } /** Object representing connection that gets persisted to storage. */ export interface PersistedConn { - // ID - i: string; - // Token - t: string; - // Connection driver - d: ConnectionDriver; - // Connection driver state - ds: unknown; - // Parameters - p: CP; - // State - s: CS; - // Auth data - a?: unknown; - // Subscriptions - su: PersistedSubscription[]; - // Last seen - l: number; + connId: string; + token: string; + connDriver: ConnectionDriver; + connDriverState: unknown; + params: CP; + state: CS; + authData?: unknown; + subscriptions: PersistedSubscription[]; + lastSeen: number; } export interface PersistedSubscription { - // Event name - n: string; + eventName: string; } export interface GenericPersistedScheduleEvent { - // Action name - a: string; - // Arguments - ar?: unknown[]; + actionName: string; + args: ArrayBuffer | null; } -export type PersistedScheduleEventKind = { g: GenericPersistedScheduleEvent }; +export type PersistedScheduleEventKind = { + generic: GenericPersistedScheduleEvent; +}; export interface PersistedScheduleEvent { - // Event ID - e: string; - // Timestamp - t: number; - k: PersistedScheduleEventKind; + eventId: string; + timestamp: number; + kind: PersistedScheduleEventKind; } diff --git a/packages/core/src/driver-helpers/utils.ts b/packages/core/src/driver-helpers/utils.ts index 1e60d2e10..293f6dd64 100644 --- a/packages/core/src/driver-helpers/utils.ts +++ b/packages/core/src/driver-helpers/utils.ts @@ -1,15 +1,17 @@ import * as cbor from "cbor-x"; -import type { PersistedActor } from "@/actor/persisted"; +import type * as schema from "@/schemas/actor-persist/mod"; +import { PERSISTED_ACTOR_VERSIONED } from "@/schemas/actor-persist/versioned"; +import { bufferToArrayBuffer } from "@/utils"; export function serializeEmptyPersistData( input: unknown | undefined, ): Uint8Array { - const persistData: PersistedActor = { - i: input, - hi: false, - s: undefined, - c: [], - e: [], + const persistData: schema.PersistedActor = { + input: input !== undefined ? bufferToArrayBuffer(cbor.encode(input)) : null, + hasInitialized: false, + state: bufferToArrayBuffer(cbor.encode(undefined)), + connections: [], + scheduledEvents: [], }; - return cbor.encode(persistData); + return PERSISTED_ACTOR_VERSIONED.serializeWithEmbeddedVersion(persistData); } diff --git a/packages/core/src/drivers/file-system/actor.ts b/packages/core/src/drivers/file-system/actor.ts index f52532d6b..d41dbf505 100644 --- a/packages/core/src/drivers/file-system/actor.ts +++ b/packages/core/src/drivers/file-system/actor.ts @@ -7,6 +7,7 @@ import type { ManagerDriver, } from "@/driver-helpers/mod"; import type { RegistryConfig, RunConfig } from "@/mod"; +import { bufferToArrayBuffer } from "@/utils"; import type { FileSystemGlobalState } from "./global-state"; export type ActorDriverContext = Record; @@ -61,15 +62,19 @@ export class FileSystemActorDriver implements ActorDriver { } async readPersistedData(actorId: string): Promise { - return (await this.#state.loadActorStateOrError(actorId)).persistedData; + return new Uint8Array( + (await this.#state.loadActorStateOrError(actorId)).persistedData, + ); } async writePersistedData(actorId: string, data: Uint8Array): Promise { const state = await this.#state.loadActorStateOrError(actorId); - state.persistedData = data; - // Save state to disk (pass state to avoid race with sleep/removal) - await this.#state.writeActor(actorId, state); + // Save state to disk + await this.#state.writeActor(actorId, { + ...state, + persistedData: bufferToArrayBuffer(data), + }); } async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { diff --git a/packages/core/src/drivers/file-system/global-state.ts b/packages/core/src/drivers/file-system/global-state.ts index 2c3146d8b..a48527198 100644 --- a/packages/core/src/drivers/file-system/global-state.ts +++ b/packages/core/src/drivers/file-system/global-state.ts @@ -2,7 +2,6 @@ import * as crypto from "node:crypto"; import * as fsSync from "node:fs"; import * as fs from "node:fs/promises"; import * as path from "node:path"; -import * as cbor from "cbor-x"; import invariant from "invariant"; import { lookupInRegistry } from "@/actor/definition"; import { ActorAlreadyExists } from "@/actor/errors"; @@ -20,7 +19,13 @@ import { } from "@/driver-helpers/mod"; import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; +import type * as schema from "@/schemas/file-system-driver/mod"; import { + ACTOR_ALARM_VERSIONED, + ACTOR_STATE_VERSIONED, +} from "@/schemas/file-system-driver/versioned"; +import { + bufferToArrayBuffer, type LongTimeoutHandle, SinglePromiseQueue, setLongTimeout, @@ -38,7 +43,7 @@ import { interface ActorEntry { id: string; - state?: ActorState; + state?: schema.ActorState; /** Promise for loading the actor state. */ loadPromise?: Promise; @@ -59,17 +64,6 @@ interface ActorEntry { removed: boolean; } -/** - * Interface representing a actor's state - */ -export interface ActorState { - id: string; - name: string; - key: ActorKey; - createdAt?: Date; - persistedData: Uint8Array; -} - /** * Global state for the file system driver */ @@ -148,7 +142,7 @@ export class FileSystemGlobalState { async *getActorsIterator(params: { cursor?: string; - }): AsyncGenerator { + }): AsyncGenerator { let actorIds = Array.from(this.#actors.keys()).sort(); // Check if state directory exists first @@ -213,10 +207,11 @@ export class FileSystemGlobalState { const entry = this.#upsertEntry(actorId); entry.state = { - id: actorId, + actorId, name, key, - persistedData: serializeEmptyPersistData(input), + createdAt: BigInt(Date.now()), + persistedData: bufferToArrayBuffer(serializeEmptyPersistData(input)), }; await this.writeActor(actorId, entry.state); return entry; @@ -255,13 +250,11 @@ export class FileSystemGlobalState { // Read & parse file try { const stateData = await fs.readFile(stateFilePath); - const state = cbor.decode(stateData) as ActorState; - - const stats = await fs.stat(stateFilePath); - state.createdAt = stats.birthtime; // Cache the loaded state in handler - entry.state = state; + entry.state = ACTOR_STATE_VERSIONED.deserializeWithEmbeddedVersion( + new Uint8Array(stateData), + ); return entry; } catch (innerError: any) { @@ -289,10 +282,11 @@ export class FileSystemGlobalState { // If no state for this actor, then create & write state if (!entry.state) { entry.state = { - id: actorId, + actorId, name, - key, - persistedData: serializeEmptyPersistData(input), + key: key as readonly string[], + createdAt: BigInt(Date.now()), + persistedData: bufferToArrayBuffer(serializeEmptyPersistData(input)), }; await this.writeActor(actorId, entry.state); } @@ -326,7 +320,7 @@ export class FileSystemGlobalState { /** * Save actor state to disk. */ - async writeActor(actorId: string, state: ActorState): Promise { + async writeActor(actorId: string, state: schema.ActorState): Promise { if (!this.#persist) { return; } @@ -347,7 +341,12 @@ export class FileSystemGlobalState { const tempPath = `${alarmPath}.tmp.${crypto.randomUUID()}`; try { await ensureDirectoryExists(path.dirname(alarmPath)); - const data = cbor.encode(timestamp); + const alarmData: schema.ActorAlarm = { + actorId, + timestamp: BigInt(timestamp), + }; + const data = + ACTOR_ALARM_VERSIONED.serializeWithEmbeddedVersion(alarmData); await fs.writeFile(tempPath, data); await fs.rename(tempPath, alarmPath); } catch (error) { @@ -366,7 +365,10 @@ export class FileSystemGlobalState { /** * Perform the actual write operation with atomic writes */ - async #performWrite(actorId: string, state: ActorState): Promise { + async #performWrite( + actorId: string, + state: schema.ActorState, + ): Promise { const dataPath = this.getActorStatePath(actorId); // Generate unique temp filename to prevent any race conditions const tempPath = `${dataPath}.tmp.${crypto.randomUUID()}`; @@ -375,8 +377,18 @@ export class FileSystemGlobalState { // Create directory if needed await ensureDirectoryExists(path.dirname(dataPath)); + // Convert to BARE types for serialization + const bareState: schema.ActorState = { + actorId: state.actorId, + name: state.name, + key: state.key, + createdAt: state.createdAt, + persistedData: state.persistedData, + }; + // Perform atomic write - const serializedState = cbor.encode(state); + const serializedState = + ACTOR_STATE_VERSIONED.serializeWithEmbeddedVersion(bareState); await fs.writeFile(tempPath, serializedState); await fs.rename(tempPath, dataPath); } catch (error) { @@ -468,7 +480,7 @@ export class FileSystemGlobalState { inlineClient, actorId, entry.state.name, - entry.state.key, + entry.state.key as string[], "unknown", ); @@ -480,6 +492,7 @@ export class FileSystemGlobalState { } catch (innerError) { const error = new Error( `Failed to start actor ${actorId}: ${innerError}`, + { cause: innerError }, ); entry.startPromise?.reject(error); entry.startPromise = undefined; @@ -487,7 +500,7 @@ export class FileSystemGlobalState { } } - async loadActorStateOrError(actorId: string): Promise { + async loadActorStateOrError(actorId: string): Promise { const state = (await this.loadActor(actorId)).state; if (!state) throw new Error(`Actor does not exist: ${actorId}`); return state; @@ -517,9 +530,13 @@ export class FileSystemGlobalState { const fullPath = path.join(this.#alarmsDir, file); try { const buf = fsSync.readFileSync(fullPath); - const timestamp = cbor.decode(buf) as number; - if (typeof timestamp === "number" && Number.isFinite(timestamp)) { - this.#scheduleAlarmTimeout(file, timestamp); + const alarmData = + ACTOR_ALARM_VERSIONED.deserializeWithEmbeddedVersion( + new Uint8Array(buf), + ); + const timestamp = Number(alarmData.timestamp); + if (Number.isFinite(timestamp)) { + this.#scheduleAlarmTimeout(alarmData.actorId, timestamp); } else { logger().debug("invalid alarm file contents", { file }); } diff --git a/packages/core/src/drivers/file-system/manager.ts b/packages/core/src/drivers/file-system/manager.ts index ed400448f..3eecaaa69 100644 --- a/packages/core/src/drivers/file-system/manager.ts +++ b/packages/core/src/drivers/file-system/manager.ts @@ -28,7 +28,8 @@ import { type RegistryConfig, type RunConfig, } from "@/mod"; -import type { ActorState, FileSystemGlobalState } from "./global-state"; +import type * as schema from "@/schemas/file-system-driver/mod"; +import type { FileSystemGlobalState } from "./global-state"; import { logger } from "./log"; import { generateActorId } from "./utils"; @@ -60,14 +61,13 @@ export class FileSystemManagerDriver implements ManagerDriver { this.#state.getOrCreateInspectorAccessToken(); } const startedAt = new Date().toISOString(); - function transformActor(actorState: ActorState): Actor { + function transformActor(actorState: schema.ActorState): Actor { return { - id: actorState.id as ActorId, + id: actorState.actorId as ActorId, name: actorState.name, - key: actorState.key, + key: actorState.key as string[], startedAt: startedAt, - createdAt: - actorState.createdAt?.toISOString() || new Date().toISOString(), + createdAt: new Date(Number(actorState.createdAt)).toISOString(), features: [ ActorFeature.State, ActorFeature.Connections, @@ -234,7 +234,7 @@ export class FileSystemManagerDriver implements ManagerDriver { return { actorId, name: actor.state.name, - key: actor.state.key, + key: actor.state.key as string[], }; } catch (error) { logger().error("failed to read actor state", { actorId, error }); @@ -278,9 +278,9 @@ export class FileSystemManagerDriver implements ManagerDriver { invariant(actorEntry.state, "must have state"); return { - actorId: actorEntry.state.id, + actorId: actorEntry.state.actorId, name: actorEntry.state.name, - key: actorEntry.state.key, + key: actorEntry.state.key as string[], }; } diff --git a/packages/core/src/schemas/actor-persist/mod.ts b/packages/core/src/schemas/actor-persist/mod.ts new file mode 100644 index 000000000..4e67d4023 --- /dev/null +++ b/packages/core/src/schemas/actor-persist/mod.ts @@ -0,0 +1 @@ +export * from "../../../dist/schemas/actor-persist/v1"; diff --git a/packages/core/src/schemas/actor-persist/versioned.ts b/packages/core/src/schemas/actor-persist/versioned.ts new file mode 100644 index 000000000..86aa4b395 --- /dev/null +++ b/packages/core/src/schemas/actor-persist/versioned.ts @@ -0,0 +1,25 @@ +import { + createVersionedDataHandler, + type MigrationFn, +} from "@rivetkit/versioned-data-util"; +import * as v1 from "../../../dist/schemas/actor-persist/v1"; + +export const CURRENT_VERSION = 1; + +export type CurrentPersistedActor = v1.PersistedActor; +export type CurrentPersistedConnection = v1.PersistedConnection; +export type CurrentPersistedSubscription = v1.PersistedSubscription; +export type CurrentGenericPersistedScheduleEvent = + v1.GenericPersistedScheduleEvent; +export type CurrentPersistedScheduleEventKind = v1.PersistedScheduleEventKind; +export type CurrentPersistedScheduleEvent = v1.PersistedScheduleEvent; + +const migrations = new Map>(); + +export const PERSISTED_ACTOR_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodePersistedActor(data), + deserializeVersion: (bytes) => v1.decodePersistedActor(bytes), + }); diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 1350cd668..a016565a3 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -164,7 +164,7 @@ export class SinglePromiseQueue { } } -export function bufferToArrayBuffer(buf: Buffer): ArrayBuffer { +export function bufferToArrayBuffer(buf: Buffer | Uint8Array): ArrayBuffer { return buf.buffer.slice( buf.byteOffset, buf.byteOffset + buf.byteLength,