From 8983d470754115e8ec35c06b559928293b6dd915 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 26 Jun 2025 23:48:53 +0000 Subject: [PATCH] refactor: simplify server creation with createServer API --- .../src/frontend/components/ChatRoom.tsx | 4 +- examples/elysia/src/server.ts | 46 ++- examples/express/src/server.ts | 2 +- examples/hono-react/src/backend/server.ts | 57 ++-- examples/hono-react/src/frontend/App.tsx | 76 +++-- examples/hono-react/src/frontend/main.tsx | 18 +- examples/react/src/backend/server.ts | 18 +- .../driver-test-suite/inline-client.ts | 32 +- packages/core/src/common/utils.ts | 6 + packages/core/src/driver-test-suite/mod.ts | 14 +- packages/core/src/driver-test-suite/utils.ts | 8 +- packages/core/src/drivers/default.ts | 25 ++ .../core/src/drivers/file-system/actor.ts | 7 +- .../src/drivers/file-system/global-state.ts | 44 +-- .../core/src/drivers/file-system/utils.ts | 23 +- packages/core/src/drivers/rivet/actor.ts | 27 +- packages/core/src/drivers/rivet/config.ts | 11 - packages/core/src/drivers/rivet/manager.ts | 105 ------ packages/core/src/manager/router.ts | 304 +++++++++--------- packages/core/src/registry/mod.ts | 22 +- packages/core/src/registry/run-config.ts | 11 +- packages/core/src/registry/serve.ts | 6 +- packages/core/src/test/mod.ts | 41 +-- .../src/topologies/coordinate/topology.ts | 2 - .../src/topologies/partition/actor-router.ts | 19 +- packages/core/src/utils.ts | 24 -- 26 files changed, 400 insertions(+), 552 deletions(-) create mode 100644 packages/core/src/drivers/default.ts delete mode 100644 packages/core/src/drivers/rivet/config.ts delete mode 100644 packages/core/src/drivers/rivet/manager.ts diff --git a/examples/better-auth/src/frontend/components/ChatRoom.tsx b/examples/better-auth/src/frontend/components/ChatRoom.tsx index cf6a840a8..2570f0d80 100644 --- a/examples/better-auth/src/frontend/components/ChatRoom.tsx +++ b/examples/better-auth/src/frontend/components/ChatRoom.tsx @@ -3,9 +3,7 @@ import { createClient, createRivetKit } from "@rivetkit/react"; import { authClient } from "../auth-client"; import type { Registry } from "../../backend/registry"; -const client = createClient("http://localhost:8080/registry", { - transport: "sse", -}); +const client = createClient("http://localhost:8080/registry"); const { useActor } = createRivetKit(client); diff --git a/examples/elysia/src/server.ts b/examples/elysia/src/server.ts index 362e1100e..1dbb22a7a 100644 --- a/examples/elysia/src/server.ts +++ b/examples/elysia/src/server.ts @@ -1,25 +1,21 @@ -// import { registry } from "./registry"; -// import { Elysia } from "elysia"; -// import { createMemoryDriver } from "@rivetkit/memory"; -// -// // Start RivetKit -// const { client, handler } = registry.run({ -// driver: createMemoryDriver(), -// }); -// -// // Setup router -// const app = new Elysia() -// // Expose RivetKit to the frontend (optional) -// .mount("/registry", handler) -// // Example HTTP endpoint -// .post("/increment/:name", async ({ params }) => { -// const name = params.name; -// -// const counter = client.counter.getOrCreate(name); -// const newCount = await counter.increment(1); -// -// return `New Count: ${newCount}`; -// }) -// .listen(8080); -// -// console.log("Listening at http://localhost:8080"); +import { registry } from "./registry"; +import { Elysia } from "elysia"; + +const { client, handler } = registry.createServer(); + +// Setup router +new Elysia() + // Expose RivetKit to the frontend (optional) + .mount("/registry", handler) + // Example HTTP endpoint + .post("/increment/:name", async ({ params }) => { + const name = params.name; + + const counter = client.counter.getOrCreate(name); + const newCount = await counter.increment(1); + + return `New Count: ${newCount}`; + }) + .listen(8080); + +console.log("Listening at http://localhost:8080"); diff --git a/examples/express/src/server.ts b/examples/express/src/server.ts index b5d577b50..cd597e950 100644 --- a/examples/express/src/server.ts +++ b/examples/express/src/server.ts @@ -2,7 +2,7 @@ import { registry } from "./registry"; import express from "express"; // Start RivetKit -const { client, handler } = registry.run(); +const { client, handler } = registry.createServer(); // Setup router const app = express(); diff --git a/examples/hono-react/src/backend/server.ts b/examples/hono-react/src/backend/server.ts index d0304713e..fe2961e69 100644 --- a/examples/hono-react/src/backend/server.ts +++ b/examples/hono-react/src/backend/server.ts @@ -1,33 +1,24 @@ -// import { registry } from "./registry"; -// import { Hono } from "hono"; -// import { serve } from "@hono/node-server"; -// import { createMemoryDriver } from "@rivetkit/memory"; -// -// // Setup router -// const app = new Hono(); -// -// // Start RivetKit -// const { client, hono } = registry.run({ -// driver: createMemoryDriver(), -// cors: { -// // IMPORTANT: Configure origins in production -// origin: "*", -// }, -// }); -// -// // Expose RivetKit to the frontend -// app.route("/registry", hono); -// -// // Example HTTP endpoint -// app.post("/increment/:name", async (c) => { -// const name = c.req.param("name"); -// -// const counter = client.counter.getOrCreate(name); -// const newCount = await counter.increment(1); -// -// return c.text(`New Count: ${newCount}`); -// }); -// -// serve({ fetch: app.fetch, port: 8080 }, () => -// console.log("Listening at http://localhost:8080"), -// ); +import { registry } from "./registry"; +import { Hono } from "hono"; + +const { client, serve } = registry.createServer({ + cors: { + // IMPORTANT: Configure origins in production + origin: "*", + }, +}); + +// Setup router +const app = new Hono(); + +// Example HTTP endpoint +app.post("/increment/:name", async (c) => { + const name = c.req.param("name"); + + const counter = client.counter.getOrCreate(name); + const newCount = await counter.increment(1); + + return c.text(`New Count: ${newCount}`); +}); + +serve(app); diff --git a/examples/hono-react/src/frontend/App.tsx b/examples/hono-react/src/frontend/App.tsx index d8fc9ff06..1be67de1b 100644 --- a/examples/hono-react/src/frontend/App.tsx +++ b/examples/hono-react/src/frontend/App.tsx @@ -1,39 +1,37 @@ -// import { useState } from "react"; -// import { createClient, createRivetKit } from "@rivetkit/react"; -// import type { Registry } from "../backend/registry"; -// -// const client = createClient("http://localhost:8080/registry", { -// transport: "sse", -// }); -// const { useActor } = createRivetKit(client); -// -// function App() { -// const [count, setCount] = useState(0); -// const [counterName, setCounterName] = useState("test-counter"); -// -// const counter = useActor({ -// name: "counter", -// key: [counterName], -// }); -// -// counter.useEvent("newCount", (x: number) => setCount(x)); -// -// const increment = async () => { -// await counter.connection?.increment(1); -// }; -// -// return ( -//
-//

Counter: {count}

-// setCounterName(e.target.value)} -// placeholder="Counter name" -// /> -// -//
-// ); -// } -// -// export default App; +import { useState } from "react"; +import { createClient, createRivetKit } from "@rivetkit/react"; +import type { registry } from "../backend/registry"; + +const client = createClient("http://localhost:8080/registry"); +const { useActor } = createRivetKit(client); + +function App() { + const [count, setCount] = useState(0); + const [counterName, setCounterName] = useState("test-counter"); + + const counter = useActor({ + name: "counter", + key: [counterName], + }); + + counter.useEvent("newCount", (x: number) => setCount(x)); + + const increment = async () => { + await counter.connection?.increment(1); + }; + + return ( +
+

Counter: {count}

+ setCounterName(e.target.value)} + placeholder="Counter name" + /> + +
+ ); +} + +export default App; diff --git a/examples/hono-react/src/frontend/main.tsx b/examples/hono-react/src/frontend/main.tsx index cd443c7ac..6d0ba7949 100644 --- a/examples/hono-react/src/frontend/main.tsx +++ b/examples/hono-react/src/frontend/main.tsx @@ -1,9 +1,9 @@ -// import React from "react"; -// import ReactDOM from "react-dom/client"; -// import App from "./App"; -// -// ReactDOM.createRoot(document.getElementById("root")!).render( -// -// -// , -// ); +import React from "react"; +import ReactDOM from "react-dom/client"; +import App from "./App"; + +ReactDOM.createRoot(document.getElementById("root")!).render( + + + , +); diff --git a/examples/react/src/backend/server.ts b/examples/react/src/backend/server.ts index 40bd8bbc3..5eb7d1676 100644 --- a/examples/react/src/backend/server.ts +++ b/examples/react/src/backend/server.ts @@ -1,9 +1,9 @@ -// import { registry } from "./registry"; -// import { serve } from "@rivetkit/nodejs"; -// -// serve(registry, { -// cors: { -// // IMPORTANT: Configure origins in production -// origin: "*", -// }, -// }); +import { registry } from "./registry"; +import { serve } from "@rivetkit/nodejs"; + +serve(registry, { + cors: { + // IMPORTANT: Configure origins in production + origin: "*", + }, +}); diff --git a/packages/core/fixtures/driver-test-suite/inline-client.ts b/packages/core/fixtures/driver-test-suite/inline-client.ts index b3edefbcc..0e1f0e19b 100644 --- a/packages/core/fixtures/driver-test-suite/inline-client.ts +++ b/packages/core/fixtures/driver-test-suite/inline-client.ts @@ -1,5 +1,5 @@ import { actor } from "@rivetkit/core"; -import type { Registry } from "./registry"; +import type { registry } from "./registry"; export const inlineClientActor = actor({ onAuth: () => {}, @@ -7,26 +7,32 @@ export const inlineClientActor = actor({ actions: { // Action that uses client to call another actor (stateless) callCounterIncrement: async (c, amount: number) => { - const client = c.client(); - const result = await client.counter.getOrCreate(["inline-test"]).increment(amount); - c.state.messages.push(`Called counter.increment(${amount}), result: ${result}`); + const client = c.client(); + const result = await client.counter + .getOrCreate(["inline-test"]) + .increment(amount); + c.state.messages.push( + `Called counter.increment(${amount}), result: ${result}`, + ); return result; }, // Action that uses client to get counter state (stateless) getCounterState: async (c) => { - const client = c.client(); - const count = await client.counter.getOrCreate(["inline-test"]).getCount(); + const client = c.client(); + const count = await client.counter + .getOrCreate(["inline-test"]) + .getCount(); c.state.messages.push(`Got counter state: ${count}`); return count; }, // Action that uses client with .connect() for stateful communication connectToCounterAndIncrement: async (c, amount: number) => { - const client = c.client(); + const client = c.client(); const handle = client.counter.getOrCreate(["inline-test-stateful"]); const connection = handle.connect(); - + // Set up event listener const events: number[] = []; connection.on("newCount", (count: number) => { @@ -36,11 +42,13 @@ export const inlineClientActor = actor({ // Perform increments const result1 = await connection.increment(amount); const result2 = await connection.increment(amount * 2); - + await connection.dispose(); - - c.state.messages.push(`Connected to counter, incremented by ${amount} and ${amount * 2}, results: ${result1}, ${result2}, events: ${JSON.stringify(events)}`); - + + c.state.messages.push( + `Connected to counter, incremented by ${amount} and ${amount * 2}, results: ${result1}, ${result2}, events: ${JSON.stringify(events)}`, + ); + return { result1, result2, events }; }, diff --git a/packages/core/src/common/utils.ts b/packages/core/src/common/utils.ts index 7303973f4..1c6e61b6a 100644 --- a/packages/core/src/common/utils.ts +++ b/packages/core/src/common/utils.ts @@ -2,6 +2,7 @@ import * as errors from "@/actor/errors"; import { getEnvUniversal } from "@/utils"; import type { ContentfulStatusCode } from "hono/utils/http-status"; import type { Logger } from "./log"; +import { type Next } from "hono"; export function assertUnreachable(x: never): never { throw new Error(`Unreachable case: ${x}`); @@ -223,3 +224,8 @@ function getErrorMessage(err: unknown): string { return String(err); } } + +/** Generates a `Next` handler to pass to middleware in order to be able to call arbitrary middleware. */ +export function noopNext(): Next { + return async () => {}; +} diff --git a/packages/core/src/driver-test-suite/mod.ts b/packages/core/src/driver-test-suite/mod.ts index 93b68c0e0..17140b881 100644 --- a/packages/core/src/driver-test-suite/mod.ts +++ b/packages/core/src/driver-test-suite/mod.ts @@ -128,13 +128,10 @@ export async function createTestRuntime( // Build driver config let injectWebSocket: NodeWebSocket["injectWebSocket"] | undefined; + let upgradeWebSocket = undefined; const config: RunConfig = RunConfigSchema.parse({ driver, - getUpgradeWebSocket: (router: any) => { - const webSocket = createNodeWebSocket({ app: router }); - injectWebSocket = webSocket.injectWebSocket; - return webSocket.upgradeWebSocket; - }, + getUpgradeWebSocket: () => upgradeWebSocket!, }); // Build topology @@ -142,7 +139,10 @@ export async function createTestRuntime( config.driver.topology === "coordinate" ? new CoordinateTopology(registry.config, config) : new StandaloneTopology(registry.config, config); - if (!injectWebSocket) throw new Error("injectWebSocket not defined"); + + // Inject WebSocket + const nodeWebSocket = createNodeWebSocket({ app: topology.router }); + upgradeWebSocket = nodeWebSocket.upgradeWebSocket; // Start server const port = await getPort(); @@ -152,7 +152,7 @@ export async function createTestRuntime( port, }); invariant(injectWebSocket !== undefined, "should have injectWebSocket"); - injectWebSocket(server); + nodeWebSocket.injectWebSocket(server); const endpoint = `http://127.0.0.1:${port}`; // Cleanup diff --git a/packages/core/src/driver-test-suite/utils.ts b/packages/core/src/driver-test-suite/utils.ts index f402a2b96..dc978772f 100644 --- a/packages/core/src/driver-test-suite/utils.ts +++ b/packages/core/src/driver-test-suite/utils.ts @@ -3,7 +3,7 @@ import { assertUnreachable } from "@/actor/utils"; import { createClientWithDriver } from "@/client/client"; import { type Client, createClient } from "@/client/mod"; import { type TestContext, vi } from "vitest"; -import type { Registry } from "../../fixtures/driver-test-suite/registry"; +import type { registry } from "../../fixtures/driver-test-suite/registry"; import type { DriverTestConfig } from "./mod"; import { createTestInlineClientDriver } from "./test-inline-client-driver"; @@ -12,7 +12,7 @@ export async function setupDriverTest( c: TestContext, driverTestConfig: DriverTestConfig, ): Promise<{ - client: Client; + client: Client; }> { if (!driverTestConfig.useRealTimers) { vi.useFakeTimers(); @@ -23,10 +23,10 @@ export async function setupDriverTest( const { endpoint, cleanup } = await driverTestConfig.start(projectPath); c.onTestFinished(cleanup); - let client: Client; + let client: Client; if (driverTestConfig.clientType === "http") { // Create client - client = createClient(endpoint, { + client = createClient(endpoint, { transport: driverTestConfig.transport, }); } else if (driverTestConfig.clientType === "inline") { diff --git a/packages/core/src/drivers/default.ts b/packages/core/src/drivers/default.ts new file mode 100644 index 000000000..06ea25541 --- /dev/null +++ b/packages/core/src/drivers/default.ts @@ -0,0 +1,25 @@ +import { logger } from "@/actor/log"; +import { createMemoryDriver } from "@/drivers/memory/mod"; +import { createRivetManagerDriver } from "@/drivers/rivet/mod"; +import { type DriverConfig, UserError } from "@/mod"; +import { createFileSystemDriver } from "@/drivers/file-system/mod"; +import { getEnvUniversal } from "@/utils"; + +/** + * Determines which driver to use if none is provided. + */ +export function createDefaultDriver(): DriverConfig { + const driver = getEnvUniversal("RIVETKIT_DRIVER"); + if (!driver || driver === "file-system") { + logger().info("using default file system driver"); + return createFileSystemDriver(); + } else if (driver === "memory") { + logger().info("using default memory driver"); + return createMemoryDriver(); + } else if (driver === "rivet") { + logger().info("using default rivet driver"); + return createRivetManagerDriver(); + } else { + throw new UserError(`Unrecognized driver: ${driver}`); + } +} diff --git a/packages/core/src/drivers/file-system/actor.ts b/packages/core/src/drivers/file-system/actor.ts index 1791809f8..9c97807de 100644 --- a/packages/core/src/drivers/file-system/actor.ts +++ b/packages/core/src/drivers/file-system/actor.ts @@ -1,7 +1,4 @@ -import type { - ActorDriver, - AnyActorInstance, -} from "@/driver-helpers/mod"; +import type { ActorDriver, AnyActorInstance } from "@/driver-helpers/mod"; import type { FileSystemGlobalState } from "./global-state"; export type ActorDriverContext = Record; @@ -28,10 +25,12 @@ export class FileSystemActorDriver implements ActorDriver { } async readPersistedData(actorId: string): Promise { + console.log("reading data", this.#state.readPersistedData(actorId)); return this.#state.readPersistedData(actorId); } async writePersistedData(actorId: string, data: Uint8Array): Promise { + console.log("writing data", data); this.#state.writePersistedData(actorId, data); // Save state to disk diff --git a/packages/core/src/drivers/file-system/global-state.ts b/packages/core/src/drivers/file-system/global-state.ts index 7c4da9118..c03f0d4e8 100644 --- a/packages/core/src/drivers/file-system/global-state.ts +++ b/packages/core/src/drivers/file-system/global-state.ts @@ -5,12 +5,14 @@ import type { ActorKey } from "@/actor/mod"; import { logger } from "./log"; import { getStoragePath, - getActorStoragePath, + getActorStoragePath as getActorDataPath, ensureDirectoryExists, ensureDirectoryExistsSync, + getActorsDir, } from "./utils"; import invariant from "invariant"; import { serializeEmptyPersistData } from "@/driver-helpers/mod"; +import * as cbor from "cbor-x"; /** * Interface representing a actor's state @@ -19,7 +21,7 @@ export interface ActorState { id: string; name: string; key: ActorKey; - persistedData?: Uint8Array; + persistedData: Uint8Array; } /** @@ -34,8 +36,7 @@ export class FileSystemGlobalState { this.#storagePath = getStoragePath(customPath); // Ensure storage directories exist synchronously during initialization - ensureDirectoryExistsSync(this.#storagePath); - ensureDirectoryExistsSync(`${this.#storagePath}/actors`); + ensureDirectoryExistsSync(getActorsDir(this.#storagePath)); // Load all actors into cache synchronously this.#loadAllActorsIntoCache(); @@ -51,19 +52,19 @@ export class FileSystemGlobalState { * Only called once during initialization */ #loadAllActorsIntoCache(): void { - const actorsDir = path.join(this.#storagePath, "actors"); + const actorsDir = getActorsDir(this.#storagePath); try { // HACK: Use synchronous filesystem operations for initialization const actorIds = fsSync.readdirSync(actorsDir); for (const actorId of actorIds) { - const stateFilePath = this.getStateFilePath(actorId); + const stateFilePath = getActorDataPath(this.#storagePath, actorId); if (fsSync.existsSync(stateFilePath)) { try { - const stateData = fsSync.readFileSync(stateFilePath, "utf8"); - const state = JSON.parse(stateData) as ActorState; + const stateData = fsSync.readFileSync(stateFilePath); + const state = cbor.decode(stateData) as ActorState; this.#stateCache.set(actorId, state); } catch (error) { @@ -86,14 +87,6 @@ export class FileSystemGlobalState { return this.#storagePath; } - /** - * Get state file path for a actor - */ - getStateFilePath(actorId: string): string { - const actorDir = getActorStoragePath(this.#storagePath, actorId); - return path.join(actorDir, "state.json"); - } - /** * Load actor state from cache */ @@ -132,22 +125,17 @@ export class FileSystemGlobalState { return; } - const actorDir = getActorStoragePath(this.#storagePath, actorId); - const stateFilePath = this.getStateFilePath(actorId); + const dataPath = getActorDataPath(this.#storagePath, actorId); try { + // TODO: This only needs to be done once // Create actor directory - await ensureDirectoryExists(actorDir); - - // Create serializable object - // State is already in serializable format - const serializedState = state; + await ensureDirectoryExists(path.dirname(dataPath)); - await fs.writeFile( - stateFilePath, - JSON.stringify(serializedState), - "utf8", - ); + // Write data + const serializedState = cbor.encode(state); + await fs.writeFile(dataPath, serializedState); + console.log("saving state", dataPath); } catch (error) { logger().error("failed to save actor state", { actorId, error }); throw new Error(`Failed to save actor state: ${error}`); diff --git a/packages/core/src/drivers/file-system/utils.ts b/packages/core/src/drivers/file-system/utils.ts index 8c9d3d960..89fc02781 100644 --- a/packages/core/src/drivers/file-system/utils.ts +++ b/packages/core/src/drivers/file-system/utils.ts @@ -1,11 +1,8 @@ -import * as fs from "fs/promises"; -import * as fsSync from "fs"; -import * as path from "path"; -import * as crypto from "crypto"; -import * as os from "os"; - -// Get platform-specific data directory -const paths = { data: getDataPath("rivetkit") }; +import * as fs from "node:fs/promises"; +import * as fsSync from "node:fs"; +import * as path from "node:path"; +import * as crypto from "node:crypto"; +import * as os from "node:os"; /** * Create a hash for a path, normalizing it first @@ -31,14 +28,16 @@ function createHashForPath(dirPath: string): string { * Get the storage path for the current working directory or a specified path */ export function getStoragePath(customPath?: string): string { + const dataPath = getDataPath("rivetkit"); const pathToHash = customPath || process.cwd(); const dirHash = createHashForPath(pathToHash); - return path.join(paths.data, dirHash); + return path.join(dataPath, dirHash); +} + +export function getActorsDir(baseDir: string): string { + return path.join(baseDir, "actors"); } -/** - * Get actor's storage directory - */ export function getActorStoragePath(baseDir: string, actorId: string): string { return path.join(baseDir, "actors", actorId); } diff --git a/packages/core/src/drivers/rivet/actor.ts b/packages/core/src/drivers/rivet/actor.ts index 35bb3ef15..a4689be09 100644 --- a/packages/core/src/drivers/rivet/actor.ts +++ b/packages/core/src/drivers/rivet/actor.ts @@ -5,20 +5,20 @@ import { PartitionTopologyActor } from "@/topologies/partition/mod"; import type { ActorContext } from "@rivet-gg/actor-core"; import invariant from "invariant"; import { RivetActorDriver } from "./actor-driver"; -import { type Config, ConfigSchema, type InputConfig } from "./config"; import { logger } from "./log"; import { RivetManagerDriver } from "./manager-driver"; import { type RivetClientConfig, getRivetClientConfig } from "./rivet-client"; import { type RivetHandler, deserializeKeyFromTag } from "./util"; import * as cbor from "cbor-x"; +import { RunConfigInput, RunConfigSchema } from "@/registry/run-config"; export function createActorHandler( registry: Registry, - inputConfig?: InputConfig, + inputConfig?: RunConfigInput, ): RivetHandler { - let config: Config; + let config: RunConfig; try { - config = ConfigSchema.parse(inputConfig); + config = RunConfigSchema.parse(inputConfig); } catch (error) { logger().error("failed to start manager", { error: stringifyError(error) }); Deno.exit(1); @@ -39,7 +39,7 @@ export function createActorHandler( async function startActor( ctx: ActorContext, registry: Registry, - config: Config, + config: RunConfig, ): Promise { const { upgradeWebSocket } = await import("hono/deno"); @@ -56,15 +56,12 @@ async function startActor( const clientConfig: RivetClientConfig = getRivetClientConfig(); - const runConfig = { - driver: { - topology: "partition", - manager: new RivetManagerDriver(clientConfig), - actor: new RivetActorDriver(ctx), - }, - getUpgradeWebSocket: () => upgradeWebSocket, - ...config, - } satisfies RunConfig; + config.driver = { + topology: "partition", + manager: new RivetManagerDriver(clientConfig), + actor: new RivetActorDriver(ctx), + }; + config.getUpgradeWebSocket = () => upgradeWebSocket; // Initialization promise // @@ -123,7 +120,7 @@ async function startActor( //}; // Create actor topology - const actorTopology = new PartitionTopologyActor(registry.config, runConfig); + const actorTopology = new PartitionTopologyActor(registry.config, config); // Set a catch-all route const router = actorTopology.router; diff --git a/packages/core/src/drivers/rivet/config.ts b/packages/core/src/drivers/rivet/config.ts deleted file mode 100644 index b5844a912..000000000 --- a/packages/core/src/drivers/rivet/config.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { RunConfigSchema } from "@/driver-helpers/mod"; -import type { z } from "zod"; - -export const ConfigSchema = RunConfigSchema.removeDefault() - .omit({ - driver: true, - getUpgradeWebSocket: true, - }) - .default({}); -export type InputConfig = z.input; -export type Config = z.infer; diff --git a/packages/core/src/drivers/rivet/manager.ts b/packages/core/src/drivers/rivet/manager.ts deleted file mode 100644 index b89efbb23..000000000 --- a/packages/core/src/drivers/rivet/manager.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { setupLogging } from "@/common/log"; -import type { Registry, RunConfig } from "@/registry/mod"; -import { PartitionTopologyManager } from "@/topologies/partition/mod"; -import { serve as honoServe } from "@hono/node-server"; -import { type NodeWebSocket, createNodeWebSocket } from "@hono/node-ws"; -import { ConfigSchema, type InputConfig } from "./config"; -import { logger } from "./log"; -import { RivetManagerDriver } from "./manager-driver"; -import type { RivetClientConfig } from "./rivet-client"; - -export async function startManager( - registry: Registry, - inputConfig?: InputConfig, -): Promise { - setupLogging(); - - const portStr = process.env.PORT_HTTP; - if (!portStr) { - throw "Missing port"; - } - const port = Number.parseInt(portStr); - if (!Number.isFinite(port)) { - throw "Invalid port"; - } - - const endpoint = process.env.RIVET_ENDPOINT; - if (!endpoint) throw new Error("missing RIVET_ENDPOINT"); - const token = process.env.RIVET_SERVICE_TOKEN; - if (!token) throw new Error("missing RIVET_SERVICE_TOKEN"); - const project = process.env.RIVET_PROJECT; - if (!project) throw new Error("missing RIVET_PROJECT"); - const environment = process.env.RIVET_ENVIRONMENT; - if (!environment) throw new Error("missing RIVET_ENVIRONMENT"); - - const clientConfig: RivetClientConfig = { - endpoint, - token, - project, - environment, - }; - - const config = ConfigSchema.parse(inputConfig); - let injectWebSocket: NodeWebSocket["injectWebSocket"] | undefined; - const runConfig = { - driver: { - topology: "partition", - manager: new RivetManagerDriver(clientConfig), - // HACK: We can't build the actor driver until we're inside the actor - actor: undefined as any, - }, - // Setup WebSocket routing for Node - // - // Save `injectWebSocket` for after server is created - getUpgradeWebSocket: (app) => { - const webSocket = createNodeWebSocket({ app }); - injectWebSocket = webSocket.injectWebSocket; - return webSocket.upgradeWebSocket; - }, - ...config, - } satisfies RunConfig; - - //// Force disable inspector - //driverConfig.registry.config.inspector = { - // enabled: false, - //}; - - //const corsConfig = driverConfig.registry.config.cors; - // - //// Enable CORS for Rivet domains - //driverConfig.registry.config.cors = { - // ...driverConfig.registry.config.cors, - // origin: (origin, c) => { - // const isRivetOrigin = - // origin.endsWith(".rivet.gg") || origin.includes("localhost:"); - // const configOrigin = corsConfig?.origin; - // - // if (isRivetOrigin) { - // return origin; - // } - // if (typeof configOrigin === "function") { - // return configOrigin(origin, c); - // } - // if (typeof configOrigin === "string") { - // return configOrigin; - // } - // return null; - // }, - //}; - - // Create manager topology - const managerTopology = new PartitionTopologyManager( - registry.config, - runConfig, - ); - - // Start server with ambient env wrapper - logger().info("server running", { port }); - const server = honoServe({ - fetch: managerTopology.router.fetch, - hostname: "0.0.0.0", - port, - }); - if (!injectWebSocket) throw new Error("injectWebSocket not defined"); - injectWebSocket(server); -} diff --git a/packages/core/src/manager/router.ts b/packages/core/src/manager/router.ts index 1096100b6..52146e735 100644 --- a/packages/core/src/manager/router.ts +++ b/packages/core/src/manager/router.ts @@ -38,7 +38,7 @@ import { VERSION } from "@/utils"; import { OpenAPIHono } from "@hono/zod-openapi"; import { createRoute } from "@hono/zod-openapi"; import * as cbor from "cbor-x"; -import { Hono, type Context as HonoContext, type Next } from "hono"; +import { Hono, type Context as HonoContext } from "hono"; import { cors } from "hono/cors"; import { streamSSE } from "hono/streaming"; import type { WSContext } from "hono/ws"; @@ -55,6 +55,7 @@ import { ResolveRequestSchema, } from "./protocol/query"; import type { ActorQuery } from "./protocol/query"; +import { noopNext } from "@/common/utils"; type ManagerRouterHandler = { // onConnectInspector?: ManagerInspectorConnHandler; @@ -116,10 +117,6 @@ export function createManagerRouter( const driver = runConfig.driver.manager; const router = new OpenAPIHono({ strict: false }); - const upgradeWebSocket = runConfig.getUpgradeWebSocket?.( - router as unknown as Hono, - ); - router.use("*", loggerMiddleware(logger())); if (runConfig.cors) { @@ -203,7 +200,6 @@ export function createManagerRouter( if (c.req.path.endsWith("/actors/connect/websocket")) { return handleWebSocketConnectRequest( c, - upgradeWebSocket, registryConfig, runConfig, driver, @@ -397,158 +393,152 @@ export function createManagerRouter( return c.body(cbor.encode(response)); }); - if (upgradeWebSocket) { - router.get( - ".test/inline-driver/connect-websocket", - upgradeWebSocket(async (c: any) => { - const { - actorQuery: actorQueryRaw, - params: paramsRaw, - encodingKind, - } = c.req.query() as { - actorQuery: string; - params?: string; - encodingKind: Encoding; - }; - const actorQuery = JSON.parse(actorQueryRaw); - const params = - paramsRaw !== undefined ? JSON.parse(paramsRaw) : undefined; - - logger().debug("received test inline driver websocket", { - actorQuery, - params, - encodingKind, - }); + router.get(".test/inline-driver/connect-websocket", async (c) => { + const upgradeWebSocket = runConfig.getUpgradeWebSocket?.(); + invariant(upgradeWebSocket, "websockets not supported on this platform"); + + return upgradeWebSocket(async (c: any) => { + const { + actorQuery: actorQueryRaw, + params: paramsRaw, + encodingKind, + } = c.req.query() as { + actorQuery: string; + params?: string; + encodingKind: Encoding; + }; + const actorQuery = JSON.parse(actorQueryRaw); + const params = + paramsRaw !== undefined ? JSON.parse(paramsRaw) : undefined; + + logger().debug("received test inline driver websocket", { + actorQuery, + params, + encodingKind, + }); - // Connect to the actor using the inline client driver - this returns a Promise - const clientWsPromise = inlineClientDriver.connectWebSocket( - undefined, - actorQuery, - encodingKind, - params, - undefined, - ); + // Connect to the actor using the inline client driver - this returns a Promise + const clientWsPromise = inlineClientDriver.connectWebSocket( + undefined, + actorQuery, + encodingKind, + params, + undefined, + ); - // Store a reference to the resolved WebSocket - let clientWs: WebSocket | null = null; - - // Create WebSocket proxy handlers to relay messages between client and server - return { - onOpen: async (_evt: any, serverWs: WSContext) => { - logger().debug("test websocket connection opened"); - - try { - // Resolve the client WebSocket promise - clientWs = await clientWsPromise; - - // Add message handler to forward messages from client to server - clientWs.addEventListener( - "message", - (clientEvt: MessageEvent) => { - logger().debug("test websocket connection message"); - - if (serverWs.readyState === 1) { - // OPEN - serverWs.send(clientEvt.data as any); - } - }, - ); - - // Add close handler to close server when client closes - clientWs.addEventListener("close", (clientEvt: CloseEvent) => { - logger().debug("test websocket connection closed"); - - if (serverWs.readyState !== 3) { - // Not CLOSED - serverWs.close(clientEvt.code, clientEvt.reason); - } - }); + // Store a reference to the resolved WebSocket + let clientWs: WebSocket | null = null; + + // Create WebSocket proxy handlers to relay messages between client and server + return { + onOpen: async (_evt: any, serverWs: WSContext) => { + logger().debug("test websocket connection opened"); - // Add error handler - clientWs.addEventListener("error", () => { - logger().debug("test websocket connection error"); + try { + // Resolve the client WebSocket promise + clientWs = await clientWsPromise; - if (serverWs.readyState !== 3) { - // Not CLOSED - serverWs.close(1011, "Error in client websocket"); + // Add message handler to forward messages from client to server + clientWs.addEventListener( + "message", + (clientEvt: MessageEvent) => { + logger().debug("test websocket connection message"); + + if (serverWs.readyState === 1) { + // OPEN + serverWs.send(clientEvt.data as any); } - }); - } catch (error) { - logger().error( - "failed to establish client websocket connection", - { error }, - ); - serverWs.close(1011, "Failed to establish connection"); - } - }, - onMessage: async (evt: { data: any }, serverWs: WSContext) => { - // If clientWs hasn't been resolved yet, messages will be lost - if (!clientWs) { - logger().debug( - "received server message before client WebSocket connected", - ); - return; - } - - logger().debug("received message from server", { - dataType: typeof evt.data, + }, + ); + + // Add close handler to close server when client closes + clientWs.addEventListener("close", (clientEvt: CloseEvent) => { + logger().debug("test websocket connection closed"); + + if (serverWs.readyState !== 3) { + // Not CLOSED + serverWs.close(clientEvt.code, clientEvt.reason); + } }); - // Forward messages from server websocket to client websocket - if (clientWs.readyState === 1) { - // OPEN - clientWs.send(evt.data); - } - }, - onClose: async ( - event: { - wasClean: boolean; - code: number; - reason: string; - }, - serverWs: WSContext, - ) => { - logger().debug("server websocket closed", { - wasClean: event.wasClean, - code: event.code, - reason: event.reason, + // Add error handler + clientWs.addEventListener("error", () => { + logger().debug("test websocket connection error"); + + if (serverWs.readyState !== 3) { + // Not CLOSED + serverWs.close(1011, "Error in client websocket"); + } }); + } catch (error) { + logger().error( + "failed to establish client websocket connection", + { error }, + ); + serverWs.close(1011, "Failed to establish connection"); + } + }, + onMessage: async (evt: { data: any }, serverWs: WSContext) => { + // If clientWs hasn't been resolved yet, messages will be lost + if (!clientWs) { + logger().debug( + "received server message before client WebSocket connected", + ); + return; + } + + logger().debug("received message from server", { + dataType: typeof evt.data, + }); - // HACK: Close socket in order to fix bug with Cloudflare leaving WS in closing state - // https://github.com/cloudflare/workerd/issues/2569 - serverWs.close(1000, "hack_force_close"); - - // Close the client websocket when the server websocket closes - if ( - clientWs && - clientWs.readyState !== clientWs.CLOSED && - clientWs.readyState !== clientWs.CLOSING - ) { - clientWs.close(event.code, event.reason); - } - }, - onError: async (error: unknown) => { - logger().error("error in server websocket", { error }); - - // Close the client websocket on error - if ( - clientWs && - clientWs.readyState !== clientWs.CLOSED && - clientWs.readyState !== clientWs.CLOSING - ) { - clientWs.close(1011, "Error in server websocket"); - } + // Forward messages from server websocket to client websocket + if (clientWs.readyState === 1) { + // OPEN + clientWs.send(evt.data); + } + }, + onClose: async ( + event: { + wasClean: boolean; + code: number; + reason: string; }, - }; - }), - ); - } else { - router.get(".test/inline-driver/connect-websocket", (c) => { - throw new Error( - "websocket unsupported, fix the test to exclude websockets for this platform", - ); - }); - } + serverWs: WSContext, + ) => { + logger().debug("server websocket closed", { + wasClean: event.wasClean, + code: event.code, + reason: event.reason, + }); + + // HACK: Close socket in order to fix bug with Cloudflare leaving WS in closing state + // https://github.com/cloudflare/workerd/issues/2569 + serverWs.close(1000, "hack_force_close"); + + // Close the client websocket when the server websocket closes + if ( + clientWs && + clientWs.readyState !== clientWs.CLOSED && + clientWs.readyState !== clientWs.CLOSING + ) { + clientWs.close(event.code, event.reason); + } + }, + onError: async (error: unknown) => { + logger().error("error in server websocket", { error }); + + // Close the client websocket on error + if ( + clientWs && + clientWs.readyState !== clientWs.CLOSED && + clientWs.readyState !== clientWs.CLOSING + ) { + clientWs.close(1011, "Error in server websocket"); + } + }, + }; + })(c, noopNext()); + }); } router.doc("/openapi.json", { @@ -796,17 +786,18 @@ async function handleSseConnectRequest( */ async function handleWebSocketConnectRequest( c: HonoContext, - upgradeWebSocket: - | (( - createEvents: (c: HonoContext) => any, - ) => (c: HonoContext, next: Next) => Promise) - | undefined, registryConfig: RegistryConfig, runConfig: RunConfig, driver: ManagerDriver, handler: ManagerRouterHandler, ): Promise { - invariant(upgradeWebSocket, "WebSockets not supported"); + const upgradeWebSocket = runConfig.getUpgradeWebSocket?.(); + if (!upgradeWebSocket) { + return c.text( + "WebSockets are not enabled for this driver. Use SSE instead.", + 400, + ); + } let encoding: Encoding | undefined; try { @@ -1216,8 +1207,3 @@ async function handleResolveRequest( const serialized = serialize(response, encoding); return c.body(serialized); } - -/** Generates a `Next` handler to pass to middleware in order to be able to call arbitrary middleware. */ -function noopNext(): Next { - return async () => {}; -} diff --git a/packages/core/src/registry/mod.ts b/packages/core/src/registry/mod.ts index 8c5d2531a..b73a08418 100644 --- a/packages/core/src/registry/mod.ts +++ b/packages/core/src/registry/mod.ts @@ -52,6 +52,12 @@ export class Registry { public createServer(inputConfig?: RunConfigInput): ServerOutput { const config = RunConfigSchema.parse(inputConfig); + // Configure getUpgradeWebSocket lazily so we can assign it in crossPlatformServe + let upgradeWebSocket = undefined; + if (!config.getUpgradeWebSocket) { + config.getUpgradeWebSocket = () => upgradeWebSocket!; + } + // Setup topology let hono: Hono; let clientDriver: ClientDriver; @@ -78,7 +84,10 @@ export class Registry { client, hono, handler: async (req: Request) => await hono.fetch(req), - serve: (app) => crossPlatformServe(config, hono, app), + serve: async (app) => { + const out = await crossPlatformServe(hono, app); + upgradeWebSocket = out.upgradeWebSocket; + }, }; } @@ -96,6 +105,12 @@ export class Registry { public createWorker(inputConfig?: RunConfigInput): ActorNodeOutput { const config = RunConfigSchema.parse(inputConfig); + // Configure getUpgradeWebSocket lazily so we can assign it in crossPlatformServe + let upgradeWebSocket = undefined; + if (!config.getUpgradeWebSocket) { + config.getUpgradeWebSocket = () => upgradeWebSocket!; + } + // Setup topology let hono: Hono; if (config.driver.topology === "standalone") { @@ -112,7 +127,10 @@ export class Registry { return { hono, handler: async (req: Request) => await hono.fetch(req), - serve: (app) => crossPlatformServe(config, hono, app), + serve: async (app) => { + const out = await crossPlatformServe(hono, app); + upgradeWebSocket = out.upgradeWebSocket; + }, }; } diff --git a/packages/core/src/registry/run-config.ts b/packages/core/src/registry/run-config.ts index 61f86d6a3..549e62dc3 100644 --- a/packages/core/src/registry/run-config.ts +++ b/packages/core/src/registry/run-config.ts @@ -1,8 +1,8 @@ import type { ActorDriver } from "@/actor/driver"; -import { createMemoryDriver } from "@/drivers/memory/mod"; import type { ManagerDriver } from "@/manager/driver"; import type { CoordinateDriver } from "@/topologies/coordinate/driver"; -import { createDefaultDriver, type UpgradeWebSocket } from "@/utils"; +import { type UpgradeWebSocket } from "@/utils"; +import { createDefaultDriver } from "@/drivers/default"; import type { Hono } from "hono"; import type { cors } from "hono/cors"; import { z } from "zod"; @@ -46,7 +46,7 @@ export type ActorPeerConfig = z.infer; export const TopologySchema = z.enum(["standalone", "partition", "coordinate"]); export type Topology = z.infer; -export type GetUpgradeWebSocket = (router: Hono) => UpgradeWebSocket; +export type GetUpgradeWebSocket = () => UpgradeWebSocket; export const DriverConfigSchema = z.object({ topology: TopologySchema, @@ -62,7 +62,10 @@ export const RunConfigSchema = z .object({ driver: DriverConfigSchema.optional().default(() => createDefaultDriver()), - // This is dynamic since NodeJS requires a reference to the router to initialize WebSockets + // This is a function to allow for lazy configuration of upgradeWebSocket on the + // fly. This is required since the dependencies that profie upgradeWebSocket + // (specifically Node.js) can sometimes only be specified after the router is + // created or must be imported async using `await import(...)` getUpgradeWebSocket: z.custom().optional(), /** CORS configuration for the router. Uses Hono's CORS middleware options. */ diff --git a/packages/core/src/registry/serve.ts b/packages/core/src/registry/serve.ts index af6a1ccc2..635b9c042 100644 --- a/packages/core/src/registry/serve.ts +++ b/packages/core/src/registry/serve.ts @@ -4,7 +4,6 @@ import { logger } from "./log"; import type { RunConfig } from "./run-config"; export async function crossPlatformServe( - config: RunConfig, rivetKitRouter: Hono, userRouter: Hono | undefined, ) { @@ -42,9 +41,6 @@ export async function crossPlatformServe( app, }); - // Update config for new WS - config.getUpgradeWebSocket = () => upgradeWebSocket; - // Start server const port = Number.parseInt( getEnvUniversal("PORT") ?? getEnvUniversal("PORT_HTTP") ?? "8080", @@ -53,4 +49,6 @@ export async function crossPlatformServe( logger().info(`listening on port ${port}`), ); injectWebSocket(server); + + return { upgradeWebSocket }; } diff --git a/packages/core/src/test/mod.ts b/packages/core/src/test/mod.ts index 3448860b8..6d18f1459 100644 --- a/packages/core/src/test/mod.ts +++ b/packages/core/src/test/mod.ts @@ -11,14 +11,9 @@ import { type TestContext, vi } from "vitest"; import { ConfigSchema, type InputConfig } from "./config"; import { logger } from "./log"; import { createMemoryDriver } from "@/drivers/memory/mod"; +import { upgradeWebSocket } from "hono/deno"; -function createRouter( - registry: Registry, - inputConfig?: InputConfig, -): { - router: Hono; - injectWebSocket: NodeWebSocket["injectWebSocket"]; -} { +function serve(registry: Registry, inputConfig?: InputConfig): ServerType { const config = ConfigSchema.parse(inputConfig); // Configure default configuration @@ -26,46 +21,34 @@ function createRouter( config.driver = createMemoryDriver(); } - // Setup WebSocket routing for Node - // - // Save `injectWebSocket` for after server is created - let injectWebSocket: NodeWebSocket["injectWebSocket"] | undefined; + let upgradeWebSocket = undefined; if (!config.getUpgradeWebSocket) { - config.getUpgradeWebSocket = (router) => { - const webSocket = createNodeWebSocket({ app: router }); - injectWebSocket = webSocket.injectWebSocket; - return webSocket.upgradeWebSocket; - }; + config.getUpgradeWebSocket = () => upgradeWebSocket!; } // Setup topology const runConfig = RunConfigSchema.parse(inputConfig); + let topology; if (config.driver.topology === "standalone") { - const topology = new StandaloneTopology(registry.config, runConfig); - if (!injectWebSocket) throw new Error("injectWebSocket not defined"); - return { router: topology.router, injectWebSocket }; + topology = new StandaloneTopology(registry.config, runConfig); } else if (config.driver.topology === "partition") { throw new Error("Node.js only supports standalone & coordinate topology."); } else if (config.driver.topology === "coordinate") { - const topology = new CoordinateTopology(registry.config, runConfig); - if (!injectWebSocket) throw new Error("injectWebSocket not defined"); - return { router: topology.router, injectWebSocket }; + topology = new CoordinateTopology(registry.config, runConfig); } else { assertUnreachable(config.driver.topology); } -} - -function serve(registry: Registry, inputConfig?: InputConfig): ServerType { - const config = ConfigSchema.parse(inputConfig); - const { router, injectWebSocket } = createRouter(registry, config); + // Inject WebSocket + const nodeWebSocket = createNodeWebSocket({ app: topology.router }); + upgradeWebSocket = nodeWebSocket.upgradeWebSocket; const server = honoServe({ - fetch: router.fetch, + fetch: topology.router.fetch, hostname: config.hostname, port: config.port, }); - injectWebSocket(server); + nodeWebSocket.injectWebSocket(server); logger().info("rivetkit started", { hostname: config.hostname, diff --git a/packages/core/src/topologies/coordinate/topology.ts b/packages/core/src/topologies/coordinate/topology.ts index 10d72a380..21469042b 100644 --- a/packages/core/src/topologies/coordinate/topology.ts +++ b/packages/core/src/topologies/coordinate/topology.ts @@ -68,8 +68,6 @@ export class CoordinateTopology { // Build router const router = new Hono(); - const upgradeWebSocket = runConfig.getUpgradeWebSocket?.(router); - // Share connection handlers for both routers const connectionHandlers: ConnectionHandlers = { onConnectWebSocket: async ( diff --git a/packages/core/src/topologies/partition/actor-router.ts b/packages/core/src/topologies/partition/actor-router.ts index 23a02c8f7..ebd9abae3 100644 --- a/packages/core/src/topologies/partition/actor-router.ts +++ b/packages/core/src/topologies/partition/actor-router.ts @@ -27,6 +27,7 @@ import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; import { Hono, type Context as HonoContext } from "hono"; import { logger } from "./log"; +import { noopNext } from "@/common/utils"; export type { ConnectWebSocketOpts, @@ -57,8 +58,6 @@ export function createActorRouter( ): Hono { const router = new Hono({ strict: false }); - const upgradeWebSocket = runConfig.getUpgradeWebSocket?.(router); - router.use("*", loggerMiddleware(logger())); router.get("/", (c) => { @@ -74,9 +73,9 @@ export function createActorRouter( // Use the handlers from connectionHandlers const handlers = handler.connectionHandlers; - if (upgradeWebSocket) { - router.get( - "/connect/websocket", + router.get("/connect/websocket", async (c) => { + let upgradeWebSocket = runConfig.getUpgradeWebSocket?.(); + if (upgradeWebSocket) { upgradeWebSocket(async (c) => { const actorId = await handler.getActorId(); const encodingRaw = c.req.header(HEADER_ENCODING); @@ -99,16 +98,14 @@ export function createActorRouter( connParams, authData, ); - }), - ); - } else { - router.get("/connect/websocket", (c) => { + })(c, noopNext()); + } else { return c.text( "WebSockets are not enabled for this driver. Use SSE instead.", 400, ); - }); - } + } + }); router.get("/connect/sse", async (c) => { if (!handlers.onConnectSse) { diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index bd89a2022..c0ded72bc 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -3,11 +3,6 @@ export { stringifyError } from "@/common/utils"; import type { Context as HonoContext, Handler as HonoHandler } from "hono"; import pkgJson from "../package.json" with { type: "json" }; -import { logger } from "./actor/log"; -import { createMemoryDriver } from "./drivers/memory/mod"; -import { createRivetManagerDriver } from "./drivers/rivet/mod"; -import { type DriverConfig, UserError } from "./mod"; -import { createFileSystemDriver } from "./drivers/file-system/mod"; export const VERSION = pkgJson.version; @@ -35,25 +30,6 @@ export type UpgradeWebSocket = ( createEvents: (c: HonoContext) => any, ) => HonoHandler; -/** - * Determines which driver to use if none is provided. - */ -export function createDefaultDriver(): DriverConfig { - const driver = getEnvUniversal("RIVETKIT_DRIVER"); - if (!driver || driver === "file-system") { - logger().info("using default file system driver"); - return createFileSystemDriver(); - } else if (driver === "memory") { - logger().info("using default memory driver"); - return createMemoryDriver(); - } else if (driver === "rivet") { - logger().info("using default rivet driver"); - return createRivetManagerDriver(); - } else { - throw new UserError(`Unrecognized driver: ${driver}`); - } -} - export function getEnvUniversal(key: string): string | undefined { if (typeof Deno !== "undefined") { return Deno.env.get(key);