diff --git a/.gitignore b/.gitignore index bfb1e5e86..fbba480f7 100644 --- a/.gitignore +++ b/.gitignore @@ -184,3 +184,4 @@ Cargo.lock **/.wrangler **/.DS_Store .aider* +/packages/core/dist/schemas/ diff --git a/package.json b/package.json index cfffa4e82..3f9f44d26 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ }, "devDependencies": { "@biomejs/biome": "^2.1.1", + "tsup": "^8.4.0", "@types/ws": "^8.5.14", "commander": "^14.0.0", "dedent": "^1.5.3", diff --git a/packages/core/package.json b/packages/core/package.json index fdc70cfbd..77fa87ae6 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -24,122 +24,122 @@ "exports": { ".": { "import": { - "types": "./dist/mod.d.ts", - "default": "./dist/mod.js" + "types": "./dist/tsup/mod.d.ts", + "default": "./dist/tsup/mod.js" }, "require": { - "types": "./dist/mod.d.cts", - "default": "./dist/mod.cjs" + "types": "./dist/tsup/mod.d.cts", + "default": "./dist/tsup/mod.cjs" } }, "./client": { "import": { - "types": "./dist/client/mod.d.ts", - "default": "./dist/client/mod.js" + "types": "./dist/tsup/client/mod.d.ts", + "default": "./dist/tsup/client/mod.js" }, "require": { - "types": "./dist/client/mod.d.cts", - "default": "./dist/client/mod.cjs" + "types": "./dist/tsup/client/mod.d.cts", + "default": "./dist/tsup/client/mod.cjs" } }, "./log": { "import": { - "types": "./dist/common/log.d.ts", - "default": "./dist/common/log.js" + "types": "./dist/tsup/common/log.d.ts", + "default": "./dist/tsup/common/log.js" }, "require": { - "types": "./dist/common/log.d.cts", - "default": "./dist/common/log.cjs" + "types": "./dist/tsup/common/log.d.cts", + "default": "./dist/tsup/common/log.cjs" } }, "./errors": { "import": { - "types": "./dist/actor/errors.d.ts", - "default": "./dist/actor/errors.js" + "types": "./dist/tsup/actor/errors.d.ts", + "default": "./dist/tsup/actor/errors.js" }, "require": { - "types": "./dist/actor/errors.d.cts", - "default": "./dist/actor/errors.cjs" + "types": "./dist/tsup/actor/errors.d.cts", + "default": "./dist/tsup/actor/errors.cjs" } }, "./utils": { "import": { - "types": "./dist/utils.d.ts", - "default": "./dist/utils.js" + "types": "./dist/tsup/utils.d.ts", + "default": "./dist/tsup/utils.js" }, "require": { - "types": "./dist/utils.d.cts", - "default": "./dist/utils.cjs" + "types": "./dist/tsup/utils.d.cts", + "default": "./dist/tsup/utils.cjs" } }, "./driver-helpers": { "import": { - "types": "./dist/driver-helpers/mod.d.ts", - "default": "./dist/driver-helpers/mod.js" + "types": "./dist/tsup/driver-helpers/mod.d.ts", + "default": "./dist/tsup/driver-helpers/mod.js" }, "require": { - "types": "./dist/driver-helpers/mod.d.cts", - "default": "./dist/driver-helpers/mod.cjs" + "types": "./dist/tsup/driver-helpers/mod.d.cts", + "default": "./dist/tsup/driver-helpers/mod.cjs" } }, "./driver-helpers/websocket": { "import": { - "types": "./dist/common/websocket.d.ts", - "default": "./dist/common/websocket.js" + "types": "./dist/tsup/common/websocket.d.ts", + "default": "./dist/tsup/common/websocket.js" }, "require": { - "types": "./dist/common/websocket.d.cts", - "default": "./dist/common/websocket.cjs" + "types": "./dist/tsup/common/websocket.d.cts", + "default": "./dist/tsup/common/websocket.cjs" } }, "./driver-test-suite": { "import": { - "types": "./dist/driver-test-suite/mod.d.ts", - "default": "./dist/driver-test-suite/mod.js" + "types": "./dist/tsup/driver-test-suite/mod.d.ts", + "default": "./dist/tsup/driver-test-suite/mod.js" }, "require": { - "types": "./dist/driver-test-suite/mod.d.cts", - "default": "./dist/driver-test-suite/mod.cjs" + "types": "./dist/tsup/driver-test-suite/mod.d.cts", + "default": "./dist/tsup/driver-test-suite/mod.cjs" } }, "./topologies/coordinate": { "import": { - "types": "./dist/topologies/coordinate/mod.d.ts", - "default": "./dist/topologies/coordinate/mod.js" + "types": "./dist/tsup/topologies/coordinate/mod.d.ts", + "default": "./dist/tsup/topologies/coordinate/mod.js" }, "require": { - "types": "./dist/topologies/coordinate/mod.d.cts", - "default": "./dist/topologies/coordinate/mod.cjs" + "types": "./dist/tsup/topologies/coordinate/mod.d.cts", + "default": "./dist/tsup/topologies/coordinate/mod.cjs" } }, "./topologies/partition": { "import": { - "types": "./dist/topologies/partition/mod.d.ts", - "default": "./dist/topologies/partition/mod.js" + "types": "./dist/tsup/topologies/partition/mod.d.ts", + "default": "./dist/tsup/topologies/partition/mod.js" }, "require": { - "types": "./dist/topologies/partition/mod.d.cts", - "default": "./dist/topologies/partition/mod.cjs" + "types": "./dist/tsup/topologies/partition/mod.d.cts", + "default": "./dist/tsup/topologies/partition/mod.cjs" } }, "./test": { "import": { - "types": "./dist/test/mod.d.ts", - "default": "./dist/test/mod.js" + "types": "./dist/tsup/test/mod.d.ts", + "default": "./dist/tsup/test/mod.js" }, "require": { - "types": "./dist/test/mod.d.cts", - "default": "./dist/test/mod.cjs" + "types": "./dist/tsup/test/mod.d.cts", + "default": "./dist/tsup/test/mod.cjs" } }, "./inspector": { "import": { - "types": "./dist/inspector/mod.d.ts", - "default": "./dist/inspector/mod.js" + "types": "./dist/tsup/inspector/mod.d.ts", + "default": "./dist/tsup/inspector/mod.js" }, "require": { - "types": "./dist/inspector/mod.d.cts", - "default": "./dist/inspector/mod.cjs" + "types": "./dist/tsup/inspector/mod.d.cts", + "default": "./dist/tsup/inspector/mod.cjs" } } }, @@ -150,16 +150,18 @@ "scripts": { "dev": "pnpm build --watch", "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts", + "build:schema": "node ../../packages/misc/bare-compiler/dist/cli.js compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && node ../../packages/misc/bare-compiler/dist/cli.js compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts ", "check-types": "tsc --noEmit", - "boop": "tsc --outDir dist/test -d", "test": "vitest run", "test:watch": "vitest", "dump-openapi": "tsx scripts/dump-openapi.ts" }, "dependencies": { + "@bare-ts/lib": "~0.3.0", "@hono/standard-validator": "^0.1.3", "@hono/zod-openapi": "^0.19.10", "@rivetkit/fast-json-patch": "^3.1.2", + "@rivetkit/versioned-data-util": "workspace:*", "cbor-x": "^1.6.0", "hono": "^4.7.0", "invariant": "^2.2.4", @@ -173,6 +175,7 @@ "@hono/node-server": "^1.18.2", "@hono/node-ws": "^1.1.1", "@rivet-gg/actor-core": "^25.1.0", + "@rivetkit/bare-compiler": "workspace:*", "@types/invariant": "^2", "@types/node": "^22.13.1", "@types/ws": "^8", diff --git a/packages/core/schemas/client-protocol/v1.bare b/packages/core/schemas/client-protocol/v1.bare new file mode 100644 index 000000000..c3f9a0171 --- /dev/null +++ b/packages/core/schemas/client-protocol/v1.bare @@ -0,0 +1,82 @@ +# MARK: Message To Client +type Init struct { + actorId: str + connectionId: str + connectionToken: str +} + +type Error struct { + code: str + message: str + metadata: optional + actionId: optional +} + +type ActionResponse struct { + id: uint + output: data +} + +type Event struct { + name: str + # CBOR array + args: data +} + +type ToClientBody union { + Init | + Error | + ActionResponse | + Event +} + +type ToClient struct { + body: ToClientBody +} + +# MARK: Message To Server +type ActionRequest struct { + id: uint + name: str + # CBOR array + args: data +} + +type SubscriptionRequest struct { + eventName: str + subscribe: bool +} + +type ToServerBody union { + ActionRequest | + SubscriptionRequest +} + +type ToServer struct { + body: ToServerBody +} + +# MARK: HTTP Action +type HttpActionRequest struct { + # CBOR array + args: data +} + +type HttpActionResponse struct { + output: data +} + +# MARK: HTTP Error +type HttpResponseError struct { + code: str + message: str + metadata: optional +} + +# MARK: HTTP Resolve +type HttpResolveRequest void + +type HttpResolveResponse struct { + actorId: str +} + diff --git a/packages/core/schemas/file-system-driver/v1.bare b/packages/core/schemas/file-system-driver/v1.bare new file mode 100644 index 000000000..46c382667 --- /dev/null +++ b/packages/core/schemas/file-system-driver/v1.bare @@ -0,0 +1,20 @@ +# File System Driver Schema (v1) + +# MARK: Actor State +# Represents the persisted state for an actor on disk. +# Note: createdAt is not persisted; it is derived from the file's birthtime. +type ActorState struct { + id: str + name: str + key: list + persistedData: data +} + +# MARK: Actor Alarm +# Represents a scheduled alarm for an actor. +# Stored per-actor; the actor id is implied by the filename. +# The timestamp is milliseconds since epoch. +type ActorAlarm struct { + timestamp: uint +} + diff --git a/packages/core/src/actor/connection.ts b/packages/core/src/actor/connection.ts index 6ab59d50a..11779c311 100644 --- a/packages/core/src/actor/connection.ts +++ b/packages/core/src/actor/connection.ts @@ -1,5 +1,7 @@ -import type * as messageToClient from "@/actor/protocol/message/to-client"; -import type * as wsToClient from "@/actor/protocol/message/to-client"; +import * as cbor from "cbor-x"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; +import { bufferToArrayBuffer } from "@/utils"; import type { AnyDatabaseProvider } from "./database"; import { type ConnDriver, ConnectionReadyState } from "./driver"; import * as errors from "./errors"; @@ -162,7 +164,7 @@ export class Conn { * * @protected */ - public _sendMessage(message: CachedSerializer) { + public _sendMessage(message: CachedSerializer) { this.#driver.sendMessage?.(this.#actor, this, this.__persist.ds, message); } @@ -181,14 +183,18 @@ export class Conn { connId: this.id, }); this._sendMessage( - new CachedSerializer({ - b: { - ev: { - n: eventName, - a: args, + new CachedSerializer( + { + body: { + tag: "Event", + val: { + name: eventName, + args: bufferToArrayBuffer(cbor.encode(args)), + }, }, }, - }), + TO_CLIENT_VERSIONED, + ), ); } diff --git a/packages/core/src/actor/driver.ts b/packages/core/src/actor/driver.ts index 78c76292e..fbee281b4 100644 --- a/packages/core/src/actor/driver.ts +++ b/packages/core/src/actor/driver.ts @@ -1,9 +1,9 @@ -import type * as messageToClient from "@/actor/protocol/message/to-client"; import type { CachedSerializer } from "@/actor/protocol/serde"; import type { AnyClient } from "@/client/client"; import type { ManagerDriver } from "@/manager/driver"; import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; +import type * as protocol from "@/schemas/client-protocol/mod"; import type { AnyConn, ConnectionDriver } from "./connection"; import type { GenericConnGlobalState } from "./generic-conn-driver"; import type { AnyActorInstance } from "./instance"; @@ -42,7 +42,7 @@ export interface ActorDriver { */ getDatabase(actorId: string): Promise; - sleep?(actorId: string): void; + sleep?(actorId: string): Promise; shutdown?(immediate: boolean): Promise; } @@ -60,7 +60,7 @@ export interface ConnDriver { actor: AnyActorInstance, conn: AnyConn, state: ConnDriverState, - message: CachedSerializer, + message: CachedSerializer, ): void; /** diff --git a/packages/core/src/actor/generic-conn-driver.ts b/packages/core/src/actor/generic-conn-driver.ts index f7980b5a3..bb7701f94 100644 --- a/packages/core/src/actor/generic-conn-driver.ts +++ b/packages/core/src/actor/generic-conn-driver.ts @@ -13,9 +13,9 @@ import { ConnectionReadyState, } from "@/actor/driver"; import type { AnyActorInstance } from "@/actor/instance"; -import type * as messageToClient from "@/actor/protocol/message/to-client"; import type { CachedSerializer, Encoding } from "@/actor/protocol/serde"; import { encodeDataToString } from "@/actor/protocol/serde"; +import type * as protocol from "@/schemas/client-protocol/mod"; import { logger } from "./log"; // This state is different than `PersistedConn` state since the connection-specific state is persisted & must be serializable. This is also part of the connection driver, not part of the core actor. @@ -54,7 +54,7 @@ export function createGenericWebSocketDriver( actor: AnyActorInstance, conn: AnyConn, state: GenericWebSocketDriverState, - message: CachedSerializer, + message: CachedSerializer, ) => { const ws = globalState.websockets.get(conn.id); if (!ws) { @@ -168,7 +168,7 @@ export function createGenericSseDriver( _actor: AnyActorInstance, conn: AnyConn, state: GenericSseDriverState, - message: CachedSerializer, + message: CachedSerializer, ) => { const stream = globalState.sseStreams.get(conn.id); if (!stream) { @@ -223,7 +223,7 @@ export type GenericHttpDriverState = Record; export function createGenericHttpDriver(): ConnDriver { return { - getConnectionReadyState(_actor, conn) { + getConnectionReadyState(_actor, _conn) { // TODO: This might not be the correct logic return ConnectionReadyState.OPEN; }, diff --git a/packages/core/src/actor/instance.ts b/packages/core/src/actor/instance.ts index ef93d8b70..387cb7002 100644 --- a/packages/core/src/actor/instance.ts +++ b/packages/core/src/actor/instance.ts @@ -2,15 +2,15 @@ import * as cbor from "cbor-x"; import invariant from "invariant"; import onChange from "on-change"; import type { ActorKey } from "@/actor/mod"; -import type * as wsToClient from "@/actor/protocol/message/to-client"; -import type * as wsToServer from "@/actor/protocol/message/to-server"; import type { Client } from "@/client/client"; import type { Logger } from "@/common/log"; import { isCborSerializable, stringifyError } from "@/common/utils"; import type { UniversalWebSocket } from "@/common/websocket-interface"; import { ActorInspector } from "@/inspector/actor"; import type { Registry } from "@/mod"; -import { SinglePromiseQueue } from "@/utils"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { TO_CLIENT_VERSIONED } from "@/schemas/client-protocol/versioned"; +import { bufferToArrayBuffer, SinglePromiseQueue } from "@/utils"; import type { ActionContext } from "./action"; import type { ActorConfig, OnConnectOptions } from "./config"; import { @@ -28,9 +28,8 @@ import type { PersistedActor, PersistedConn, PersistedScheduleEvent, - PersistedScheduleEventKind, } from "./persisted"; -import { processMessage } from "./protocol/message/mod"; +import { processMessage } from "./protocol/old"; import { CachedSerializer } from "./protocol/serde"; import { Schedule } from "./schedule"; import { DeadlineError, deadline } from "./utils"; @@ -945,15 +944,19 @@ export class ActorInstance< // Send init message conn._sendMessage( - new CachedSerializer({ - b: { - i: { - ai: this.id, - ci: conn.id, - ct: conn._token, + new CachedSerializer( + { + body: { + tag: "Init", + val: { + actorId: this.id, + connectionId: conn.id, + connectionToken: conn._token, + }, }, }, - }), + TO_CLIENT_VERSIONED, + ), ); return conn; @@ -961,7 +964,7 @@ export class ActorInstance< // MARK: Messages async processMessage( - message: wsToServer.ToServer, + message: protocol.ToServer, conn: Conn, ) { await processMessage(message, this, conn, { @@ -1428,14 +1431,18 @@ export class ActorInstance< const subscriptions = this.#subscriptionIndex.get(name); if (!subscriptions) return; - const toClientSerializer = new CachedSerializer({ - b: { - ev: { - n: name, - a: args, + const toClientSerializer = new CachedSerializer( + { + body: { + tag: "Event", + val: { + name, + args: bufferToArrayBuffer(cbor.encode(args)), + }, }, }, - }); + TO_CLIENT_VERSIONED, + ); // Send message to clients for (const connection of subscriptions) { @@ -1524,6 +1531,9 @@ export class ActorInstance< this.#sleepTimeout = undefined; } + // Don't set a new timer if already sleeping + if (this.#sleepCalled) return; + if (canSleep) { this.#sleepTimeout = setTimeout(() => { this._sleep().catch((error) => { @@ -1555,8 +1565,12 @@ export class ActorInstance< /** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */ async _sleep() { + const sleep = this.#actorDriver.sleep?.bind( + this.#actorDriver, + this.#actorId, + ); invariant(this.#sleepingSupported, "sleeping not supported"); - invariant(this.#actorDriver.sleep, "no sleep on driver"); + invariant(sleep, "no sleep on driver"); if (this.#sleepCalled) { logger().warn("already sleeping actor"); @@ -1566,10 +1580,13 @@ export class ActorInstance< logger().info("actor sleeping"); - // The actor driver should call stop when ready to stop - // - // This will call _stop once Pegboard responds with the new status - this.#actorDriver.sleep(this.#actorId); + // Schedule sleep to happen on the next tick. This allows for any action that calls _sleep to complete. + setImmediate(async () => { + // The actor driver should call stop when ready to stop + // + // This will call _stop once Pegboard responds with the new status + await sleep(); + }); } // MARK: Stop diff --git a/packages/core/src/actor/protocol/http/action.ts b/packages/core/src/actor/protocol/http/action.ts deleted file mode 100644 index 1d8cb736a..000000000 --- a/packages/core/src/actor/protocol/http/action.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { z } from "zod"; - -export const ActionRequestSchema = z.object({ - // Args - a: z.array(z.unknown()), -}); - -export const ActionResponseSchema = z.object({ - // Output - o: z.unknown(), -}); - -export type ActionRequest = z.infer; -export type ActionResponse = z.infer; diff --git a/packages/core/src/actor/protocol/http/error.ts b/packages/core/src/actor/protocol/http/error.ts deleted file mode 100644 index 7c7b9216b..000000000 --- a/packages/core/src/actor/protocol/http/error.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { z } from "zod"; - -export const ResponseErrorSchema = z.object({ - // Code - c: z.string(), - // Message - m: z.string(), - // Metadata - md: z.unknown().optional(), -}); - -export type ResponseError = z.infer; diff --git a/packages/core/src/actor/protocol/http/resolve.ts b/packages/core/src/actor/protocol/http/resolve.ts deleted file mode 100644 index 54c124539..000000000 --- a/packages/core/src/actor/protocol/http/resolve.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { z } from "zod"; - -export const ResolveResponseSchema = z.object({ - // Actor ID - i: z.string(), -}); - -export type ResolveResponse = z.infer; diff --git a/packages/core/src/actor/protocol/message/mod.ts b/packages/core/src/actor/protocol/message/mod.ts deleted file mode 100644 index a01935bae..000000000 --- a/packages/core/src/actor/protocol/message/mod.ts +++ /dev/null @@ -1,219 +0,0 @@ -import { z } from "zod"; -import type { AnyDatabaseProvider } from "@/actor/database"; -import type * as wsToClient from "@/actor/protocol/message/to-client"; -import * as wsToServer from "@/actor/protocol/message/to-server"; -import { - CachedSerializer, - deserialize, - type Encoding, - type InputData, -} from "@/actor/protocol/serde"; -import { deconstructError } from "@/common/utils"; -import { ActionContext } from "../../action"; -import type { Conn } from "../../connection"; -import * as errors from "../../errors"; -import type { ActorInstance } from "../../instance"; -import { logger } from "../../log"; -import { assertUnreachable } from "../../utils"; - -export const TransportSchema = z.enum(["websocket", "sse"]); - -/** - * Transport mechanism used to communicate between client & actor. - */ -export type Transport = z.infer; - -interface MessageEventOpts { - encoding: Encoding; - maxIncomingMessageSize: number; -} - -function getValueLength(value: InputData): number { - if (typeof value === "string") { - return value.length; - } else if (value instanceof Blob) { - return value.size; - } else if ( - value instanceof ArrayBuffer || - value instanceof SharedArrayBuffer || - value instanceof Uint8Array - ) { - return value.byteLength; - } else { - assertUnreachable(value); - } -} - -export async function parseMessage( - value: InputData, - opts: MessageEventOpts, -): Promise { - // Validate value length - const length = getValueLength(value); - if (length > opts.maxIncomingMessageSize) { - throw new errors.MessageTooLong(); - } - - // Parse & validate message - const deserializedValue = await deserialize(value, opts.encoding); - const { - data: message, - success, - error, - } = wsToServer.ToServerSchema.safeParse(deserializedValue); - if (!success) { - throw new errors.MalformedMessage(error); - } - - return message; -} - -export interface ProcessMessageHandler< - S, - CP, - CS, - V, - I, - AD, - DB extends AnyDatabaseProvider, -> { - onExecuteAction?: ( - ctx: ActionContext, - name: string, - args: unknown[], - ) => Promise; - onSubscribe?: ( - eventName: string, - conn: Conn, - ) => Promise; - onUnsubscribe?: ( - eventName: string, - conn: Conn, - ) => Promise; -} - -export async function processMessage< - S, - CP, - CS, - V, - I, - AD, - DB extends AnyDatabaseProvider, ->( - message: wsToServer.ToServer, - actor: ActorInstance, - conn: Conn, - handler: ProcessMessageHandler, -) { - let actionId: number | undefined; - let actionName: string | undefined; - - try { - if ("ar" in message.b) { - // Action request - - if (handler.onExecuteAction === undefined) { - throw new errors.Unsupported("Action"); - } - - const { i: id, n: name, a: args = [] } = message.b.ar; - - actionId = id; - actionName = name; - - logger().debug("processing action request", { - id, - name, - argsCount: args.length, - }); - - const ctx = new ActionContext( - actor.actorContext, - conn, - ); - - // Process the action request and wait for the result - // This will wait for async actions to complete - const output = await handler.onExecuteAction(ctx, name, args); - - logger().debug("sending action response", { - id, - name, - outputType: typeof output, - isPromise: output instanceof Promise, - }); - - // Send the response back to the client - conn._sendMessage( - new CachedSerializer({ - b: { - ar: { - i: id, - o: output, - }, - }, - }), - ); - - logger().debug("action response sent", { id, name }); - } else if ("sr" in message.b) { - // Subscription request - - if ( - handler.onSubscribe === undefined || - handler.onUnsubscribe === undefined - ) { - throw new errors.Unsupported("Subscriptions"); - } - - const { e: eventName, s: subscribe } = message.b.sr; - logger().debug("processing subscription request", { - eventName, - subscribe, - }); - - if (subscribe) { - await handler.onSubscribe(eventName, conn); - } else { - await handler.onUnsubscribe(eventName, conn); - } - - logger().debug("subscription request completed", { - eventName, - subscribe, - }); - } else { - assertUnreachable(message.b); - } - } catch (error) { - const { code, message, metadata } = deconstructError(error, logger(), { - connectionId: conn.id, - actionId, - actionName, - }); - - logger().debug("sending error response", { - actionId, - actionName, - code, - message, - }); - - // Build response - conn._sendMessage( - new CachedSerializer({ - b: { - e: { - c: code, - m: message, - md: metadata, - ai: actionId, - }, - }, - }), - ); - - logger().debug("error response sent", { actionId, actionName }); - } -} diff --git a/packages/core/src/actor/protocol/message/to-client.ts b/packages/core/src/actor/protocol/message/to-client.ts deleted file mode 100644 index a0a5330a9..000000000 --- a/packages/core/src/actor/protocol/message/to-client.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { z } from "zod"; - -// Only called for SSE because we don't need this for WebSockets -export const InitSchema = z.object({ - // Actor ID - ai: z.string(), - // Connection ID - ci: z.string(), - // Connection token - ct: z.string(), -}); - -// Used for connection errors (both during initialization and afterwards) -export const ErrorSchema = z.object({ - // Code - c: z.string(), - // Message - m: z.string(), - // Metadata - md: z.unknown().optional(), - // Action ID - ai: z.number().int().optional(), -}); - -export const ActionResponseSchema = z.object({ - // ID - i: z.number().int(), - // Output - o: z.unknown(), -}); - -export const EventSchema = z.object({ - // Name - n: z.string(), - // Args - a: z.array(z.unknown()), -}); - -export const ToClientSchema = z.object({ - // Body - b: z.union([ - z.object({ i: InitSchema }), - z.object({ e: ErrorSchema }), - z.object({ ar: ActionResponseSchema }), - z.object({ ev: EventSchema }), - ]), -}); - -export type ToClient = z.infer; -export type Error = z.infer; -export type ActionResponse = z.infer; -export type Event = z.infer; diff --git a/packages/core/src/actor/protocol/message/to-server.ts b/packages/core/src/actor/protocol/message/to-server.ts deleted file mode 100644 index 257f2792b..000000000 --- a/packages/core/src/actor/protocol/message/to-server.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { z } from "zod"; - -const ActionRequestSchema = z.object({ - // ID - i: z.number().int(), - // Name - n: z.string(), - // Args - a: z.array(z.unknown()), -}); - -const SubscriptionRequestSchema = z.object({ - // Event name - e: z.string(), - // Subscribe - s: z.boolean(), -}); - -export const ToServerSchema = z.object({ - // Body - b: z.union([ - z.object({ ar: ActionRequestSchema }), - z.object({ sr: SubscriptionRequestSchema }), - ]), -}); - -export type ToServer = z.infer; -export type ActionRequest = z.infer; -export type SubscriptionRequest = z.infer; diff --git a/packages/core/src/actor/protocol/old.ts b/packages/core/src/actor/protocol/old.ts new file mode 100644 index 000000000..a5110f32b --- /dev/null +++ b/packages/core/src/actor/protocol/old.ts @@ -0,0 +1,281 @@ +import * as cbor from "cbor-x"; +import { z } from "zod"; +import type { AnyDatabaseProvider } from "@/actor/database"; +import * as errors from "@/actor/errors"; +import { + CachedSerializer, + type Encoding, + type InputData, +} from "@/actor/protocol/serde"; +import { deconstructError } from "@/common/utils"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { + TO_CLIENT_VERSIONED, + TO_SERVER_VERSIONED, +} from "@/schemas/client-protocol/versioned"; +import { deserializeWithEncoding } from "@/serde"; +import { assertUnreachable, bufferToArrayBuffer } from "../../utils"; +import { ActionContext } from "../action"; +import type { Conn } from "../connection"; +import type { ActorInstance } from "../instance"; +import { logger } from "../log"; + +export const TransportSchema = z.enum(["websocket", "sse"]); + +/** + * Transport mechanism used to communicate between client & actor. + */ +export type Transport = z.infer; + +interface MessageEventOpts { + encoding: Encoding; + maxIncomingMessageSize: number; +} + +function getValueLength(value: InputData): number { + if (typeof value === "string") { + return value.length; + } else if (value instanceof Blob) { + return value.size; + } else if ( + value instanceof ArrayBuffer || + value instanceof SharedArrayBuffer || + value instanceof Uint8Array + ) { + return value.byteLength; + } else { + assertUnreachable(value); + } +} + +export async function inputDataToBuffer( + data: InputData, +): Promise { + if (typeof data === "string") { + return data; + } else if (data instanceof Blob) { + const arrayBuffer = await data.arrayBuffer(); + return new Uint8Array(arrayBuffer); + } else if (data instanceof Uint8Array) { + return data; + } else if (data instanceof ArrayBuffer || data instanceof SharedArrayBuffer) { + return new Uint8Array(data); + } else { + throw new errors.MalformedMessage(); + } +} + +export async function parseMessage( + value: InputData, + opts: MessageEventOpts, +): Promise { + // Validate value length + const length = getValueLength(value); + if (length > opts.maxIncomingMessageSize) { + throw new errors.MessageTooLong(); + } + + // Parse & validate message + const buffer = await inputDataToBuffer(value); + return deserializeWithEncoding(opts.encoding, buffer, TO_SERVER_VERSIONED); +} + +export interface ProcessMessageHandler< + S, + CP, + CS, + V, + I, + AD, + DB extends AnyDatabaseProvider, +> { + onExecuteAction?: ( + ctx: ActionContext, + name: string, + args: unknown[], + ) => Promise; + onSubscribe?: ( + eventName: string, + conn: Conn, + ) => Promise; + onUnsubscribe?: ( + eventName: string, + conn: Conn, + ) => Promise; +} + +export async function processMessage< + S, + CP, + CS, + V, + I, + AD, + DB extends AnyDatabaseProvider, +>( + message: protocol.ToServer, + actor: ActorInstance, + conn: Conn, + handler: ProcessMessageHandler, +) { + let actionId: bigint | undefined; + let actionName: string | undefined; + + try { + if (message.body.tag === "ActionRequest") { + // Action request + + if (handler.onExecuteAction === undefined) { + throw new errors.Unsupported("Action"); + } + + const { id, name, args: argsRaw } = message.body.val; + actionId = id; + actionName = name; + const args = cbor.decode(new Uint8Array(argsRaw)); + + logger().debug("processing action request", { + actionId: id, + actionName: name, + }); + + const ctx = new ActionContext( + actor.actorContext, + conn, + ); + + // Process the action request and wait for the result + // This will wait for async actions to complete + const output = await handler.onExecuteAction(ctx, name, args); + + logger().debug("sending action response", { + actionId: id, + actionName: name, + outputType: typeof output, + isPromise: output instanceof Promise, + }); + + // Send the response back to the client + conn._sendMessage( + new CachedSerializer( + { + body: { + tag: "ActionResponse", + val: { + id: id, + output: bufferToArrayBuffer(cbor.encode(output)), + }, + }, + }, + TO_CLIENT_VERSIONED, + ), + ); + + logger().debug("action response sent", { id, name: name }); + } else if (message.body.tag === "SubscriptionRequest") { + // Subscription request + + if ( + handler.onSubscribe === undefined || + handler.onUnsubscribe === undefined + ) { + throw new errors.Unsupported("Subscriptions"); + } + + const { eventName, subscribe } = message.body.val; + logger().debug("processing subscription request", { + eventName, + subscribe, + }); + + if (subscribe) { + await handler.onSubscribe(eventName, conn); + } else { + await handler.onUnsubscribe(eventName, conn); + } + + logger().debug("subscription request completed", { + eventName, + subscribe, + }); + } else { + assertUnreachable(message.body); + } + } catch (error) { + const { code, message, metadata } = deconstructError(error, logger(), { + connectionId: conn.id, + actionId, + actionName, + }); + + logger().debug("sending error response", { + actionId, + actionName, + code, + message, + }); + + // Build response + conn._sendMessage( + new CachedSerializer( + { + body: { + tag: "Error", + val: { + code, + message, + metadata: bufferToArrayBuffer(cbor.encode(metadata)), + actionId: actionId ?? null, + }, + }, + }, + TO_CLIENT_VERSIONED, + ), + ); + + logger().debug("error response sent", { actionId, actionName }); + } +} + +///** +// * Use `CachedSerializer` if serializing the same data repeatedly. +// */ +//export function serialize(value: T, encoding: Encoding): OutputData { +// if (encoding === "json") { +// return JSON.stringify(value); +// } else if (encoding === "cbor") { +// // TODO: Remove this hack, but cbor-x can't handle anything extra in data structures +// const cleanValue = JSON.parse(JSON.stringify(value)); +// return cbor.encode(cleanValue); +// } else { +// assertUnreachable(encoding); +// } +//} +// +//export async function deserialize(data: InputData, encoding: Encoding) { +// if (encoding === "json") { +// if (typeof data !== "string") { +// logger().warn("received non-string for json parse"); +// throw new errors.MalformedMessage(); +// } else { +// return JSON.parse(data); +// } +// } else if (encoding === "cbor") { +// if (data instanceof Blob) { +// const arrayBuffer = await data.arrayBuffer(); +// return cbor.decode(new Uint8Array(arrayBuffer)); +// } else if (data instanceof Uint8Array) { +// return cbor.decode(data); +// } else if ( +// data instanceof ArrayBuffer || +// data instanceof SharedArrayBuffer +// ) { +// return cbor.decode(new Uint8Array(data)); +// } else { +// logger().warn("received non-binary type for cbor parse"); +// throw new errors.MalformedMessage(); +// } +// } else { +// assertUnreachable(encoding); +// } +//} diff --git a/packages/core/src/actor/protocol/serde.ts b/packages/core/src/actor/protocol/serde.ts index fea3dc7cf..9e155cd73 100644 --- a/packages/core/src/actor/protocol/serde.ts +++ b/packages/core/src/actor/protocol/serde.ts @@ -1,6 +1,8 @@ +import type { VersionedDataHandler } from "@rivetkit/versioned-data-util"; import * as cbor from "cbor-x"; import { z } from "zod"; import * as errors from "@/actor/errors"; +import { serializeWithEncoding } from "@/serde"; import { logger } from "../log"; import { assertUnreachable } from "../utils"; @@ -10,7 +12,7 @@ export type InputData = string | Buffer | Blob | ArrayBufferLike | Uint8Array; /** Data that's been serialized. */ export type OutputData = string | Uint8Array; -export const EncodingSchema = z.enum(["json", "cbor"]); +export const EncodingSchema = z.enum(["json", "cbor", "bare"]); /** * Encoding used to communicate between the client & actor. @@ -23,9 +25,11 @@ export type Encoding = z.infer; export class CachedSerializer { #data: T; #cache = new Map(); + #versionedDataHandler: VersionedDataHandler; - constructor(data: T) { + constructor(data: T, versionedDataHandler: VersionedDataHandler) { this.#data = data; + this.#versionedDataHandler = versionedDataHandler; } public get rawData(): T { @@ -37,55 +41,59 @@ export class CachedSerializer { if (cached) { return cached; } else { - const serialized = serialize(this.#data, encoding); + const serialized = serializeWithEncoding( + encoding, + this.#data, + this.#versionedDataHandler, + ); this.#cache.set(encoding, serialized); return serialized; } } } -/** - * Use `CachedSerializer` if serializing the same data repeatedly. - */ -export function serialize(value: T, encoding: Encoding): OutputData { - if (encoding === "json") { - return JSON.stringify(value); - } else if (encoding === "cbor") { - // TODO: Remove this hack, but cbor-x can't handle anything extra in data structures - const cleanValue = JSON.parse(JSON.stringify(value)); - return cbor.encode(cleanValue); - } else { - assertUnreachable(encoding); - } -} - -export async function deserialize(data: InputData, encoding: Encoding) { - if (encoding === "json") { - if (typeof data !== "string") { - logger().warn("received non-string for json parse"); - throw new errors.MalformedMessage(); - } else { - return JSON.parse(data); - } - } else if (encoding === "cbor") { - if (data instanceof Blob) { - const arrayBuffer = await data.arrayBuffer(); - return cbor.decode(new Uint8Array(arrayBuffer)); - } else if (data instanceof Uint8Array) { - return cbor.decode(data); - } else if ( - data instanceof ArrayBuffer || - data instanceof SharedArrayBuffer - ) { - return cbor.decode(new Uint8Array(data)); - } else { - logger().warn("received non-binary type for cbor parse"); - throw new errors.MalformedMessage(); - } - } else { - assertUnreachable(encoding); - } -} +///** +// * Use `CachedSerializer` if serializing the same data repeatedly. +// */ +//export function serialize(value: T, encoding: Encoding): OutputData { +// if (encoding === "json") { +// return JSON.stringify(value); +// } else if (encoding === "cbor") { +// // TODO: Remove this hack, but cbor-x can't handle anything extra in data structures +// const cleanValue = JSON.parse(JSON.stringify(value)); +// return cbor.encode(cleanValue); +// } else { +// assertUnreachable(encoding); +// } +//} +// +//export async function deserialize(data: InputData, encoding: Encoding) { +// if (encoding === "json") { +// if (typeof data !== "string") { +// logger().warn("received non-string for json parse"); +// throw new errors.MalformedMessage(); +// } else { +// return JSON.parse(data); +// } +// } else if (encoding === "cbor") { +// if (data instanceof Blob) { +// const arrayBuffer = await data.arrayBuffer(); +// return cbor.decode(new Uint8Array(arrayBuffer)); +// } else if (data instanceof Uint8Array) { +// return cbor.decode(data); +// } else if ( +// data instanceof ArrayBuffer || +// data instanceof SharedArrayBuffer +// ) { +// return cbor.decode(new Uint8Array(data)); +// } else { +// logger().warn("received non-binary type for cbor parse"); +// throw new errors.MalformedMessage(); +// } +// } else { +// assertUnreachable(encoding); +// } +//} // TODO: Encode base 128 function base64EncodeUint8Array(uint8Array: Uint8Array): string { @@ -114,3 +122,10 @@ export function encodeDataToString(message: OutputData): string { assertUnreachable(message); } } + +/** Stringifies with compat for values that BARE & CBOR supports. */ +export function jsonStringifyCompat(input: any): string { + return JSON.stringify(input, (_key, value) => + typeof value === "bigint" ? value.toString() : value, + ); +} diff --git a/packages/core/src/actor/router-endpoints.ts b/packages/core/src/actor/router-endpoints.ts index 05dca2f64..66238a21b 100644 --- a/packages/core/src/actor/router-endpoints.ts +++ b/packages/core/src/actor/router-endpoints.ts @@ -1,3 +1,4 @@ +import * as cbor from "cbor-x"; import type { Context as HonoContext, HonoRequest } from "hono"; import { type SSEStreamingApi, streamSSE } from "hono/streaming"; import type { WSContext } from "hono/ws"; @@ -12,21 +13,25 @@ import { } from "@/actor/connection"; import * as errors from "@/actor/errors"; import type { AnyActorInstance } from "@/actor/instance"; -import * as protoHttpAction from "@/actor/protocol/http/action"; -import { parseMessage } from "@/actor/protocol/message/mod"; -import type * as messageToServer from "@/actor/protocol/message/to-server"; import type { InputData } from "@/actor/protocol/serde"; -import { - deserialize, - type Encoding, - EncodingSchema, - serialize, -} from "@/actor/protocol/serde"; +import { type Encoding, EncodingSchema } from "@/actor/protocol/serde"; import type { UpgradeWebSocketArgs } from "@/common/inline-websocket-adapter2"; import { deconstructError, stringifyError } from "@/common/utils"; import type { UniversalWebSocket } from "@/common/websocket-interface"; import { HonoWebSocketAdapter } from "@/manager/hono-websocket-adapter"; import type { RunConfig } from "@/registry/run-config"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { + HTTP_ACTION_REQUEST_VERSIONED, + HTTP_ACTION_RESPONSE_VERSIONED, + TO_SERVER_VERSIONED, +} from "@/schemas/client-protocol/versioned"; +import { + contentTypeForEncoding, + deserializeWithEncoding, + serializeWithEncoding, +} from "@/serde"; +import { bufferToArrayBuffer } from "@/utils"; import type { ActorDriver } from "./driver"; import type { GenericHttpDriverState, @@ -34,7 +39,7 @@ import type { GenericWebSocketDriverState, } from "./generic-conn-driver"; import { logger } from "./log"; -import { assertUnreachable } from "./utils"; +import { parseMessage } from "./protocol/old"; export interface ConnectWebSocketOpts { req?: HonoRequest; @@ -46,7 +51,7 @@ export interface ConnectWebSocketOpts { export interface ConnectWebSocketOutput { onOpen: (ws: WSContext) => void; - onMessage: (message: messageToServer.ToServer) => void; + onMessage: (message: protocol.ToServer) => void; onClose: () => void; } @@ -80,7 +85,7 @@ export interface ConnsMessageOpts { req?: HonoRequest; connId: string; connToken: string; - message: messageToServer.ToServer; + message: protocol.ToServer; actorId: string; } @@ -319,7 +324,7 @@ export async function handleWebSocketConnect( */ export async function handleSseConnect( c: HonoContext, - runConfig: RunConfig, + _runConfig: RunConfig, actorDriver: ActorDriver, actorId: string, authData: unknown, @@ -416,7 +421,7 @@ export async function handleSseConnect( */ export async function handleAction( c: HonoContext, - runConfig: RunConfig, + _runConfig: RunConfig, actorDriver: ActorDriver, actionName: string, actorId: string, @@ -428,46 +433,13 @@ export async function handleAction( logger().debug("handling action", { actionName, encoding }); // Validate incoming request - let body: unknown; - if (encoding === "json") { - try { - body = await c.req.json(); - } catch (err) { - if (err instanceof errors.InvalidActionRequest) { - throw err; - } - throw new errors.InvalidActionRequest( - `Invalid JSON: ${stringifyError(err)}`, - ); - } - } else if (encoding === "cbor") { - try { - const value = await c.req.arrayBuffer(); - const uint8Array = new Uint8Array(value); - body = await deserialize(uint8Array as unknown as InputData, encoding); - } catch (err) { - throw new errors.InvalidActionRequest( - `Invalid binary format: ${stringifyError(err)}`, - ); - } - } else { - return assertUnreachable(encoding); - } - - // Validate using the action schema - let actionArgs: unknown[]; - try { - const result = protoHttpAction.ActionRequestSchema.safeParse(body); - if (!result.success) { - throw new errors.InvalidActionRequest("Invalid action request format"); - } - - actionArgs = result.data.a; - } catch (err) { - throw new errors.InvalidActionRequest( - `Invalid schema: ${stringifyError(err)}`, - ); - } + const arrayBuffer = await c.req.arrayBuffer(); + const request = deserializeWithEncoding( + encoding, + new Uint8Array(arrayBuffer), + HTTP_ACTION_REQUEST_VERSIONED, + ); + const actionArgs = cbor.decode(new Uint8Array(request.args)); // Invoke the action let actor: AnyActorInstance | undefined; @@ -497,25 +469,18 @@ export async function handleAction( } } - // Encode the response - if (encoding === "json") { - const responseData = { - o: output, // Use the format expected by ResponseOkSchema - }; - return c.json(responseData); - } else if (encoding === "cbor") { - // Use serialize from serde.ts instead of custom encoder - const responseData = { - o: output, // Use the format expected by ResponseOkSchema - }; - const serialized = serialize(responseData, encoding); - - return c.body(serialized as Uint8Array, 200, { - "Content-Type": "application/octet-stream", - }); - } else { - return assertUnreachable(encoding); - } + // Send response + const responseData: protocol.HttpActionResponse = { + output: bufferToArrayBuffer(cbor.encode(output)), + }; + const serialized = serializeWithEncoding( + encoding, + responseData, + HTTP_ACTION_RESPONSE_VERSIONED, + ); + return c.body(serialized as Uint8Array, 200, { + "Content-Type": contentTypeForEncoding(encoding), + }); } /** @@ -523,7 +488,7 @@ export async function handleAction( */ export async function handleConnectionMessage( c: HonoContext, - runConfig: RunConfig, + _runConfig: RunConfig, actorDriver: ActorDriver, connId: string, connToken: string, @@ -532,29 +497,12 @@ export async function handleConnectionMessage( const encoding = getRequestEncoding(c.req); // Validate incoming request - let message: messageToServer.ToServer; - if (encoding === "json") { - try { - message = await c.req.json(); - } catch (_err) { - throw new errors.InvalidRequest("Invalid JSON"); - } - } else if (encoding === "cbor") { - try { - const value = await c.req.arrayBuffer(); - const uint8Array = new Uint8Array(value); - message = await parseMessage(uint8Array as unknown as InputData, { - encoding, - maxIncomingMessageSize: runConfig.maxIncomingMessageSize, - }); - } catch (err) { - throw new errors.InvalidRequest( - `Invalid binary format: ${stringifyError(err)}`, - ); - } - } else { - return assertUnreachable(encoding); - } + const arrayBuffer = await c.req.arrayBuffer(); + const message = deserializeWithEncoding( + encoding, + new Uint8Array(arrayBuffer), + TO_SERVER_VERSIONED, + ); const actor = await actorDriver.loadActor(actorId); diff --git a/packages/core/src/client/actor-conn.ts b/packages/core/src/client/actor-conn.ts index 3b73f57dc..c1bb21b13 100644 --- a/packages/core/src/client/actor-conn.ts +++ b/packages/core/src/client/actor-conn.ts @@ -1,10 +1,10 @@ import * as cbor from "cbor-x"; +import invariant from "invariant"; import pRetry from "p-retry"; import type { CloseEvent, WebSocket } from "ws"; import type { AnyActorDefinition } from "@/actor/definition"; -import type * as wsToClient from "@/actor/protocol/message/to-client"; -import type * as wsToServer from "@/actor/protocol/message/to-server"; -import type { Encoding } from "@/actor/protocol/serde"; +import { inputDataToBuffer } from "@/actor/protocol/old"; +import { type Encoding, jsonStringifyCompat } from "@/actor/protocol/serde"; import type { UniversalErrorEvent, UniversalEventSource, @@ -12,6 +12,17 @@ import type { } from "@/common/eventsource-interface"; import { assertUnreachable, stringifyError } from "@/common/utils"; import type { ActorQuery } from "@/manager/protocol/query"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { + TO_CLIENT_VERSIONED, + TO_SERVER_VERSIONED, +} from "@/schemas/client-protocol/versioned"; +import { + deserializeWithEncoding, + encodingIsBinary, + serializeWithEncoding, +} from "@/serde"; +import { bufferToArrayBuffer, getEnvUniversal } from "@/utils"; import type { ActorDefinitionActions } from "./actor-common"; import { ACTOR_CONNS_SYMBOL, @@ -21,16 +32,11 @@ import { } from "./client"; import * as errors from "./errors"; import { logger } from "./log"; -import { rawHttpFetch, rawWebSocket } from "./raw-utils"; -import { - type WebSocketMessage as ConnMessage, - messageLength, - serializeWithEncoding, -} from "./utils"; +import { type WebSocketMessage as ConnMessage, messageLength } from "./utils"; interface ActionInFlight { name: string; - resolve: (response: wsToClient.ActionResponse) => void; + resolve: (response: protocol.ActionResponse) => void; reject: (error: Error) => void; } @@ -85,7 +91,7 @@ export class ActorConnRaw { #transport?: ConnTransport; - #messageQueue: wsToServer.ToServer[] = []; + #messageQueue: protocol.ToServer[] = []; #actionsInFlight = new Map(); // biome-ignore lint/suspicious/noExplicitAny: Unknown subscription type @@ -121,11 +127,11 @@ export class ActorConnRaw { * @protected */ public constructor( - private client: ClientRaw, - private driver: ClientDriver, - private params: unknown, - private encodingKind: Encoding, - private actorQuery: ActorQuery, + client: ClientRaw, + driver: ClientDriver, + params: unknown, + encodingKind: Encoding, + actorQuery: ActorQuery, ) { this.#client = client; this.#driver = driver; @@ -161,28 +167,29 @@ export class ActorConnRaw { this.#actionIdCounter += 1; const { promise, resolve, reject } = - Promise.withResolvers(); + Promise.withResolvers(); this.#actionsInFlight.set(actionId, { name: opts.name, resolve, reject }); this.#sendMessage({ - b: { - ar: { - i: actionId, - n: opts.name, - a: opts.args, + body: { + tag: "ActionRequest", + val: { + id: BigInt(actionId), + name: opts.name, + args: bufferToArrayBuffer(cbor.encode(opts.args)), }, }, - } satisfies wsToServer.ToServer); + } satisfies protocol.ToServer); // TODO: Throw error if disconnect is called - const { i: responseId, o: output } = await promise; - if (responseId !== actionId) + const { id: responseId, output } = await promise; + if (responseId !== BigInt(actionId)) throw new Error( `Request ID ${actionId} does not match response ID ${responseId}`, ); - return output as Response; + return cbor.decode(new Uint8Array(output)) as Response; } /** @@ -271,7 +278,7 @@ enc ws.addEventListener("close", (ev) => { this.#handleOnClose(ev); }); - ws.addEventListener("error", (ev) => { + ws.addEventListener("error", (_ev) => { this.#handleOnError(); }); } @@ -292,7 +299,7 @@ enc eventSource.onmessage = (ev: UniversalMessageEvent) => { this.#handleOnMessage(ev.data); }; - eventSource.onerror = (ev: UniversalErrorEvent) => { + eventSource.onerror = (_ev: UniversalErrorEvent) => { if (eventSource.readyState === eventSource.CLOSED) { // This error indicates a close event this.#handleOnClose(new Event("error")); @@ -339,29 +346,32 @@ enc isArrayBuffer: data instanceof ArrayBuffer, }); - const response = (await this.#parse( - data as ConnMessage, - )) as wsToClient.ToClient; - logger().trace("parsed message", { - response: JSON.stringify(response).substring(0, 100) + "...", - }); + const response = await this.#parseMessage(data as ConnMessage); + logger().trace( + "parsed message", + getEnvUniversal("_RIVETKIT_LOG_MESSAGE") + ? { + message: jsonStringifyCompat(response).substring(0, 100) + "...", + } + : {}, + ); - if ("i" in response.b) { + if (response.body.tag === "Init") { // This is only called for SSE - this.#actorId = response.b.i.ai; - this.#connectionId = response.b.i.ci; - this.#connectionToken = response.b.i.ct; + this.#actorId = response.body.val.actorId; + this.#connectionId = response.body.val.connectionId; + this.#connectionToken = response.body.val.connectionToken; logger().trace("received init message", { actorId: this.#actorId, connectionId: this.#connectionId, }); this.#handleOnOpen(); - } else if ("e" in response.b) { + } else if (response.body.tag === "Error") { // Connection error - const { c: code, m: message, md: metadata, ai: actionId } = response.b.e; + const { code, message, metadata, actionId } = response.body.val; if (actionId) { - const inFlight = this.#takeActionInFlight(actionId); + const inFlight = this.#takeActionInFlight(Number(actionId)); logger().warn("action error", { actionId: actionId, @@ -396,28 +406,24 @@ enc // Dispatch to error handler if registered this.#dispatchActorError(actorError); } - } else if ("ar" in response.b) { + } else if (response.body.tag === "ActionResponse") { // Action response OK - const { i: actionId, o: outputType } = response.b.ar; + const { id: actionId } = response.body.val; logger().trace("received action response", { actionId, - outputType, }); - const inFlight = this.#takeActionInFlight(actionId); + const inFlight = this.#takeActionInFlight(Number(actionId)); logger().trace("resolving action promise", { actionId, actionName: inFlight?.name, }); - inFlight.resolve(response.b.ar); - } else if ("ev" in response.b) { - logger().trace("received event", { - name: response.b.ev.n, - argsCount: response.b.ev.a?.length, - }); - this.#dispatchEvent(response.b.ev); + inFlight.resolve(response.body.val); + } else if (response.body.tag === "Event") { + logger().trace("received event", { name: response.body.val.name }); + this.#dispatchEvent(response.body.val); } else { - assertUnreachable(response.b); + assertUnreachable(response.body); } } @@ -479,8 +485,9 @@ enc return inFlight; } - #dispatchEvent(event: wsToClient.Event) { - const { n: name, a: args } = event; + #dispatchEvent(event: protocol.Event) { + const { name, args: argsRaw } = event; + const args = cbor.decode(new Uint8Array(argsRaw)); const listeners = this.#eventSubscriptions.get(name); if (!listeners) return; @@ -592,7 +599,7 @@ enc }; } - #sendMessage(message: wsToServer.ToServer, opts?: SendHttpMessageOpts) { + #sendMessage(message: protocol.ToServer, opts?: SendHttpMessageOpts) { if (this.#disposed) { throw new errors.ActorConnDisposed(); } @@ -607,6 +614,7 @@ enc const messageSerialized = serializeWithEncoding( this.#encodingKind, message, + TO_SERVER_VERSIONED, ); this.#transport.websocket.send(messageSerialized); logger().trace("sent websocket message", { @@ -641,18 +649,23 @@ enc } async #sendHttpMessage( - message: wsToServer.ToServer, + message: protocol.ToServer, opts?: SendHttpMessageOpts, ) { try { if (!this.#actorId || !this.#connectionId || !this.#connectionToken) throw new errors.InternalError("Missing connection ID or token."); - logger().trace("sent http message", { - message: JSON.stringify(message).substring(0, 100) + "...", - }); + logger().trace( + "sent http message", + getEnvUniversal("_RIVETKIT_LOG_MESSAGE") + ? { + message: jsonStringifyCompat(message).substring(0, 100) + "...", + } + : {}, + ); - const res = await this.#driver.sendHttpMessage( + await this.#driver.sendHttpMessage( undefined, this.#actorId, this.#encodingKind, @@ -661,15 +674,6 @@ enc message, opts?.signal ? { signal: opts.signal } : undefined, ); - - if (!res.ok) { - throw new errors.InternalError( - `Publish message over HTTP error (${res.statusText}):\n${await res.text()}`, - ); - } - - // Dispose of the response body, we don't care about it - await res.json(); } catch (error) { // TODO: This will not automatically trigger a re-broadcast of HTTP events since SSE is separate from the HTTP action @@ -686,49 +690,30 @@ enc } } - async #parse(data: ConnMessage): Promise { - if (this.#encodingKind === "json") { - if (typeof data !== "string") { - throw new Error("received non-string for json parse"); - } - return JSON.parse(data); - } else if (this.#encodingKind === "cbor") { - if (!this.#transport) { - // Do thing - throw new Error("Cannot parse message when no transport defined"); - } else if ("sse" in this.#transport) { - // Decode base64 since SSE sends raw strings - if (typeof data === "string") { - const binaryString = atob(data); - data = new Uint8Array( - [...binaryString].map((char) => char.charCodeAt(0)), - ); - } else { - throw new errors.InternalError( - `Expected data to be a string for SSE, got ${data}.`, - ); - } - } else if ("websocket" in this.#transport) { - // Do nothing - } else { - assertUnreachable(this.#transport); - } + async #parseMessage(data: ConnMessage): Promise { + invariant(this.#transport, "transport must be defined"); - // Decode data - if (data instanceof Blob) { - return cbor.decode(new Uint8Array(await data.arrayBuffer())); - } else if (data instanceof ArrayBuffer) { - return cbor.decode(new Uint8Array(data)); - } else if (data instanceof Uint8Array) { - return cbor.decode(data); + // Decode base64 since SSE sends raw strings + if (encodingIsBinary(this.#encodingKind) && "sse" in this.#transport) { + if (typeof data === "string") { + const binaryString = atob(data); + data = new Uint8Array( + [...binaryString].map((char) => char.charCodeAt(0)), + ); } else { - throw new Error( - `received non-binary type for cbor parse: ${typeof data}`, + throw new errors.InternalError( + `Expected data to be a string for SSE, got ${data}.`, ); } - } else { - assertUnreachable(this.#encodingKind); } + + const buffer = await inputDataToBuffer(data); + + return deserializeWithEncoding( + this.#encodingKind, + buffer, + TO_CLIENT_VERSIONED, + ); } /** @@ -760,13 +745,22 @@ enc if (!this.#transport) { // Nothing to do } else if ("websocket" in this.#transport) { - const { promise, resolve } = Promise.withResolvers(); - this.#transport.websocket.addEventListener("close", () => { - logger().debug("ws closed"); - resolve(undefined); - }); - this.#transport.websocket.close(); - await promise; + const ws = this.#transport.websocket; + // Check if WebSocket is already closed or closing + if ( + ws.readyState === 2 /* CLOSING */ || + ws.readyState === 3 /* CLOSED */ + ) { + logger().debug("ws already closed or closing"); + } else { + const { promise, resolve } = Promise.withResolvers(); + ws.addEventListener("close", () => { + logger().debug("ws closed"); + resolve(undefined); + }); + ws.close(); + await promise; + } } else if ("sse" in this.#transport) { this.#transport.sse.close(); } else { @@ -778,10 +772,11 @@ enc #sendSubscription(eventName: string, subscribe: boolean) { this.#sendMessage( { - b: { - sr: { - e: eventName, - s: subscribe, + body: { + tag: "SubscriptionRequest", + val: { + eventName, + subscribe, }, }, }, diff --git a/packages/core/src/client/client.ts b/packages/core/src/client/client.ts index 38af7ae67..10da9c1a3 100644 --- a/packages/core/src/client/client.ts +++ b/packages/core/src/client/client.ts @@ -1,12 +1,12 @@ import type { Context as HonoContext } from "hono"; import type { WebSocket } from "ws"; import type { AnyActorDefinition } from "@/actor/definition"; -import type { Transport } from "@/actor/protocol/message/mod"; -import type * as wsToServer from "@/actor/protocol/message/to-server"; +import type { Transport } from "@/actor/protocol/old"; import type { Encoding } from "@/actor/protocol/serde"; import type { UniversalEventSource } from "@/common/eventsource-interface"; import type { ActorQuery } from "@/manager/protocol/query"; import type { Registry } from "@/mod"; +import type { ToServer } from "@/schemas/client-protocol/mod"; import type { ActorActionFunction } from "./actor-common"; import { type ActorConn, @@ -196,9 +196,9 @@ export interface ClientDriver { encoding: Encoding, connectionId: string, connectionToken: string, - message: wsToServer.ToServer, + message: ToServer, opts: { signal?: AbortSignal } | undefined, - ): Promise; + ): Promise; rawHttpRequest( c: HonoContext | undefined, actorQuery: ActorQuery, @@ -244,7 +244,7 @@ export class ClientRaw { public constructor(driver: ClientDriver, opts?: ClientOptions) { this.#driver = driver; - this.#encodingKind = opts?.encoding ?? "cbor"; + this.#encodingKind = opts?.encoding ?? "bare"; this[TRANSPORT_SYMBOL] = opts?.transport ?? "websocket"; } diff --git a/packages/core/src/client/http-client-driver.ts b/packages/core/src/client/http-client-driver.ts index be51ed9ee..1acf2abe5 100644 --- a/packages/core/src/client/http-client-driver.ts +++ b/packages/core/src/client/http-client-driver.ts @@ -1,9 +1,6 @@ +import * as cbor from "cbor-x"; import type { Context as HonoContext } from "hono"; import type { WebSocket } from "ws"; -import type { ActionRequest } from "@/actor/protocol/http/action"; -import type * as protoHttpResolve from "@/actor/protocol/http/resolve"; -import type { ActionResponse } from "@/actor/protocol/message/to-client"; -import type * as wsToServer from "@/actor/protocol/message/to-server"; import type { Encoding } from "@/actor/protocol/serde"; import { HEADER_ACTOR_ID, @@ -17,17 +14,26 @@ import { importEventSource } from "@/common/eventsource"; import type { UniversalEventSource } from "@/common/eventsource-interface"; import { importWebSocket } from "@/common/websocket"; import type { ActorQuery } from "@/manager/protocol/query"; -import { assertUnreachable, httpUserAgent } from "@/utils"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { + HTTP_ACTION_REQUEST_VERSIONED, + HTTP_ACTION_RESPONSE_VERSIONED, + HTTP_RESOLVE_REQUEST_VERSIONED, + HTTP_RESOLVE_RESPONSE_VERSIONED, + TO_SERVER_VERSIONED, +} from "@/schemas/client-protocol/versioned"; +import { serializeWithEncoding, wsBinaryTypeForEncoding } from "@/serde"; +import { assertUnreachable, bufferToArrayBuffer, httpUserAgent } from "@/utils"; import type { ClientDriver } from "./client"; import * as errors from "./errors"; import { logger } from "./log"; -import { sendHttpRequest, serializeWithEncoding } from "./utils"; +import { sendHttpRequest } from "./utils"; /** * Client driver that communicates with the manager via HTTP. */ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { - // Lazily import the dynamic imports so we don't have to turn `createClient` in to an aysnc fn + // Lazily import the dynamic imports so we don't have to turn `createClient` in to an async fn const dynamicImports = (async () => { // Import dynamic dependencies const [WebSocket, EventSource] = await Promise.all([ @@ -56,24 +62,29 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { query: actorQuery, }); - const responseData = await sendHttpRequest( - { - url: `${managerEndpoint}/registry/actors/actions/${encodeURIComponent(name)}`, - method: "POST", - headers: { - [HEADER_ENCODING]: encoding, - [HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery), - ...(params !== undefined - ? { [HEADER_CONN_PARAMS]: JSON.stringify(params) } - : {}), - }, - body: { a: args } satisfies ActionRequest, - encoding: encoding, - signal: opts?.signal, + const responseData = await sendHttpRequest< + protocol.HttpActionRequest, + protocol.HttpActionResponse + >({ + url: `${managerEndpoint}/registry/actors/actions/${encodeURIComponent(name)}`, + method: "POST", + headers: { + [HEADER_ENCODING]: encoding, + [HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery), + ...(params !== undefined + ? { [HEADER_CONN_PARAMS]: JSON.stringify(params) } + : {}), }, - ); + body: { + args: bufferToArrayBuffer(cbor.encode(args)), + } satisfies protocol.HttpActionRequest, + encoding: encoding, + signal: opts?.signal, + requestVersionedDataHandler: HTTP_ACTION_REQUEST_VERSIONED, + responseVersionedDataHandler: HTTP_ACTION_RESPONSE_VERSIONED, + }); - return responseData.o as Response; + return cbor.decode(new Uint8Array(responseData.output)); }, resolveActorId: async ( @@ -86,8 +97,8 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { try { const result = await sendHttpRequest< - Record, - protoHttpResolve.ResolveResponse + null, + protocol.HttpResolveResponse >({ url: `${managerEndpoint}/registry/actors/resolve`, method: "POST", @@ -98,12 +109,14 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { ? { [HEADER_CONN_PARAMS]: JSON.stringify(params) } : {}), }, - body: {}, + body: null, encoding: encodingKind, + requestVersionedDataHandler: HTTP_RESOLVE_REQUEST_VERSIONED, + responseVersionedDataHandler: HTTP_RESOLVE_RESPONSE_VERSIONED, }); - logger().debug("resolved actor ID", { actorId: result.i }); - return result.i; + logger().debug("resolved actor ID", { actorId: result.actorId }); + return result.actorId; } catch (error) { logger().error("failed to resolve actor ID", { error }); if (error instanceof errors.ActorError) { @@ -144,16 +157,10 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { logger().debug("connecting to websocket", { url }); const ws = new WebSocket(url, protocol); - if (encodingKind === "cbor") { - ws.binaryType = "arraybuffer"; - } else if (encodingKind === "json") { - // HACK: Bun bug prevents changing binary type, so we ignore the error https://github.com/oven-sh/bun/issues/17005 - try { - ws.binaryType = "blob" as any; - } catch (error) {} - } else { - assertUnreachable(encodingKind); - } + // HACK: Bun bug prevents changing binary type, so we ignore the error https://github.com/oven-sh/bun/issues/17005 + try { + ws.binaryType = wsBinaryTypeForEncoding(encodingKind); + } catch (error) {} // Node & web WebSocket types not compatible return ws as any; @@ -197,11 +204,15 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { encoding: Encoding, connectionId: string, connectionToken: string, - message: wsToServer.ToServer, - ): Promise => { + message: protocol.ToServer, + ): Promise => { // TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently. // TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests - const messageSerialized = serializeWithEncoding(encoding, message); + const messageSerialized = serializeWithEncoding( + encoding, + message, + TO_SERVER_VERSIONED, + ); const res = await fetch(`${managerEndpoint}/registry/actors/message`, { method: "POST", headers: { @@ -214,7 +225,14 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { body: messageSerialized, credentials: "include", }); - return res; + if (!res.ok) { + throw new errors.InternalError( + `Publish message over HTTP error (${res.statusText}):\n${await res.text()}`, + ); + } + + // Discard response + await res.body?.cancel(); }, rawHttpRequest: async ( diff --git a/packages/core/src/client/mod.ts b/packages/core/src/client/mod.ts index 3b48eba17..5fd4e4257 100644 --- a/packages/core/src/client/mod.ts +++ b/packages/core/src/client/mod.ts @@ -10,7 +10,7 @@ export { ActorDefinition, AnyActorDefinition, } from "@/actor/definition"; -export type { Transport } from "@/actor/protocol/message/mod"; +export type { Transport } from "@/actor/protocol/old"; export type { Encoding } from "@/actor/protocol/serde"; export { ActorClientError, diff --git a/packages/core/src/client/raw-utils.ts b/packages/core/src/client/raw-utils.ts index 6907d44de..dad4458fd 100644 --- a/packages/core/src/client/raw-utils.ts +++ b/packages/core/src/client/raw-utils.ts @@ -1,5 +1,3 @@ -import type { Encoding } from "@/actor/protocol/serde"; -import { assertUnreachable } from "@/common/utils"; import type { ActorQuery } from "@/manager/protocol/query"; import type { ClientDriver } from "./client"; diff --git a/packages/core/src/client/utils.ts b/packages/core/src/client/utils.ts index f1282b985..cdde39304 100644 --- a/packages/core/src/client/utils.ts +++ b/packages/core/src/client/utils.ts @@ -1,7 +1,15 @@ +import type { VersionedDataHandler } from "@rivetkit/versioned-data-util"; import * as cbor from "cbor-x"; -import type { ResponseError } from "@/actor/protocol/http/error"; +import invariant from "invariant"; import { assertUnreachable } from "@/common/utils"; import type { Encoding } from "@/mod"; +import type { HttpResponseError } from "@/schemas/client-protocol/mod"; +import { HTTP_RESPONSE_ERROR_VERSIONED } from "@/schemas/client-protocol/versioned"; +import { + contentTypeForEncoding, + deserializeWithEncoding, + serializeWithEncoding, +} from "@/serde"; import { httpUserAgent } from "@/utils"; import { ActorError, HttpRequestError } from "./errors"; import { logger } from "./log"; @@ -24,21 +32,23 @@ export function messageLength(message: WebSocketMessage): number { assertUnreachable(message); } -export interface HttpRequestOpts { +export interface HttpRequestOpts { method: string; url: string; headers: Record; - body?: Body; + body?: RequestBody; encoding: Encoding; skipParseResponse?: boolean; signal?: AbortSignal; customFetch?: (req: Request) => Promise; + requestVersionedDataHandler: VersionedDataHandler; + responseVersionedDataHandler: VersionedDataHandler; } export async function sendHttpRequest< RequestBody = unknown, ResponseBody = unknown, ->(opts: HttpRequestOpts): Promise { +>(opts: HttpRequestOpts): Promise { logger().debug("sending http request", { url: opts.url, encoding: opts.encoding, @@ -46,17 +56,15 @@ export async function sendHttpRequest< // Serialize body let contentType: string | undefined; - let bodyData: string | Buffer | undefined; + let bodyData: string | Uint8Array | undefined; if (opts.method === "POST" || opts.method === "PUT") { - if (opts.encoding === "json") { - contentType = "application/json"; - bodyData = JSON.stringify(opts.body); - } else if (opts.encoding === "cbor") { - contentType = "application/octet-stream"; - bodyData = cbor.encode(opts.body); - } else { - assertUnreachable(opts.encoding); - } + invariant(opts.body !== undefined, "missing body"); + contentType = contentTypeForEncoding(opts.encoding); + bodyData = serializeWithEncoding( + opts.encoding, + opts.body, + opts.requestVersionedDataHandler, + ); } // Send request @@ -90,17 +98,13 @@ export async function sendHttpRequest< if (!response.ok) { // Attempt to parse structured data const bufferResponse = await response.arrayBuffer(); - let responseData: ResponseError; + let responseData: HttpResponseError; try { - if (opts.encoding === "json") { - const textResponse = new TextDecoder().decode(bufferResponse); - responseData = JSON.parse(textResponse); - } else if (opts.encoding === "cbor") { - const uint8Array = new Uint8Array(bufferResponse); - responseData = cbor.decode(uint8Array); - } else { - assertUnreachable(opts.encoding); - } + responseData = deserializeWithEncoding( + opts.encoding, + new Uint8Array(bufferResponse), + HTTP_RESPONSE_ERROR_VERSIONED, + ); } catch (error) { //logger().warn("failed to cleanly parse error, this is likely because a non-structured response is being served", { // error: stringifyError(error), @@ -116,7 +120,13 @@ export async function sendHttpRequest< } // Throw structured error - throw new ActorError(responseData.c, responseData.m, responseData.md); + throw new ActorError( + responseData.code, + responseData.message, + responseData.metadata + ? cbor.decode(new Uint8Array(responseData.metadata)) + : undefined, + ); } // Some requests don't need the success response to be parsed, so this can speed things up @@ -125,35 +135,16 @@ export async function sendHttpRequest< } // Parse the response based on encoding - let responseBody: ResponseBody; try { - if (opts.encoding === "json") { - responseBody = (await response.json()) as ResponseBody; - } else if (opts.encoding === "cbor") { - const buffer = await response.arrayBuffer(); - const uint8Array = new Uint8Array(buffer); - responseBody = cbor.decode(uint8Array); - } else { - assertUnreachable(opts.encoding); - } + const buffer = new Uint8Array(await response.arrayBuffer()); + return deserializeWithEncoding( + opts.encoding, + buffer, + opts.responseVersionedDataHandler, + ); } catch (error) { throw new HttpRequestError(`Failed to parse response: ${error}`, { cause: error, }); } - - return responseBody; -} - -export function serializeWithEncoding( - encoding: Encoding, - value: unknown, -): WebSocketMessage { - if (encoding === "json") { - return JSON.stringify(value); - } else if (encoding === "cbor") { - return cbor.encode(value); - } else { - assertUnreachable(encoding); - } } diff --git a/packages/core/src/common/logfmt.ts b/packages/core/src/common/logfmt.ts index 07ce239ca..799347690 100644 --- a/packages/core/src/common/logfmt.ts +++ b/packages/core/src/common/logfmt.ts @@ -1,7 +1,7 @@ import { type LogLevel, LogLevels } from "./log-levels"; export type LogEntry = [string, LogValue]; -export type LogValue = string | number | boolean | null | undefined; +export type LogValue = string | number | bigint | boolean | null | undefined; const LOG_LEVEL_COLORS: Record = { [LogLevels.CRITICAL]: "\x1b[31m", // Red @@ -108,6 +108,7 @@ export function castToLogValue(v: unknown): LogValue { if ( typeof v === "string" || typeof v === "number" || + typeof v === "bigint" || typeof v === "boolean" || v === null || v === undefined diff --git a/packages/core/src/common/router.ts b/packages/core/src/common/router.ts index b53775228..4e8227820 100644 --- a/packages/core/src/common/router.ts +++ b/packages/core/src/common/router.ts @@ -1,10 +1,14 @@ +import * as cbor from "cbor-x"; import type { Context as HonoContext, Next } from "hono"; -import type { ResponseError } from "@/actor/protocol/http/error"; -import { type Encoding, serialize } from "@/actor/protocol/serde"; +import type { Encoding } from "@/actor/protocol/serde"; import { getRequestEncoding, getRequestExposeInternalError, } from "@/actor/router-endpoints"; +import { HttpResponseError } from "@/schemas/client-protocol/mod"; +import { HTTP_RESPONSE_ERROR_VERSIONED } from "@/schemas/client-protocol/versioned"; +import { serializeWithEncoding } from "@/serde"; +import { bufferToArrayBuffer } from "@/utils"; import { getLogger, type Logger } from "./log"; import { deconstructError, stringifyError } from "./utils"; @@ -69,13 +73,14 @@ export function handleRouteError( encoding = "json"; } - const output = serialize( - { - c: code, - m: message, - md: metadata, - } satisfies ResponseError, + const output = serializeWithEncoding( encoding, + { + code, + message, + metadata: bufferToArrayBuffer(cbor.encode(metadata)), + }, + HTTP_RESPONSE_ERROR_VERSIONED, ); return c.body(output, { status: statusCode }); diff --git a/packages/core/src/driver-test-suite/mod.ts b/packages/core/src/driver-test-suite/mod.ts index d0c2228b2..c3885700b 100644 --- a/packages/core/src/driver-test-suite/mod.ts +++ b/packages/core/src/driver-test-suite/mod.ts @@ -86,8 +86,6 @@ export function runDriverTests( runActorDriverTests(driverTestConfig); runManagerDriverTests(driverTestConfig); - // TODO: Add back SSE once fixed in Rivet driver & CF lifecycle - // for (const transport of ["websocket", "sse"] as Transport[]) { for (const transport of ["websocket", "sse"] as Transport[]) { describe(`transport (${transport})`, () => { runActorConnTests({ diff --git a/packages/core/src/driver-test-suite/test-inline-client-driver.ts b/packages/core/src/driver-test-suite/test-inline-client-driver.ts index bf148b6fe..1bf27bc0b 100644 --- a/packages/core/src/driver-test-suite/test-inline-client-driver.ts +++ b/packages/core/src/driver-test-suite/test-inline-client-driver.ts @@ -1,7 +1,6 @@ import * as cbor from "cbor-x"; import type { Context as HonoContext } from "hono"; import type { WebSocket } from "ws"; -import type * as wsToServer from "@/actor/protocol/message/to-server"; import type { Encoding } from "@/actor/protocol/serde"; import { HEADER_ACTOR_QUERY, @@ -19,6 +18,7 @@ import type { TestInlineDriverCallRequest, TestInlineDriverCallResponse, } from "@/manager/router"; +import type * as protocol from "@/schemas/client-protocol/mod"; import { logger } from "./log"; /** @@ -30,7 +30,7 @@ export function createTestInlineClientDriver( ): ClientDriver { return { action: async = unknown[], Response = unknown>( - c: HonoContext | undefined, + _c: HonoContext | undefined, actorQuery: ActorQuery, encoding: Encoding, params: unknown, @@ -47,7 +47,7 @@ export function createTestInlineClientDriver( }, resolveActorId: async ( - c: HonoContext | undefined, + _c: HonoContext | undefined, actorQuery: ActorQuery, encodingKind: Encoding, params: unknown, @@ -62,7 +62,7 @@ export function createTestInlineClientDriver( }, connectWebSocket: async ( - c: HonoContext | undefined, + _c: HonoContext | undefined, actorQuery: ActorQuery, encodingKind: Encoding, params: unknown, @@ -100,7 +100,7 @@ export function createTestInlineClientDriver( }, connectSse: async ( - c: HonoContext | undefined, + _c: HonoContext | undefined, actorQuery: ActorQuery, encodingKind: Encoding, params: unknown, @@ -163,13 +163,13 @@ export function createTestInlineClientDriver( }, sendHttpMessage: async ( - c: HonoContext | undefined, + _c: HonoContext | undefined, actorId: string, encoding: Encoding, connectionId: string, connectionToken: string, - message: wsToServer.ToServer, - ): Promise => { + message: protocol.ToServer, + ): Promise => { logger().debug("sending http message via test inline driver", { actorId, encoding, @@ -204,16 +204,12 @@ export function createTestInlineClientDriver( throw new Error(`Failed to send HTTP message: ${result.statusText}`); } - // Need to create a Response object from the proxy response - return new Response(await result.text(), { - status: result.status, - statusText: result.statusText, - headers: result.headers, - }); + // Discard response + await result.body?.cancel(); }, rawHttpRequest: async ( - c: HonoContext | undefined, + _c: HonoContext | undefined, actorQuery: ActorQuery, encoding: Encoding, params: unknown, diff --git a/packages/core/src/driver-test-suite/tests/action-features.ts b/packages/core/src/driver-test-suite/tests/action-features.ts index 43fb6e24b..2262106a9 100644 --- a/packages/core/src/driver-test-suite/tests/action-features.ts +++ b/packages/core/src/driver-test-suite/tests/action-features.ts @@ -6,7 +6,7 @@ import { setupDriverTest } from "../utils"; export function runActionFeaturesTests(driverTestConfig: DriverTestConfig) { describe("Action Features", () => { // TODO: These do not work with fake timers - describe.skip("Action Timeouts", () => { + describe("Action Timeouts", () => { const usesFakeTimers = !driverTestConfig.useRealTimers; test("should timeout actions that exceed the configured timeout", async (c) => { diff --git a/packages/core/src/drivers/engine/actor-driver.ts b/packages/core/src/drivers/engine/actor-driver.ts index 484d49eac..603ea31d9 100644 --- a/packages/core/src/drivers/engine/actor-driver.ts +++ b/packages/core/src/drivers/engine/actor-driver.ts @@ -349,7 +349,7 @@ export class EngineActorDriver implements ActorDriver { }); } - sleep(actorId: string) { + async sleep(actorId: string) { this.#runner.sleepActor(actorId); } diff --git a/packages/core/src/drivers/file-system/actor.ts b/packages/core/src/drivers/file-system/actor.ts index 683fb9df6..f52532d6b 100644 --- a/packages/core/src/drivers/file-system/actor.ts +++ b/packages/core/src/drivers/file-system/actor.ts @@ -1,4 +1,5 @@ import type { GenericConnGlobalState } from "@/actor/generic-conn-driver"; +import { logger } from "@/actor/log"; import type { AnyClient } from "@/client/client"; import type { ActorDriver, @@ -79,7 +80,7 @@ export class FileSystemActorDriver implements ActorDriver { return this.#state.createDatabase(actorId); } - sleep(actorId: string): void { - this.#state.sleepActor(actorId); + sleep(actorId: string): Promise { + return this.#state.sleepActor(actorId); } } diff --git a/packages/core/src/inline-client-driver/mod.ts b/packages/core/src/inline-client-driver/mod.ts index 213ca5d78..0b0de0a86 100644 --- a/packages/core/src/inline-client-driver/mod.ts +++ b/packages/core/src/inline-client-driver/mod.ts @@ -1,13 +1,9 @@ +import * as cbor from "cbor-x"; import type { Context as HonoContext } from "hono"; import invariant from "invariant"; import onChange from "on-change"; import type { WebSocket } from "ws"; import * as errors from "@/actor/errors"; -import type { - ActionRequest, - ActionResponse, -} from "@/actor/protocol/http/action"; -import type * as wsToServer from "@/actor/protocol/message/to-server"; import type { Encoding } from "@/actor/protocol/serde"; import { PATH_CONNECT_WEBSOCKET, @@ -30,7 +26,14 @@ import { deconstructError } from "@/common/utils"; import type { ManagerDriver } from "@/manager/driver"; import type { ActorQuery } from "@/manager/protocol/query"; import type { RunConfig } from "@/mod"; -import { httpUserAgent } from "@/utils"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { + HTTP_ACTION_REQUEST_VERSIONED, + HTTP_ACTION_RESPONSE_VERSIONED, + TO_CLIENT_VERSIONED, + TO_SERVER_VERSIONED, +} from "@/schemas/client-protocol/versioned"; +import { bufferToArrayBuffer, httpUserAgent } from "@/utils"; import { logger } from "./log"; /** @@ -66,8 +69,8 @@ export function createInlineClientDriver( // Invoke the action logger().debug("handling action", { actionName, encoding }); const responseData = await sendHttpRequest< - ActionRequest, - ActionResponse + protocol.HttpActionRequest, + protocol.HttpActionResponse >({ url: `http://actor/action/${encodeURIComponent(actionName)}`, method: "POST", @@ -78,13 +81,17 @@ export function createInlineClientDriver( : {}), [HEADER_EXPOSE_INTERNAL_ERROR]: "true", }, - body: { a: args } satisfies ActionRequest, + body: { + args: bufferToArrayBuffer(cbor.encode(args)), + } satisfies protocol.HttpActionRequest, encoding: encoding, customFetch: managerDriver.sendRequest.bind(managerDriver, actorId), signal: opts?.signal, + requestVersionedDataHandler: HTTP_ACTION_REQUEST_VERSIONED, + responseVersionedDataHandler: HTTP_ACTION_RESPONSE_VERSIONED, }); - return responseData.o as Response; + return cbor.decode(new Uint8Array(responseData.output)); } catch (err) { // Standardize to ClientActorError instead of the native backend error const { code, message, metadata } = deconstructError( @@ -181,12 +188,12 @@ export function createInlineClientDriver( encoding: Encoding, connectionId: string, connectionToken: string, - message: wsToServer.ToServer, - ): Promise => { + message: protocol.ToServer, + ): Promise => { logger().debug("sending http message", { actorId, connectionId }); // Send an HTTP request to the connections endpoint - return sendHttpRequest({ + await sendHttpRequest({ url: "http://actor/connections/message", method: "POST", headers: { @@ -199,6 +206,8 @@ export function createInlineClientDriver( encoding, skipParseResponse: true, customFetch: managerDriver.sendRequest.bind(managerDriver, actorId), + requestVersionedDataHandler: TO_SERVER_VERSIONED, + responseVersionedDataHandler: TO_CLIENT_VERSIONED, }); }, diff --git a/packages/core/src/manager/protocol/mod.ts b/packages/core/src/manager/protocol/mod.ts index 6b4d1e993..1eaa9be27 100644 --- a/packages/core/src/manager/protocol/mod.ts +++ b/packages/core/src/manager/protocol/mod.ts @@ -1,5 +1,5 @@ import { z } from "zod"; -import { TransportSchema } from "@/actor/protocol/message/mod"; +import { TransportSchema } from "@/actor/protocol/old"; import { ActorQuerySchema } from "./query"; export * from "./query"; diff --git a/packages/core/src/manager/router.ts b/packages/core/src/manager/router.ts index 9587b751b..528b0f91f 100644 --- a/packages/core/src/manager/router.ts +++ b/packages/core/src/manager/router.ts @@ -12,10 +12,8 @@ import invariant from "invariant"; import type { CloseEvent, MessageEvent, WebSocket } from "ws"; import { z } from "zod"; import * as errors from "@/actor/errors"; -import type * as protoHttpResolve from "@/actor/protocol/http/resolve"; -import type { Transport } from "@/actor/protocol/message/mod"; -import type { ToClient } from "@/actor/protocol/message/to-client"; -import { type Encoding, serialize } from "@/actor/protocol/serde"; +import type { Transport } from "@/actor/protocol/old"; +import type { Encoding } from "@/actor/protocol/serde"; import { PATH_CONNECT_WEBSOCKET, PATH_RAW_WEBSOCKET_PREFIX, @@ -49,7 +47,13 @@ import { secureInspector } from "@/inspector/utils"; import type { UpgradeWebSocketArgs } from "@/mod"; import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; -import { VERSION } from "@/utils"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { + HTTP_RESOLVE_RESPONSE_VERSIONED, + TO_CLIENT_VERSIONED, +} from "@/schemas/client-protocol/versioned"; +import { serializeWithEncoding } from "@/serde"; +import { bufferToArrayBuffer } from "@/utils"; import { authenticateEndpoint } from "./auth"; import type { ManagerDriver } from "./driver"; import { logger } from "./log"; @@ -1104,7 +1108,7 @@ async function createTestWebSocketProxy( async function handleSseConnectRequest( c: HonoContext, registryConfig: RegistryConfig, - runConfig: RunConfig, + _runConfig: RunConfig, driver: ManagerDriver, ): Promise { let encoding: Encoding | undefined; @@ -1177,18 +1181,24 @@ async function handleSseConnectRequest( try { if (encoding) { // Serialize and send the connection error - const errorMsg: ToClient = { - b: { - e: { - c: code, - m: message, - md: metadata, + const errorMsg: protocol.ToClient = { + body: { + tag: "Error", + val: { + code, + message, + metadata: bufferToArrayBuffer(cbor.encode(metadata)), + actionId: null, }, }, }; // Send the error message to the client - const serialized = serialize(errorMsg, encoding); + const serialized = serializeWithEncoding( + encoding, + errorMsg, + TO_CLIENT_VERSIONED, + ); await stream.writeSSE({ data: typeof serialized === "string" @@ -1332,18 +1342,24 @@ async function handleWebSocketConnectRequest( if (encoding) { try { // Serialize and send the connection error - const errorMsg: ToClient = { - b: { - e: { - c: code, - m: message, - md: metadata, + const errorMsg: protocol.ToClient = { + body: { + tag: "Error", + val: { + code, + message, + metadata: bufferToArrayBuffer(cbor.encode(metadata)), + actionId: null, }, }, }; // Send the error message to the client - const serialized = serialize(errorMsg, encoding); + const serialized = serializeWithEncoding( + encoding, + errorMsg, + TO_CLIENT_VERSIONED, + ); ws.send(serialized); // Close the connection with an error code @@ -1371,8 +1387,8 @@ async function handleWebSocketConnectRequest( */ async function handleMessageRequest( c: HonoContext, - registryConfig: RegistryConfig, - runConfig: RunConfig, + _registryConfig: RegistryConfig, + _runConfig: RunConfig, driver: ManagerDriver, ): Promise { logger().debug("connection message request received"); @@ -1441,7 +1457,7 @@ async function handleMessageRequest( async function handleActionRequest( c: HonoContext, registryConfig: RegistryConfig, - runConfig: RunConfig, + _runConfig: RunConfig, driver: ManagerDriver, ): Promise { try { @@ -1552,10 +1568,14 @@ async function handleResolveRequest( invariant(actorId, "Missing actor ID"); // Format response according to protocol - const response: protoHttpResolve.ResolveResponse = { - i: actorId, + const response: protocol.HttpResolveResponse = { + actorId, }; - const serialized = serialize(response, encoding); + const serialized = serializeWithEncoding( + encoding, + response, + HTTP_RESOLVE_RESPONSE_VERSIONED, + ); return c.body(serialized); } @@ -1565,7 +1585,7 @@ async function handleResolveRequest( async function handleRawHttpRequest( c: HonoContext, registryConfig: RegistryConfig, - runConfig: RunConfig, + _runConfig: RunConfig, driver: ManagerDriver, ): Promise { try { @@ -1742,7 +1762,7 @@ function universalActorProxy({ runConfig: RunConfig; driver: ManagerDriver; }): MiddlewareHandler { - return async (c, next) => { + return async (c, _next) => { if (c.req.header("upgrade") === "websocket") { return handleRawWebSocketRequest(c, registryConfig, runConfig, driver); } else { diff --git a/packages/core/src/schemas/client-protocol/mod.ts b/packages/core/src/schemas/client-protocol/mod.ts new file mode 100644 index 000000000..a1841fee9 --- /dev/null +++ b/packages/core/src/schemas/client-protocol/mod.ts @@ -0,0 +1 @@ +export * from "../../../dist/schemas/client-protocol/v1"; diff --git a/packages/core/src/schemas/client-protocol/versioned.ts b/packages/core/src/schemas/client-protocol/versioned.ts new file mode 100644 index 000000000..e258d6376 --- /dev/null +++ b/packages/core/src/schemas/client-protocol/versioned.ts @@ -0,0 +1,63 @@ +import { + createVersionedDataHandler, + type MigrationFn, +} from "@rivetkit/versioned-data-util"; +import * as v1 from "../../../dist/schemas/client-protocol/v1"; + +export const CURRENT_VERSION = 1; + +const migrations = new Map>(); + +export const TO_SERVER_VERSIONED = createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeToServer(data), + deserializeVersion: (bytes) => v1.decodeToServer(bytes), +}); + +export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeToClient(data), + deserializeVersion: (bytes) => v1.decodeToClient(bytes), +}); + +export const HTTP_ACTION_REQUEST_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeHttpActionRequest(data), + deserializeVersion: (bytes) => v1.decodeHttpActionRequest(bytes), + }); + +export const HTTP_ACTION_RESPONSE_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeHttpActionResponse(data), + deserializeVersion: (bytes) => v1.decodeHttpActionResponse(bytes), + }); + +export const HTTP_RESPONSE_ERROR_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeHttpResponseError(data), + deserializeVersion: (bytes) => v1.decodeHttpResponseError(bytes), + }); + +export const HTTP_RESOLVE_REQUEST_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (_) => new Uint8Array(), + deserializeVersion: (bytes) => null, + }); + +export const HTTP_RESOLVE_RESPONSE_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeHttpResolveResponse(data), + deserializeVersion: (bytes) => v1.decodeHttpResolveResponse(bytes), + }); diff --git a/packages/core/src/schemas/file-system-driver/mod.ts b/packages/core/src/schemas/file-system-driver/mod.ts new file mode 100644 index 000000000..c7431b7f3 --- /dev/null +++ b/packages/core/src/schemas/file-system-driver/mod.ts @@ -0,0 +1 @@ +export * from "../../../dist/schemas/file-system-driver/v1"; diff --git a/packages/core/src/schemas/file-system-driver/versioned.ts b/packages/core/src/schemas/file-system-driver/versioned.ts new file mode 100644 index 000000000..9cdef6429 --- /dev/null +++ b/packages/core/src/schemas/file-system-driver/versioned.ts @@ -0,0 +1,28 @@ +import { + createVersionedDataHandler, + type MigrationFn, +} from "@rivetkit/versioned-data-util"; +import * as v1 from "../../../dist/schemas/file-system-driver/v1"; + +export const CURRENT_VERSION = 1; + +export type CurrentActorState = v1.ActorState; +export type CurrentActorAlarm = v1.ActorAlarm; + +const migrations = new Map>(); + +export const ACTOR_STATE_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeActorState(data), + deserializeVersion: (bytes) => v1.decodeActorState(bytes), + }); + +export const ACTOR_ALARM_VERSIONED = + createVersionedDataHandler({ + currentVersion: CURRENT_VERSION, + migrations, + serializeVersion: (data) => v1.encodeActorAlarm(data), + deserializeVersion: (bytes) => v1.decodeActorAlarm(bytes), + }); diff --git a/packages/core/src/serde.ts b/packages/core/src/serde.ts new file mode 100644 index 000000000..016479993 --- /dev/null +++ b/packages/core/src/serde.ts @@ -0,0 +1,84 @@ +import type { VersionedDataHandler } from "@rivetkit/versioned-data-util"; +import * as cbor from "cbor-x"; +import invariant from "invariant"; +import { assertUnreachable } from "@/common/utils"; +import type { Encoding } from "@/mod"; +import { jsonStringifyCompat } from "./actor/protocol/serde"; + +export function encodingIsBinary(encoding: Encoding): boolean { + if (encoding === "json") { + return false; + } else if (encoding === "cbor" || encoding === "bare") { + return true; + } else { + assertUnreachable(encoding); + } +} + +export function contentTypeForEncoding(encoding: Encoding): string { + if (encoding === "json") { + return "application/json"; + } else if (encoding === "cbor" || encoding === "bare") { + return "application/octet-stream"; + } else { + assertUnreachable(encoding); + } +} + +export function wsBinaryTypeForEncoding( + encoding: Encoding, +): "arraybuffer" | "blob" { + if (encoding === "json") { + return "blob"; + } else if (encoding === "cbor" || encoding === "bare") { + return "arraybuffer"; + } else { + assertUnreachable(encoding); + } +} + +export function serializeWithEncoding( + encoding: Encoding, + value: T, + versionedDataHandler: VersionedDataHandler, +): Uint8Array | string { + if (encoding === "json") { + return jsonStringifyCompat(value); + } else if (encoding === "cbor") { + return cbor.encode(value); + } else if (encoding === "bare") { + return versionedDataHandler.serializeWithEmbeddedVersion(value); + } else { + assertUnreachable(encoding); + } +} + +export function deserializeWithEncoding( + encoding: Encoding, + buffer: Uint8Array | string, + versionedDataHandler: VersionedDataHandler, +): T { + if (encoding === "json") { + if (typeof buffer === "string") { + return JSON.parse(buffer); + } else { + const decoder = new TextDecoder("utf-8"); + const jsonString = decoder.decode(buffer); + return JSON.parse(jsonString); + } + } else if (encoding === "cbor") { + invariant( + typeof buffer !== "string", + "buffer cannot be string for cbor encoding", + ); + return cbor.decode(buffer); + } else if (encoding === "bare") { + invariant( + typeof buffer !== "string", + "buffer cannot be string for bare encoding", + ); + return versionedDataHandler.deserializeWithEmbeddedVersion(buffer); + } else { + assertUnreachable(encoding); + } +} diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index bb79214d6..1350cd668 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -163,3 +163,10 @@ export class SinglePromiseQueue { } } } + +export function bufferToArrayBuffer(buf: Buffer): ArrayBuffer { + return buf.buffer.slice( + buf.byteOffset, + buf.byteOffset + buf.byteLength, + ) as ArrayBuffer; +} diff --git a/packages/core/tsconfig.json b/packages/core/tsconfig.json index 820cb0f5b..ed7fd572f 100644 --- a/packages/core/tsconfig.json +++ b/packages/core/tsconfig.json @@ -12,6 +12,7 @@ "src/**/*", "tests/**/*", "scripts/**/*", - "fixtures/driver-test-suite/**/*" + "fixtures/driver-test-suite/**/*", + "dist/schemas/**/*" ] } diff --git a/packages/core/tsup.config.ts b/packages/core/tsup.config.ts index b935a0a8e..d371df1d5 100644 --- a/packages/core/tsup.config.ts +++ b/packages/core/tsup.config.ts @@ -1,4 +1,7 @@ import { defineConfig } from "tsup"; import defaultConfig from "../../tsup.base.ts"; -export default defineConfig(defaultConfig); +export default defineConfig({ + ...defaultConfig, + outDir: "dist/tsup/", +}); diff --git a/packages/core/turbo.json b/packages/core/turbo.json index 16060d55a..b2ec927c1 100644 --- a/packages/core/turbo.json +++ b/packages/core/turbo.json @@ -5,14 +5,23 @@ "dump-openapi": { "inputs": ["package.json", "packages/rivetkit/src/manager/router.ts"] }, + "build:schema": { + "dependsOn": ["^build"], + "inputs": ["schemas/**/*.bare"], + "outputs": ["dist/schemas/**/*.ts"] + }, "build": { - "dependsOn": ["^build", "dump-openapi"], + "dependsOn": ["^build", "dump-openapi", "build:schema"], "inputs": ["src/**", "tsconfig.json", "tsup.config.ts", "package.json"], "outputs": ["dist/**"] }, "test": { "dependsOn": ["^test", "build"], "inputs": ["src/**", "tests/**", "fixtures/**"] + }, + "check-types": { + "dependsOn": ["^build", "build:schema"], + "inputs": ["src/**", "schemas/**/*.bare", "tsconfig.json"] } } } diff --git a/packages/misc/bare-compiler/package.json b/packages/misc/bare-compiler/package.json new file mode 100644 index 000000000..634366bdf --- /dev/null +++ b/packages/misc/bare-compiler/package.json @@ -0,0 +1,36 @@ +{ + "name": "@rivetkit/bare-compiler", + "version": "0.0.1", + "private": true, + "type": "module", + "description": "BARE schema compiler for RivetKit", + "keywords": [ + "rivetkit", + "bare", + "compiler", + "serialization", + "binary" + ], + "repository": { + "type": "git", + "url": "https://github.com/rivet-gg/rivetkit.git", + "directory": "packages/misc/bare-compiler" + }, + "license": "Apache-2.0", + "bin": { + "bare-compiler": "./dist/cli.js" + }, + "scripts": { + "build": "tsc", + "check-types": "tsc --noEmit" + }, + "dependencies": { + "@bare-ts/tools": "^0.13.0", + "commander": "^12.0.0" + }, + "devDependencies": { + "@types/node": "^20.17.10", + "tsx": "^4.19.2", + "typescript": "^5.7.2" + } +} diff --git a/packages/misc/bare-compiler/src/cli.ts b/packages/misc/bare-compiler/src/cli.ts new file mode 100644 index 000000000..2217d99cc --- /dev/null +++ b/packages/misc/bare-compiler/src/cli.ts @@ -0,0 +1,44 @@ +#!/usr/bin/env node + +import { Command } from "commander"; +import * as path from "path"; +import { compileSchema } from "./compile.js"; + +const program = new Command(); + +program + .name("bare-compiler") + .description("Compile BARE schemas to TypeScript") + .version("0.0.1"); + +program + .command("compile") + .description("Compile a BARE schema file") + .argument("", "Input BARE schema file") + .option("-o, --output ", "Output file path") + .option("--pedantic", "Enable pedantic mode", false) + .option("--generator ", "Generator type (ts, js, dts, bare)", "ts") + .action(async (input: string, options) => { + try { + const schemaPath = path.resolve(input); + const outputPath = options.output + ? path.resolve(options.output) + : schemaPath.replace(/\.bare$/, ".ts"); + + await compileSchema({ + schemaPath, + outputPath, + config: { + pedantic: options.pedantic, + generator: options.generator, + }, + }); + + console.log(`Successfully compiled ${input} to ${outputPath}`); + } catch (error) { + console.error("Failed to compile schema:", error); + process.exit(1); + } + }); + +program.parse(); diff --git a/packages/misc/bare-compiler/src/compile.ts b/packages/misc/bare-compiler/src/compile.ts new file mode 100644 index 000000000..bfdce788a --- /dev/null +++ b/packages/misc/bare-compiler/src/compile.ts @@ -0,0 +1,30 @@ +import { type Config, transform } from "@bare-ts/tools"; +import * as fs from "fs/promises"; +import * as path from "path"; + +export interface CompileOptions { + schemaPath: string; + outputPath: string; + config?: Partial; +} + +export async function compileSchema(options: CompileOptions): Promise { + const { schemaPath, outputPath, config = {} } = options; + + const schema = await fs.readFile(schemaPath, "utf-8"); + const outputDir = path.dirname(outputPath); + + await fs.mkdir(outputDir, { recursive: true }); + + const defaultConfig: Partial = { + pedantic: true, + generator: "ts", + ...config, + }; + + const result = transform(schema, defaultConfig); + + await fs.writeFile(outputPath, result); +} + +export { type Config, transform } from "@bare-ts/tools"; diff --git a/packages/misc/bare-compiler/tsconfig.json b/packages/misc/bare-compiler/tsconfig.json new file mode 100644 index 000000000..0bf25dd7f --- /dev/null +++ b/packages/misc/bare-compiler/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./dist", + "emitDeclarationOnly": false, + "noEmit": false, + "declaration": true + }, + "include": ["src/**/*"] +} diff --git a/packages/misc/bare-compiler/turbo.json b/packages/misc/bare-compiler/turbo.json new file mode 100644 index 000000000..95960709b --- /dev/null +++ b/packages/misc/bare-compiler/turbo.json @@ -0,0 +1,4 @@ +{ + "$schema": "https://turbo.build/schema.json", + "extends": ["//"] +} diff --git a/packages/misc/versioned-data-util/package.json b/packages/misc/versioned-data-util/package.json new file mode 100644 index 000000000..bbd847c45 --- /dev/null +++ b/packages/misc/versioned-data-util/package.json @@ -0,0 +1,44 @@ +{ + "name": "@rivetkit/versioned-data-util", + "version": "0.0.1", + "type": "module", + "description": "Versioned data serialization utilities for RivetKit", + "keywords": [ + "rivetkit", + "versioning", + "serialization", + "migration", + "bare" + ], + "repository": { + "type": "git", + "url": "https://github.com/rivet-gg/rivetkit.git", + "directory": "packages/misc/versioned-data-util" + }, + "license": "Apache-2.0", + "exports": { + ".": { + "import": { + "types": "./dist/mod.d.ts", + "default": "./dist/mod.js" + }, + "require": { + "types": "./dist/mod.d.cts", + "default": "./dist/mod.cjs" + } + } + }, + "scripts": { + "build": "tsup src/mod.ts", + "test": "vitest", + "test:run": "vitest run", + "check-types": "tsc --noEmit" + }, + "devDependencies": { + "@types/node": "^20.17.10", + "tsup": "^8.5.0", + "tsx": "^4.19.2", + "typescript": "^5.7.2", + "vitest": "^2.0.0" + } +} diff --git a/packages/misc/versioned-data-util/src/mod.ts b/packages/misc/versioned-data-util/src/mod.ts new file mode 100644 index 000000000..800c962af --- /dev/null +++ b/packages/misc/versioned-data-util/src/mod.ts @@ -0,0 +1,95 @@ +export interface VersionedData { + version: number; + data: T; +} + +export type MigrationFn = (data: TFrom) => TTo; + +export interface VersionedDataConfig { + currentVersion: number; + migrations: Map>; + serializeVersion: (data: T) => Uint8Array; + deserializeVersion: (bytes: Uint8Array) => T; +} + +export class VersionedDataHandler { + constructor(private config: VersionedDataConfig) {} + + serializeWithEmbeddedVersion(data: T): Uint8Array { + const versioned: VersionedData = { + version: this.config.currentVersion, + data: this.config.serializeVersion(data), + }; + + return this.embedVersion(versioned); + } + + deserializeWithEmbeddedVersion(bytes: Uint8Array): T { + const versioned = this.extractVersion(bytes); + return this.deserialize(versioned.data, versioned.version); + } + + serialize(data: T, version: number): Uint8Array { + return this.config.serializeVersion(data); + } + + deserialize(bytes: Uint8Array, version: number): T { + if (version === this.config.currentVersion) { + return this.config.deserializeVersion(bytes); + } + + if (version > this.config.currentVersion) { + throw new Error( + `Cannot decode data from version ${version}, current version is ${this.config.currentVersion}`, + ); + } + + let currentData: any = this.config.deserializeVersion(bytes); + let currentVersion = version; + + while (currentVersion < this.config.currentVersion) { + const migration = this.config.migrations.get(currentVersion); + if (!migration) { + throw new Error( + `No migration found from version ${currentVersion} to ${currentVersion + 1}`, + ); + } + + currentData = migration(currentData); + currentVersion++; + } + + return currentData; + } + + private embedVersion(data: VersionedData): Uint8Array { + const versionBytes = new Uint8Array(4); + new DataView(versionBytes.buffer).setUint32(0, data.version, true); + + const result = new Uint8Array(versionBytes.length + data.data.length); + result.set(versionBytes); + result.set(data.data, versionBytes.length); + + return result; + } + + private extractVersion(bytes: Uint8Array): VersionedData { + if (bytes.length < 4) { + throw new Error("Invalid versioned data: too short"); + } + + const version = new DataView(bytes.buffer, bytes.byteOffset).getUint32( + 0, + true, + ); + const data = bytes.slice(4); + + return { version, data }; + } +} + +export function createVersionedDataHandler( + config: VersionedDataConfig, +): VersionedDataHandler { + return new VersionedDataHandler(config); +} diff --git a/packages/misc/versioned-data-util/tests/versioned.test.ts b/packages/misc/versioned-data-util/tests/versioned.test.ts new file mode 100644 index 000000000..4c3c510df --- /dev/null +++ b/packages/misc/versioned-data-util/tests/versioned.test.ts @@ -0,0 +1,175 @@ +import { describe, expect, it } from "vitest"; +import { createVersionedDataHandler, type MigrationFn } from "../src/index"; + +interface V1Data { + name: string; +} + +interface V2Data { + name: string; + age: number; +} + +interface V3Data { + firstName: string; + lastName: string; + age: number; +} + +describe("VersionedDataHandler", () => { + it("should encode and decode current version data", () => { + const HANDLER = createVersionedDataHandler({ + currentVersion: 1, + migrations: new Map(), + serializeVersion: (data) => + new TextEncoder().encode(JSON.stringify(data)), + deserializeVersion: (bytes) => + JSON.parse(new TextDecoder().decode(bytes)), + }); + + const original: V1Data = { name: "John" }; + const encoded = HANDLER.serializeWithEmbeddedVersion(original); + const decoded = HANDLER.deserializeWithEmbeddedVersion(encoded); + + expect(decoded).toEqual(original); + }); + + it("should migrate from v1 to v2", () => { + const v1to2: MigrationFn = (data) => ({ + name: data.name, + age: 0, + }); + + const HANDLER = createVersionedDataHandler({ + currentVersion: 2, + migrations: new Map([[1, v1to2]]), + serializeVersion: (data) => + new TextEncoder().encode(JSON.stringify(data)), + deserializeVersion: (bytes) => + JSON.parse(new TextDecoder().decode(bytes)), + }); + + const V1_HANDLER = createVersionedDataHandler({ + currentVersion: 1, + migrations: new Map(), + serializeVersion: (data) => + new TextEncoder().encode(JSON.stringify(data)), + deserializeVersion: (bytes) => + JSON.parse(new TextDecoder().decode(bytes)), + }); + + const v1Data: V1Data = { name: "John" }; + const v1Encoded = V1_HANDLER.serializeWithEmbeddedVersion(v1Data); + + const v2Decoded = HANDLER.deserializeWithEmbeddedVersion(v1Encoded); + expect(v2Decoded).toEqual({ name: "John", age: 0 }); + }); + + it("should migrate through multiple versions", () => { + const v1to2: MigrationFn = (data) => ({ + name: data.name, + age: 25, + }); + + const v2to3: MigrationFn = (data) => { + const [firstName, ...lastParts] = data.name.split(" "); + return { + firstName, + lastName: lastParts.join(" ") || "", + age: data.age, + }; + }; + + const HANDLER = createVersionedDataHandler({ + currentVersion: 3, + migrations: new Map>([ + [1, v1to2], + [2, v2to3], + ]), + serializeVersion: (data) => + new TextEncoder().encode(JSON.stringify(data)), + deserializeVersion: (bytes) => + JSON.parse(new TextDecoder().decode(bytes)), + }); + + const V1_HANDLER = createVersionedDataHandler({ + currentVersion: 1, + migrations: new Map(), + serializeVersion: (data) => + new TextEncoder().encode(JSON.stringify(data)), + deserializeVersion: (bytes) => + JSON.parse(new TextDecoder().decode(bytes)), + }); + + const v1Data: V1Data = { name: "John Doe" }; + const v1Encoded = V1_HANDLER.serializeWithEmbeddedVersion(v1Data); + + const v3Decoded = HANDLER.deserializeWithEmbeddedVersion(v1Encoded); + expect(v3Decoded).toEqual({ + firstName: "John", + lastName: "Doe", + age: 25, + }); + }); + + it("should throw error for future version", () => { + const HANDLER = createVersionedDataHandler({ + currentVersion: 1, + migrations: new Map(), + serializeVersion: (data) => + new TextEncoder().encode(JSON.stringify(data)), + deserializeVersion: (bytes) => + JSON.parse(new TextDecoder().decode(bytes)), + }); + + const futureVersionBytes = new Uint8Array([ + 2, + 0, + 0, + 0, + ...new TextEncoder().encode('{"name":"test"}'), + ]); + + expect(() => + HANDLER.deserializeWithEmbeddedVersion(futureVersionBytes), + ).toThrow("Cannot decode data from version 2, current version is 1"); + }); + + it("should throw error for missing migration", () => { + const HANDLER = createVersionedDataHandler({ + currentVersion: 3, + migrations: new Map([[2, (data: any) => data]]), + serializeVersion: (data) => + new TextEncoder().encode(JSON.stringify(data)), + deserializeVersion: (bytes) => + JSON.parse(new TextDecoder().decode(bytes)), + }); + + const v1Bytes = new Uint8Array([ + 1, + 0, + 0, + 0, + ...new TextEncoder().encode('{"name":"test"}'), + ]); + + expect(() => HANDLER.deserializeWithEmbeddedVersion(v1Bytes)).toThrow( + "No migration found from version 1 to 2", + ); + }); + + it("should handle binary data correctly", () => { + const HANDLER = createVersionedDataHandler({ + currentVersion: 1, + migrations: new Map(), + serializeVersion: (data) => data, + deserializeVersion: (bytes) => bytes, + }); + + const original = new Uint8Array([1, 2, 3, 4, 5]); + const encoded = HANDLER.serializeWithEmbeddedVersion(original); + const decoded = HANDLER.deserializeWithEmbeddedVersion(encoded); + + expect(decoded).toEqual(original); + }); +}); diff --git a/packages/misc/versioned-data-util/tsconfig.json b/packages/misc/versioned-data-util/tsconfig.json new file mode 100644 index 000000000..24543fd6e --- /dev/null +++ b/packages/misc/versioned-data-util/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "rootDir": ".", + "outDir": "./dist", + "emitDeclarationOnly": false, + "noEmit": false, + "declaration": true + }, + "include": ["src/**/*", "tests/**/*"] +} diff --git a/packages/misc/versioned-data-util/tsup.config.ts b/packages/misc/versioned-data-util/tsup.config.ts new file mode 100644 index 000000000..f363b829f --- /dev/null +++ b/packages/misc/versioned-data-util/tsup.config.ts @@ -0,0 +1,4 @@ +import { defineConfig } from "tsup"; +import defaultConfig from "../../../tsup.base.ts"; + +export default defineConfig(defaultConfig); diff --git a/packages/misc/versioned-data-util/turbo.json b/packages/misc/versioned-data-util/turbo.json new file mode 100644 index 000000000..95960709b --- /dev/null +++ b/packages/misc/versioned-data-util/turbo.json @@ -0,0 +1,4 @@ +{ + "$schema": "https://turbo.build/schema.json", + "extends": ["//"] +} diff --git a/packages/misc/versioned-data-util/vitest.config.ts b/packages/misc/versioned-data-util/vitest.config.ts new file mode 100644 index 000000000..2fb5c48d9 --- /dev/null +++ b/packages/misc/versioned-data-util/vitest.config.ts @@ -0,0 +1,8 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globals: true, + environment: "node", + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 868d6ca50..8502d2839 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -43,6 +43,9 @@ importers: semver: specifier: ^7.7.1 version: 7.7.2 + tsup: + specifier: ^8.4.0 + version: 8.5.0(@microsoft/api-extractor@7.52.8(@types/node@24.0.4))(postcss@8.5.6)(tsx@4.20.3)(typescript@5.8.3)(yaml@2.8.0) tsx: specifier: ^4.20.3 version: 4.20.3 @@ -958,6 +961,9 @@ importers: packages/core: dependencies: + '@bare-ts/lib': + specifier: ~0.3.0 + version: 0.3.0 '@hono/standard-validator': specifier: ^0.1.3 version: 0.1.4(@standard-schema/spec@1.0.0)(hono@4.8.3) @@ -970,6 +976,9 @@ importers: '@rivetkit/fast-json-patch': specifier: ^3.1.2 version: 3.1.2 + '@rivetkit/versioned-data-util': + specifier: workspace:* + version: link:../misc/versioned-data-util cbor-x: specifier: ^1.6.0 version: 1.6.0 @@ -1001,6 +1010,9 @@ importers: '@rivet-gg/actor-core': specifier: ^25.1.0 version: 25.2.0 + '@rivetkit/bare-compiler': + specifier: workspace:* + version: link:../misc/bare-compiler '@types/invariant': specifier: ^2 version: 2.2.37 @@ -1187,6 +1199,25 @@ importers: specifier: ^5.5.2 version: 5.8.3 + packages/misc/bare-compiler: + dependencies: + '@bare-ts/tools': + specifier: ^0.13.0 + version: 0.13.0(@bare-ts/lib@0.4.0) + commander: + specifier: ^12.0.0 + version: 12.1.0 + devDependencies: + '@types/node': + specifier: ^20.17.10 + version: 20.19.9 + tsx: + specifier: ^4.19.2 + version: 4.20.3 + typescript: + specifier: ^5.7.2 + version: 5.8.3 + packages/misc/sql-loader: devDependencies: '@types/node': @@ -1196,6 +1227,24 @@ importers: specifier: ^8.5.0 version: 8.5.0(@microsoft/api-extractor@7.52.8(@types/node@22.15.32))(postcss@8.5.6)(tsx@4.20.3)(typescript@5.8.3)(yaml@2.8.0) + packages/misc/versioned-data-util: + devDependencies: + '@types/node': + specifier: ^20.17.10 + version: 20.19.9 + tsup: + specifier: ^8.5.0 + version: 8.5.0(@microsoft/api-extractor@7.52.8(@types/node@20.19.9))(postcss@8.5.6)(tsx@4.20.3)(typescript@5.8.3)(yaml@2.8.0) + tsx: + specifier: ^4.19.2 + version: 4.20.3 + typescript: + specifier: ^5.7.2 + version: 5.8.3 + vitest: + specifier: ^2.0.0 + version: 2.1.9(@types/node@20.19.9) + packages/rivetkit: {} packages: @@ -1337,10 +1386,21 @@ packages: resolution: {integrity: sha512-ETyHEk2VHHvl9b9jZP5IHPavHYk57EhanlRRuae9XCpb/j5bDCbPPMOBfCWhnl/7EDJz0jEMCi/RhccCE8r1+Q==} engines: {node: '>=6.9.0'} + '@bare-ts/lib@0.3.0': + resolution: {integrity: sha512-0JlIu0R26CTMdEnmmKIuFvAS9Cd554MvH4BjI6iL+74TDYBCHhaCWIDJXU6+VuYPu0+/JDLV/KXQstK9Sxn+9A==} + engines: {node: '>= 12'} + '@bare-ts/lib@0.4.0': resolution: {integrity: sha512-uTb12lcDkvwwwGb/4atNy/+2Xuksx88tlxKuwnDuxmsOFVo4R/WsE+fmAf2nWtzHDi7m6NDw8Ke9j+XlNBLoJg==} engines: {node: ^14.18.0 || >=16.0.0} + '@bare-ts/tools@0.13.0': + resolution: {integrity: sha512-53fjq3/B3HeFJG6FygJbbdAoqvVnexdWjoi2O95p4karcyXUzest+AJ4ruf9ppk4bl28Jn0jkVegsJgeNZt4OQ==} + engines: {node: ^14.18.0 || >=16.0.0} + hasBin: true + peerDependencies: + '@bare-ts/lib': ~0.3.0 + '@better-auth/utils@0.2.5': resolution: {integrity: sha512-uI2+/8h/zVsH8RrYdG8eUErbuGBk16rZKQfz8CjxQOyCE6v7BqFYEbFwvOkvl1KbUdxhqOnXp78+uE5h8qVEgQ==} @@ -2771,9 +2831,23 @@ packages: peerDependencies: vite: ^4.2.0 || ^5.0.0 || ^6.0.0 || ^7.0.0-beta.0 + '@vitest/expect@2.1.9': + resolution: {integrity: sha512-UJCIkTBenHeKT1TTlKMJWy1laZewsRIzYighyYiJKZreqtdxSos/S1t+ktRMQWu2CKqaarrkeszJx1cgC5tGZw==} + '@vitest/expect@3.2.4': resolution: {integrity: sha512-Io0yyORnB6sikFlt8QW5K7slY4OjqNX9jmJQ02QDda8lyM6B5oNgVWoSoKPac8/kgnCUzuHQKrSLtu/uOqqrig==} + '@vitest/mocker@2.1.9': + resolution: {integrity: sha512-tVL6uJgoUdi6icpxmdrn5YNo3g3Dxv+IHJBr0GXHaEdTcw3F+cPKnsXFhli6nO+f/6SDKPHEK1UN+k+TQv0Ehg==} + peerDependencies: + msw: ^2.4.9 + vite: ^5.0.0 + peerDependenciesMeta: + msw: + optional: true + vite: + optional: true + '@vitest/mocker@3.2.4': resolution: {integrity: sha512-46ryTE9RZO/rfDd7pEqFl7etuyzekzEhUbTW3BvmeO/BcCMEgq59BKhek3dXDWgAj4oMK6OZi+vRr1wPW6qjEQ==} peerDependencies: @@ -2785,18 +2859,30 @@ packages: vite: optional: true + '@vitest/pretty-format@2.1.9': + resolution: {integrity: sha512-KhRIdGV2U9HOUzxfiHmY8IFHTdqtOhIzCpd8WRdJiE7D/HUcZVD0EgQCVjm+Q9gkUXWgBvMmTtZgIG48wq7sOQ==} + '@vitest/pretty-format@3.1.1': resolution: {integrity: sha512-dg0CIzNx+hMMYfNmSqJlLSXEmnNhMswcn3sXO7Tpldr0LiGmg3eXdLLhwkv2ZqgHb/d5xg5F7ezNFRA1fA13yA==} '@vitest/pretty-format@3.2.4': resolution: {integrity: sha512-IVNZik8IVRJRTr9fxlitMKeJeXFFFN0JaB9PHPGQ8NKQbGpfjlTx9zO4RefN8gp7eqjNy8nyK3NZmBzOPeIxtA==} + '@vitest/runner@2.1.9': + resolution: {integrity: sha512-ZXSSqTFIrzduD63btIfEyOmNcBmQvgOVsPNPe0jYtESiXkhd8u2erDLnMxmGrDCwHCCHE7hxwRDCT3pt0esT4g==} + '@vitest/runner@3.2.4': resolution: {integrity: sha512-oukfKT9Mk41LreEW09vt45f8wx7DordoWUZMYdY/cyAk7w5TWkTRCNZYF7sX7n2wB7jyGAl74OxgwhPgKaqDMQ==} + '@vitest/snapshot@2.1.9': + resolution: {integrity: sha512-oBO82rEjsxLNJincVhLhaxxZdEtV0EFHMK5Kmx5sJ6H9L183dHECjiefOAdnqpIgT5eZwT04PoggUnW88vOBNQ==} + '@vitest/snapshot@3.2.4': resolution: {integrity: sha512-dEYtS7qQP2CjU27QBC5oUOxLE/v5eLkGqPE0ZKEIDGMs4vKWe7IjgLOeauHsR0D5YuuycGRO5oSRXnwnmA78fQ==} + '@vitest/spy@2.1.9': + resolution: {integrity: sha512-E1B35FwzXXTs9FHNK6bDszs7mtydNi5MIfUWpceJ8Xbfb1gBMscAnwLbEu+B44ed6W3XjL9/ehLPHR1fkf1KLQ==} + '@vitest/spy@3.2.4': resolution: {integrity: sha512-vAfasCOe6AIK70iP5UD11Ac4siNUNJ9i/9PZ3NKx07sG6sUxeag1LWdNrMWeKKYBLlzuK+Gn65Yd5nyL6ds+nw==} @@ -2805,6 +2891,9 @@ packages: peerDependencies: vitest: 3.1.1 + '@vitest/utils@2.1.9': + resolution: {integrity: sha512-v0psaMSkNJ3A2NMrUEHFRzJtDPFn+/VWZ5WxImB21T9fjucJRmS7xCS3ppEnARb9y11OAzaD+P2Ps+b+BGX5iQ==} + '@vitest/utils@3.1.1': resolution: {integrity: sha512-1XIjflyaU2k3HMArJ50bwSh3wKWPD6Q47wz/NUSmRV0zNywPc4w79ARjg/i/aNINHwA+mIALhUVqD9/aUvZNgg==} @@ -3047,6 +3136,14 @@ packages: resolution: {integrity: sha512-1rXeuUUiGGrykh+CeBdu5Ie7OJwinCgQY0bc7GCRxy5xVHy+moaqkpL/jqQq0MtQOeYcrqEz4abc5f0KtU7W4A==} engines: {node: '>=12.5.0'} + commander@10.0.0: + resolution: {integrity: sha512-zS5PnTI22FIRM6ylNW8G4Ap0IEOyk62fhLSD0+uHRT9McRCLGpkVNvao4bjimpK/GShynyQkFFxHhwMcETmduA==} + engines: {node: '>=14'} + + commander@12.1.0: + resolution: {integrity: sha512-Vw8qHK3bZM9y/P10u3Vib8o/DdkvA2OtPtZvD871QKjy74Wj1WSKFILMPRPSdUSx5RFK1arlJzEtA4PkFgnbuA==} + engines: {node: '>=18'} + commander@14.0.0: resolution: {integrity: sha512-2uM9rYjPvyq39NwLRqaiLtWHyDC1FvryJDa2ATTVims5YAS4PupsEQsDvP14FqhFr0P49CYDugi59xaxJlTXRA==} engines: {node: '>=20'} @@ -3915,6 +4012,9 @@ packages: resolution: {integrity: sha512-TdrF7fW9Rphjq4RjrW0Kp2AW0Ahwu9sRGTkS6bvDi0SCwZlEZYmcfDbEsTz8RVk0EHIS/Vd1bv3JhG+1xZuAyQ==} engines: {node: '>=16'} + pathe@1.1.2: + resolution: {integrity: sha512-whLdWMYL2TwI08hn8/ZqAbrVemu0LNaNNJZX73O6qaIdCTfXutsLhMkjdENX0qhsQ9uIimo4/aQOmXkoon2nDQ==} + pathe@2.0.3: resolution: {integrity: sha512-WUjGcAqP1gQacoQe+OBJsFA7Ld4DyXuUIjZ5cc75cLHvJ7dtNsTugphxIADwspS+AraAUePCKrSVtPLFj/F88w==} @@ -4338,10 +4438,18 @@ packages: resolution: {integrity: sha512-Zba82s87IFq9A9XmjiX5uZA/ARWDrB03OHlq+Vw1fSdt0I+4/Kutwy8BP4Y/y/aORMo61FQ0vIb5j44vSo5Pkg==} engines: {node: ^18.0.0 || >=20.0.0} + tinyrainbow@1.2.0: + resolution: {integrity: sha512-weEDEq7Z5eTHPDh4xjX789+fHfF+P8boiFB+0vbWzpbnbsEr/GRaohi/uMKxg8RZMXnl1ItAi/IUHWMsjDV7kQ==} + engines: {node: '>=14.0.0'} + tinyrainbow@2.0.0: resolution: {integrity: sha512-op4nsTR47R6p0vMUUoYl/a+ljLFVtlfaXkLQmqfLR1qHma1h/ysYk4hEXZ880bf2CYgTskvTa/e196Vd5dDQXw==} engines: {node: '>=14.0.0'} + tinyspy@3.0.2: + resolution: {integrity: sha512-n1cw8k1k0x4pgA2+9XrOkFydTerNcJ1zWCO5Nn9scWHTD+5tp8dghT2x1uduQePZTZgd3Tupf+x9BxJjeJi77Q==} + engines: {node: '>=14.0.0'} + tinyspy@4.0.3: resolution: {integrity: sha512-t2T/WLB2WRgZ9EpE4jgPJ9w+i66UZfDc8wHh0xrwiRNN+UwH98GIJkTeZqX9rg0i0ptwzqW+uYeIF0T4F8LR7A==} engines: {node: '>=14.0.0'} @@ -4506,6 +4614,11 @@ packages: resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} engines: {node: '>= 0.8'} + vite-node@2.1.9: + resolution: {integrity: sha512-AM9aQ/IPrW/6ENLQg3AGY4K1N2TGZdR5e4gu/MmmR2xR3Ll1+dib+nook92g4TV3PXVyeyxdWwtaCAiUL0hMxA==} + engines: {node: ^18.0.0 || >=20.0.0} + hasBin: true + vite-node@3.2.4: resolution: {integrity: sha512-EbKSKh+bh1E1IFxeO0pg1n4dvoOTt0UDiXMd/qn++r98+jPO1xtJilvXldeuQ8giIB5IkpjCgMleHMNEsGH6pg==} engines: {node: ^18.0.0 || ^20.0.0 || >=22.0.0} @@ -4591,6 +4704,31 @@ packages: yaml: optional: true + vitest@2.1.9: + resolution: {integrity: sha512-MSmPM9REYqDGBI8439mA4mWhV5sKmDlBKWIYbA3lRb2PTHACE0mgKwA8yQ2xq9vxDTuk4iPrECBAEW2aoFXY0Q==} + engines: {node: ^18.0.0 || >=20.0.0} + hasBin: true + peerDependencies: + '@edge-runtime/vm': '*' + '@types/node': ^18.0.0 || >=20.0.0 + '@vitest/browser': 2.1.9 + '@vitest/ui': 2.1.9 + happy-dom: '*' + jsdom: '*' + peerDependenciesMeta: + '@edge-runtime/vm': + optional: true + '@types/node': + optional: true + '@vitest/browser': + optional: true + '@vitest/ui': + optional: true + happy-dom: + optional: true + jsdom: + optional: true + vitest@3.2.4: resolution: {integrity: sha512-LUCP5ev3GURDysTWiP47wRRUpLKMOfPh+yKTx3kVIEiu5KOMeqzpnYNsKyOoVrULivR8tLcks4+lga33Whn90A==} engines: {node: ^18.0.0 || ^20.0.0 || >=22.0.0} @@ -4914,8 +5052,15 @@ snapshots: '@babel/helper-string-parser': 7.27.1 '@babel/helper-validator-identifier': 7.27.1 + '@bare-ts/lib@0.3.0': {} + '@bare-ts/lib@0.4.0': {} + '@bare-ts/tools@0.13.0(@bare-ts/lib@0.4.0)': + dependencies: + '@bare-ts/lib': 0.4.0 + commander: 10.0.0 + '@better-auth/utils@0.2.5': dependencies: typescript: 5.8.3 @@ -5560,6 +5705,15 @@ snapshots: '@levischuck/tiny-cbor@0.2.11': {} + '@microsoft/api-extractor-model@7.30.6(@types/node@20.19.9)': + dependencies: + '@microsoft/tsdoc': 0.15.1 + '@microsoft/tsdoc-config': 0.17.1 + '@rushstack/node-core-library': 5.13.1(@types/node@20.19.9) + transitivePeerDependencies: + - '@types/node' + optional: true + '@microsoft/api-extractor-model@7.30.6(@types/node@22.15.32)': dependencies: '@microsoft/tsdoc': 0.15.1 @@ -5577,6 +5731,25 @@ snapshots: transitivePeerDependencies: - '@types/node' + '@microsoft/api-extractor@7.52.8(@types/node@20.19.9)': + dependencies: + '@microsoft/api-extractor-model': 7.30.6(@types/node@20.19.9) + '@microsoft/tsdoc': 0.15.1 + '@microsoft/tsdoc-config': 0.17.1 + '@rushstack/node-core-library': 5.13.1(@types/node@20.19.9) + '@rushstack/rig-package': 0.5.3 + '@rushstack/terminal': 0.15.3(@types/node@20.19.9) + '@rushstack/ts-command-line': 5.0.1(@types/node@20.19.9) + lodash: 4.17.21 + minimatch: 3.0.8 + resolve: 1.22.10 + semver: 7.5.4 + source-map: 0.6.1 + typescript: 5.8.2 + transitivePeerDependencies: + - '@types/node' + optional: true + '@microsoft/api-extractor@7.52.8(@types/node@22.15.32)': dependencies: '@microsoft/api-extractor-model': 7.30.6(@types/node@22.15.32) @@ -5787,6 +5960,20 @@ snapshots: '@rollup/rollup-win32-x64-msvc@4.44.0': optional: true + '@rushstack/node-core-library@5.13.1(@types/node@20.19.9)': + dependencies: + ajv: 8.13.0 + ajv-draft-04: 1.0.0(ajv@8.13.0) + ajv-formats: 3.0.1(ajv@8.13.0) + fs-extra: 11.3.0 + import-lazy: 4.0.0 + jju: 1.4.0 + resolve: 1.22.10 + semver: 7.5.4 + optionalDependencies: + '@types/node': 20.19.9 + optional: true + '@rushstack/node-core-library@5.13.1(@types/node@22.15.32)': dependencies: ajv: 8.13.0 @@ -5819,6 +6006,14 @@ snapshots: resolve: 1.22.10 strip-json-comments: 3.1.1 + '@rushstack/terminal@0.15.3(@types/node@20.19.9)': + dependencies: + '@rushstack/node-core-library': 5.13.1(@types/node@20.19.9) + supports-color: 8.1.1 + optionalDependencies: + '@types/node': 20.19.9 + optional: true + '@rushstack/terminal@0.15.3(@types/node@22.15.32)': dependencies: '@rushstack/node-core-library': 5.13.1(@types/node@22.15.32) @@ -5834,6 +6029,16 @@ snapshots: optionalDependencies: '@types/node': 24.0.4 + '@rushstack/ts-command-line@5.0.1(@types/node@20.19.9)': + dependencies: + '@rushstack/terminal': 0.15.3(@types/node@20.19.9) + '@types/argparse': 1.0.38 + argparse: 1.0.10 + string-argv: 0.3.2 + transitivePeerDependencies: + - '@types/node' + optional: true + '@rushstack/ts-command-line@5.0.1(@types/node@22.15.32)': dependencies: '@rushstack/terminal': 0.15.3(@types/node@22.15.32) @@ -6061,6 +6266,13 @@ snapshots: transitivePeerDependencies: - supports-color + '@vitest/expect@2.1.9': + dependencies: + '@vitest/spy': 2.1.9 + '@vitest/utils': 2.1.9 + chai: 5.2.0 + tinyrainbow: 1.2.0 + '@vitest/expect@3.2.4': dependencies: '@types/chai': 5.2.2 @@ -6069,6 +6281,14 @@ snapshots: chai: 5.2.0 tinyrainbow: 2.0.0 + '@vitest/mocker@2.1.9(vite@5.4.19(@types/node@20.19.9))': + dependencies: + '@vitest/spy': 2.1.9 + estree-walker: 3.0.3 + magic-string: 0.30.17 + optionalDependencies: + vite: 5.4.19(@types/node@20.19.9) + '@vitest/mocker@3.2.4(vite@6.3.5(@types/node@20.19.9)(tsx@4.20.3)(yaml@2.8.0))': dependencies: '@vitest/spy': 3.2.4 @@ -6101,6 +6321,10 @@ snapshots: optionalDependencies: vite: 6.3.5(@types/node@24.0.4)(tsx@4.20.3)(yaml@2.8.0) + '@vitest/pretty-format@2.1.9': + dependencies: + tinyrainbow: 1.2.0 + '@vitest/pretty-format@3.1.1': dependencies: tinyrainbow: 2.0.0 @@ -6109,18 +6333,33 @@ snapshots: dependencies: tinyrainbow: 2.0.0 + '@vitest/runner@2.1.9': + dependencies: + '@vitest/utils': 2.1.9 + pathe: 1.1.2 + '@vitest/runner@3.2.4': dependencies: '@vitest/utils': 3.2.4 pathe: 2.0.3 strip-literal: 3.0.0 + '@vitest/snapshot@2.1.9': + dependencies: + '@vitest/pretty-format': 2.1.9 + magic-string: 0.30.17 + pathe: 1.1.2 + '@vitest/snapshot@3.2.4': dependencies: '@vitest/pretty-format': 3.2.4 magic-string: 0.30.17 pathe: 2.0.3 + '@vitest/spy@2.1.9': + dependencies: + tinyspy: 3.0.2 + '@vitest/spy@3.2.4': dependencies: tinyspy: 4.0.3 @@ -6136,6 +6375,12 @@ snapshots: tinyrainbow: 2.0.0 vitest: 3.2.4(@types/node@22.15.32)(@vitest/ui@3.1.1)(tsx@4.20.3)(yaml@2.8.0) + '@vitest/utils@2.1.9': + dependencies: + '@vitest/pretty-format': 2.1.9 + loupe: 3.1.4 + tinyrainbow: 1.2.0 + '@vitest/utils@3.1.1': dependencies: '@vitest/pretty-format': 3.1.1 @@ -6432,6 +6677,10 @@ snapshots: color-convert: 2.0.1 color-string: 1.9.1 + commander@10.0.0: {} + + commander@12.1.0: {} + commander@14.0.0: {} commander@4.1.1: {} @@ -7217,6 +7466,8 @@ snapshots: path-to-regexp@8.2.0: {} + pathe@1.1.2: {} + pathe@2.0.3: {} pathval@2.0.0: {} @@ -7714,8 +7965,12 @@ snapshots: tinypool@1.1.1: {} + tinyrainbow@1.2.0: {} + tinyrainbow@2.0.0: {} + tinyspy@3.0.2: {} + tinyspy@4.0.3: {} toidentifier@1.0.1: {} @@ -7738,6 +7993,35 @@ snapshots: tslib@2.8.1: {} + tsup@8.5.0(@microsoft/api-extractor@7.52.8(@types/node@20.19.9))(postcss@8.5.6)(tsx@4.20.3)(typescript@5.8.3)(yaml@2.8.0): + dependencies: + bundle-require: 5.1.0(esbuild@0.25.5) + cac: 6.7.14 + chokidar: 4.0.3 + consola: 3.4.2 + debug: 4.4.1 + esbuild: 0.25.5 + fix-dts-default-cjs-exports: 1.0.1 + joycon: 3.1.1 + picocolors: 1.1.1 + postcss-load-config: 6.0.1(postcss@8.5.6)(tsx@4.20.3)(yaml@2.8.0) + resolve-from: 5.0.0 + rollup: 4.44.0 + source-map: 0.8.0-beta.0 + sucrase: 3.35.0 + tinyexec: 0.3.2 + tinyglobby: 0.2.14 + tree-kill: 1.2.2 + optionalDependencies: + '@microsoft/api-extractor': 7.52.8(@types/node@20.19.9) + postcss: 8.5.6 + typescript: 5.8.3 + transitivePeerDependencies: + - jiti + - supports-color + - tsx + - yaml + tsup@8.5.0(@microsoft/api-extractor@7.52.8(@types/node@22.15.32))(postcss@8.5.6)(tsx@4.20.3)(typescript@5.8.3)(yaml@2.8.0): dependencies: bundle-require: 5.1.0(esbuild@0.25.5) @@ -7902,6 +8186,24 @@ snapshots: vary@1.1.2: {} + vite-node@2.1.9(@types/node@20.19.9): + dependencies: + cac: 6.7.14 + debug: 4.4.1 + es-module-lexer: 1.7.0 + pathe: 1.1.2 + vite: 5.4.19(@types/node@20.19.9) + transitivePeerDependencies: + - '@types/node' + - less + - lightningcss + - sass + - sass-embedded + - stylus + - sugarss + - supports-color + - terser + vite-node@3.2.4(@types/node@20.19.9)(tsx@4.20.3)(yaml@2.8.0): dependencies: cac: 6.7.14 @@ -8079,6 +8381,41 @@ snapshots: tsx: 4.20.3 yaml: 2.8.0 + vitest@2.1.9(@types/node@20.19.9): + dependencies: + '@vitest/expect': 2.1.9 + '@vitest/mocker': 2.1.9(vite@5.4.19(@types/node@20.19.9)) + '@vitest/pretty-format': 2.1.9 + '@vitest/runner': 2.1.9 + '@vitest/snapshot': 2.1.9 + '@vitest/spy': 2.1.9 + '@vitest/utils': 2.1.9 + chai: 5.2.0 + debug: 4.4.1 + expect-type: 1.2.1 + magic-string: 0.30.17 + pathe: 1.1.2 + std-env: 3.9.0 + tinybench: 2.9.0 + tinyexec: 0.3.2 + tinypool: 1.1.1 + tinyrainbow: 1.2.0 + vite: 5.4.19(@types/node@20.19.9) + vite-node: 2.1.9(@types/node@20.19.9) + why-is-node-running: 2.3.0 + optionalDependencies: + '@types/node': 20.19.9 + transitivePeerDependencies: + - less + - lightningcss + - msw + - sass + - sass-embedded + - stylus + - sugarss + - supports-color + - terser + vitest@3.2.4(@types/node@20.19.9)(tsx@4.20.3)(yaml@2.8.0): dependencies: '@types/chai': 5.2.2 diff --git a/vitest.base.ts b/vitest.base.ts index 885c2d6ca..a56139f47 100644 --- a/vitest.base.ts +++ b/vitest.base.ts @@ -7,7 +7,7 @@ export default { // Enable parallelism sequence: { // TODO: This breaks fake timers, unsure how to make tests run in parallel within the same file - // concurrent: true, + concurrent: true, }, env: { // Enable logging @@ -15,6 +15,7 @@ export default { _LOG_TARGET: "1", _LOG_TIMESTAMP: "1", _RIVETKIT_ERROR_STACK: "1", + _RIVETKIT_LOG_MESSAGE: "1", }, }, } satisfies ViteUserConfig;