diff --git a/packages/actor/scripts/dump-openapi.ts b/packages/actor/scripts/dump-openapi.ts index 1a69a6b06..e7a61ce85 100644 --- a/packages/actor/scripts/dump-openapi.ts +++ b/packages/actor/scripts/dump-openapi.ts @@ -41,7 +41,7 @@ function main() { }; const managerRouter = createManagerRouter(appConfig, driverConfig, { - proxyMode: { + routingHandler: { inline: { handlers: sharedConnectionHandlers, }, diff --git a/packages/actor/src/actor/conn-routing-handler.ts b/packages/actor/src/actor/conn-routing-handler.ts new file mode 100644 index 000000000..7b5eb3b7d --- /dev/null +++ b/packages/actor/src/actor/conn-routing-handler.ts @@ -0,0 +1,53 @@ +import type { ConnectionHandlers as ConnHandlers } from "./router-endpoints"; +import type { Context as HonoContext, HonoRequest } from "hono"; + +/** + * Deterines how requests to actors should be routed. + * + * Inline handlers calls the connection handlers directly. + * + * Custom will let a custom function handle the request. This usually will proxy the request to another location. + */ +export type ConnRoutingHandler = + | { + inline: { + handlers: ConnHandlers; + }; + } + | { + custom: ConnRoutingHandlerCustom; + }; + +export interface ConnRoutingHandlerCustom { + sendRequest: SendRequestHandler; + openWebSocket: OpenWebSocketHandler; + proxyRequest: ProxyRequestHandler; + proxyWebSocket: ProxyWebSocketHandler; +} + +export type BuildProxyEndpoint = (c: HonoContext, actorId: string) => string; + +export type SendRequestHandler = ( + actorId: string, + meta: unknown | undefined, + actorRequest: Request, +) => Promise; + +export type OpenWebSocketHandler = ( + actorId: string, + meta?: unknown, +) => Promise; + +export type ProxyRequestHandler = ( + c: HonoContext, + actorRequest: Request, + actorId: string, + meta?: unknown, +) => Promise; + +export type ProxyWebSocketHandler = ( + c: HonoContext, + path: string, + actorId: string, + meta?: unknown, +) => Promise; diff --git a/packages/actor/src/actor/router-endpoints.ts b/packages/actor/src/actor/router-endpoints.ts index 9d1b6f3ea..99e831551 100644 --- a/packages/actor/src/actor/router-endpoints.ts +++ b/packages/actor/src/actor/router-endpoints.ts @@ -21,7 +21,7 @@ import type { DriverConfig } from "@/driver-helpers/config"; import invariant from "invariant"; export interface ConnectWebSocketOpts { - req: HonoRequest; + req?: HonoRequest; encoding: Encoding; params: unknown; actorId: string; @@ -34,7 +34,7 @@ export interface ConnectWebSocketOutput { } export interface ConnectSseOpts { - req: HonoRequest; + req?: HonoRequest; encoding: Encoding; params: unknown; actorId: string; @@ -46,7 +46,7 @@ export interface ConnectSseOutput { } export interface ActionOpts { - req: HonoRequest; + req?: HonoRequest; params: unknown; actionName: string; actionArgs: unknown[]; @@ -58,7 +58,7 @@ export interface ActionOutput { } export interface ConnsMessageOpts { - req: HonoRequest; + req?: HonoRequest; connId: string; connToken: string; message: messageToServer.ToServer; diff --git a/packages/actor/src/app/inline-client-driver.ts b/packages/actor/src/app/inline-client-driver.ts new file mode 100644 index 000000000..c8a2f2fe0 --- /dev/null +++ b/packages/actor/src/app/inline-client-driver.ts @@ -0,0 +1,212 @@ +import * as errors from "@/actor/errors"; +import * as protoHttpAction from "@/actor/protocol/http/action"; +import { logger } from "./log"; +import type { EventSource } from "eventsource"; +import type * as wsToServer from "@/actor/protocol/message/to-server"; +import { type Encoding, serialize } from "@/actor/protocol/serde"; +import { + HEADER_CONN_PARAMS, + HEADER_ENCODING, + type ConnectionHandlers, +} from "@/actor/router-endpoints"; +import { HonoRequest, type Context as HonoContext, type Next } from "hono"; +import invariant from "invariant"; +import { ClientDriver } from "@/client/client"; +import { ManagerDriver } from "@/manager/driver"; +import { ActorQuery } from "@/manager/protocol/query"; +import { ConnRoutingHandler } from "@/actor/conn-routing-handler"; +import { sendHttpRequest, serializeWithEncoding } from "@/client/utils"; +import { ActionRequest, ActionResponse } from "@/actor/protocol/http/action"; +import { assertUnreachable } from "@/actor/utils"; + +/** + * Client driver that calls the manager driver inline. + * + * This driver can access private resources. + * + * This driver serves a double purpose as: + * - Providing the client for the internal requests + * - Provide the driver for the manager HTTP router (see manager/router.ts) + */ +export function createInlineClientDriver( + managerDriver: ManagerDriver, + routingHandler: ConnRoutingHandler, +): ClientDriver { + //// Lazily import the dynamic imports so we don't have to turn `createClient` in to an aysnc fn + //const dynamicImports = (async () => { + // // Import dynamic dependencies + // const [WebSocket, EventSource] = await Promise.all([ + // importWebSocket(), + // importEventSource(), + // ]); + // return { + // WebSocket, + // EventSource, + // }; + //})(); + + const driver: ClientDriver = { + action: async = unknown[], Response = unknown>( + req: HonoRequest | undefined, + actorQuery: ActorQuery, + encoding: Encoding, + params: unknown, + actionName: string, + ...args: Args + ): Promise => { + // Get the actor ID and meta + const { actorId, meta } = await queryActor( + req, + actorQuery, + managerDriver, + ); + logger().debug("found actor for action", { actorId, meta }); + invariant(actorId, "Missing actor ID"); + + // Invoke the action + logger().debug("handling action", { actionName, encoding }); + if ("inline" in routingHandler) { + const { output } = await routingHandler.inline.handlers.onAction({ + req, + params, + actionName, + actionArgs: args, + actorId, + }); + return output as Response; + } else if ("custom" in routingHandler) { + const responseData = await sendHttpRequest< + ActionRequest, + ActionResponse + >({ + url: `http://actor/action/${encodeURIComponent(actionName)}`, + method: "POST", + headers: { + [HEADER_ENCODING]: encoding, + ...(params !== undefined + ? { [HEADER_CONN_PARAMS]: JSON.stringify(params) } + : {}), + }, + body: { a: args } satisfies ActionRequest, + encoding: encoding, + customFetch: routingHandler.custom.sendRequest.bind( + undefined, + actorId, + meta, + ), + }); + + return responseData.o as Response; + } else { + assertUnreachable(routingHandler); + } + }, + + resolveActorId: async ( + req: HonoRequest | undefined, + actorQuery: ActorQuery, + _encodingKind: Encoding, + ): Promise => { + // Get the actor ID and meta + const { actorId } = await queryActor(req, actorQuery, managerDriver); + logger().debug("resolved actor", { actorId }); + invariant(actorId, "missing actor ID"); + + return actorId; + }, + + connectWebSocket: async ( + req: HonoRequest | undefined, + actorQuery: ActorQuery, + encodingKind: Encoding, + ): Promise => { + throw "UNIMPLEMENTED"; + }, + + connectSse: async ( + req: HonoRequest | undefined, + actorQuery: ActorQuery, + encodingKind: Encoding, + params: unknown, + ): Promise => { + throw "UNIMPLEMENTED"; + }, + + sendHttpMessage: async ( + req: HonoRequest | undefined, + actorId: string, + encoding: Encoding, + connectionId: string, + connectionToken: string, + message: wsToServer.ToServer, + ): Promise => { + throw "UNIMPLEMENTED"; + }, + }; + + return driver; +} + +/** + * Query the manager driver to get or create an actor based on the provided query + */ +export async function queryActor( + req: HonoRequest | undefined, + query: ActorQuery, + driver: ManagerDriver, +): Promise<{ actorId: string; meta?: unknown }> { + logger().debug("querying actor", { query }); + let actorOutput: { actorId: string; meta?: unknown }; + if ("getForId" in query) { + const output = await driver.getForId({ + req, + actorId: query.getForId.actorId, + }); + if (!output) throw new errors.ActorNotFound(query.getForId.actorId); + actorOutput = output; + } else if ("getForKey" in query) { + const existingActor = await driver.getWithKey({ + req, + name: query.getForKey.name, + key: query.getForKey.key, + }); + if (!existingActor) { + throw new errors.ActorNotFound( + `${query.getForKey.name}:${JSON.stringify(query.getForKey.key)}`, + ); + } + actorOutput = existingActor; + } else if ("getOrCreateForKey" in query) { + const getOrCreateOutput = await driver.getOrCreateWithKey({ + req, + name: query.getOrCreateForKey.name, + key: query.getOrCreateForKey.key, + input: query.getOrCreateForKey.input, + region: query.getOrCreateForKey.region, + }); + actorOutput = { + actorId: getOrCreateOutput.actorId, + meta: getOrCreateOutput.meta, + }; + } else if ("create" in query) { + const createOutput = await driver.createActor({ + req, + name: query.create.name, + key: query.create.key, + input: query.create.input, + region: query.create.region, + }); + actorOutput = { + actorId: createOutput.actorId, + meta: createOutput.meta, + }; + } else { + throw new errors.InvalidRequest("Invalid query format"); + } + + logger().debug("actor query result", { + actorId: actorOutput.actorId, + meta: actorOutput.meta, + }); + return { actorId: actorOutput.actorId, meta: actorOutput.meta }; +} diff --git a/packages/actor/src/app/log.ts b/packages/actor/src/app/log.ts new file mode 100644 index 000000000..d56c5aa74 --- /dev/null +++ b/packages/actor/src/app/log.ts @@ -0,0 +1,7 @@ +import { getLogger } from "@/common//log"; + +export const LOGGER_NAME = "actor-app"; + +export function logger() { + return getLogger(LOGGER_NAME); +} diff --git a/packages/actor/src/client/actor-conn.ts b/packages/actor/src/client/actor-conn.ts index 9de47b400..5bf304c76 100644 --- a/packages/actor/src/client/actor-conn.ts +++ b/packages/actor/src/client/actor-conn.ts @@ -251,6 +251,7 @@ enc async #connectWebSocket() { const ws = await this.#driver.connectWebSocket( + undefined, this.#actorQuery, this.#encodingKind, ); @@ -281,6 +282,7 @@ enc async #connectSse() { const eventSource = await this.#driver.connectSse( + undefined, this.#actorQuery, this.#encodingKind, this.#params, @@ -649,6 +651,7 @@ enc throw new errors.InternalError("Missing connection ID or token."); const res = await this.#driver.sendHttpMessage( + undefined, this.#actorId, this.#encodingKind, this.#connectionId, diff --git a/packages/actor/src/client/actor-handle.ts b/packages/actor/src/client/actor-handle.ts index c1c458808..0da6fa69b 100644 --- a/packages/actor/src/client/actor-handle.ts +++ b/packages/actor/src/client/actor-handle.ts @@ -61,6 +61,7 @@ export class ActorHandleRaw { ...args: Args ): Promise { return await this.#driver.action( + undefined, this.#actorQuery, this.#encodingKind, this.#params, @@ -105,6 +106,7 @@ export class ActorHandleRaw { ) { // TODO: const actorId = await this.#driver.resolveActorId( + undefined, this.#actorQuery, this.#encodingKind, ); diff --git a/packages/actor/src/client/client.ts b/packages/actor/src/client/client.ts index 91145fdb6..b0f574158 100644 --- a/packages/actor/src/client/client.ts +++ b/packages/actor/src/client/client.ts @@ -15,6 +15,7 @@ import type { AnyActorDefinition } from "@/actor/definition"; import type * as wsToServer from "@/actor/protocol/message/to-server"; import type { EventSource } from "eventsource"; import { createHttpClientDriver } from "./http-client-driver"; +import { HonoRequest } from "hono"; /** Extract the actor registry from the app definition. */ export type ExtractActorsFromApp> = @@ -159,6 +160,7 @@ export const TRANSPORT_SYMBOL = Symbol("transport"); export interface ClientDriver { action = unknown[], Response = unknown>( + req: HonoRequest | undefined, actorQuery: ActorQuery, encoding: Encoding, params: unknown, @@ -166,19 +168,23 @@ export interface ClientDriver { ...args: Args ): Promise; resolveActorId( + req: HonoRequest | undefined, actorQuery: ActorQuery, encodingKind: Encoding, ): Promise; connectWebSocket( + req: HonoRequest | undefined, actorQuery: ActorQuery, encodingKind: Encoding, ): Promise; connectSse( + req: HonoRequest | undefined, actorQuery: ActorQuery, encodingKind: Encoding, params: unknown, ): Promise; sendHttpMessage( + req: HonoRequest | undefined, actorId: string, encoding: Encoding, connectionId: string, @@ -353,6 +359,7 @@ export class ClientRaw { // Create the actor const actorId = await this.#driver.resolveActorId( + undefined, createQuery, this.#encodingKind, ); diff --git a/packages/actor/src/client/http-client-driver.ts b/packages/actor/src/client/http-client-driver.ts index 94cdb0c36..ef7a4da47 100644 --- a/packages/actor/src/client/http-client-driver.ts +++ b/packages/actor/src/client/http-client-driver.ts @@ -25,6 +25,7 @@ import { import type { ActionRequest } from "@/actor/protocol/http/action"; import type { ActionResponse } from "@/actor/protocol/message/to-client"; import { ClientDriver } from "./client"; +import { HonoRequest } from "hono"; /** * Client driver that communicates with the manager via HTTP. @@ -45,6 +46,7 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { const driver: ClientDriver = { action: async = unknown[], Response = unknown>( + _req: HonoRequest | undefined, actorQuery: ActorQuery, encoding: Encoding, params: unknown, @@ -77,6 +79,7 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { }, resolveActorId: async ( + _req: HonoRequest | undefined, actorQuery: ActorQuery, encodingKind: Encoding, ): Promise => { @@ -112,6 +115,7 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { }, connectWebSocket: async ( + _req: HonoRequest | undefined, actorQuery: ActorQuery, encodingKind: Encoding, ): Promise => { @@ -140,6 +144,7 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { }, connectSse: async ( + _req: HonoRequest | undefined, actorQuery: ActorQuery, encodingKind: Encoding, params: unknown, @@ -170,6 +175,7 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver { }, sendHttpMessage: async ( + _req: HonoRequest | undefined, actorId: string, encoding: Encoding, connectionId: string, diff --git a/packages/actor/src/client/utils.ts b/packages/actor/src/client/utils.ts index 915032143..aeb3a4c57 100644 --- a/packages/actor/src/client/utils.ts +++ b/packages/actor/src/client/utils.ts @@ -1,5 +1,4 @@ -import { deserialize } from "@/actor/protocol/serde"; -import { assertUnreachable, stringifyError } from "@/common/utils"; +import { assertUnreachable } from "@/common/utils"; import { httpUserAgent } from "@/utils"; import { Encoding } from "@/mod"; import * as cbor from "cbor-x"; @@ -32,6 +31,7 @@ export interface HttpRequestOpts { body?: Body; encoding: Encoding; skipParseResponse?: boolean; + customFetch?: (req: Request) => Promise; } export async function sendHttpRequest< @@ -62,19 +62,21 @@ export async function sendHttpRequest< let response: Response; try { // Make the HTTP request - response = await fetch(opts.url, { - method: opts.method, - headers: { - ...opts.headers, - ...(contentType - ? { - "Content-Type": contentType, - } - : {}), - "User-Agent": httpUserAgent(), - }, - body: bodyData, - }); + response = await (opts.customFetch ?? fetch)( + new Request(opts.url, { + method: opts.method, + headers: { + ...opts.headers, + ...(contentType + ? { + "Content-Type": contentType, + } + : {}), + "User-Agent": httpUserAgent(), + }, + body: bodyData + }), + ); } catch (error) { throw new HttpRequestError(`Request failed: ${error}`, { cause: error, diff --git a/packages/actor/src/driver-test-suite/mod.ts b/packages/actor/src/driver-test-suite/mod.ts index 574435a7f..dc78c5fa4 100644 --- a/packages/actor/src/driver-test-suite/mod.ts +++ b/packages/actor/src/driver-test-suite/mod.ts @@ -17,7 +17,7 @@ import { createNodeWebSocket, type NodeWebSocket } from "@hono/node-ws"; import invariant from "invariant"; import { bundleRequire } from "bundle-require"; import { getPort } from "@/test/mod"; -import { Transport } from "@/client/mod"; +import { Client, Transport } from "@/client/mod"; import { runActorConnTests } from "./tests/actor-conn"; import { runActorHandleTests } from "./tests/actor-handle"; import { runActionFeaturesTests } from "./tests/action-features"; @@ -25,6 +25,7 @@ import { runActorVarsTests } from "./tests/actor-vars"; import { runActorConnStateTests } from "./tests/actor-conn-state"; import { runActorMetadataTests } from "./tests/actor-metadata"; import { runActorErrorHandlingTests } from "./tests/actor-error-handling"; +import { ClientDriver } from "@/client/client"; export interface DriverTestConfig { /** Deploys an app and returns the connection endpoint. */ @@ -40,40 +41,61 @@ export interface DriverTestConfig { HACK_skipCleanupNet?: boolean; transport?: Transport; + + clientType: ClientType; } +/** + * The type of client to run the test with. + * + * The logic for HTTP vs inline is very different, so this helps validate all behavior matches. + **/ +type ClientType = "http" | "inline"; + export interface DriverDeployOutput { endpoint: string; + inlineClientDriver: ClientDriver; /** Cleans up the test. */ cleanup(): Promise; } /** Runs all Vitest tests against the provided drivers. */ -export function runDriverTests(driverTestConfig: DriverTestConfig) { - runActorDriverTests(driverTestConfig); - runManagerDriverTests(driverTestConfig); - - for (const transport of ["websocket", "sse"] as Transport[]) { - describe(`actor connection (${transport})`, () => { - runActorConnTests({ - ...driverTestConfig, - transport, - }); - - runActorConnStateTests({ ...driverTestConfig, transport }); - }); - } +export function runDriverTests( + driverTestConfigPartial: Omit, +) { + for (const clientType of ["http", "inline"] as ClientType[]) { + const driverTestConfig: DriverTestConfig = { + ...driverTestConfigPartial, + clientType, + }; - runActorHandleTests(driverTestConfig); + describe(`client type (${clientType})`, () => { + runActorDriverTests(driverTestConfig); + runManagerDriverTests(driverTestConfig); - runActionFeaturesTests(driverTestConfig); + for (const transport of ["websocket", "sse"] as Transport[]) { + describe(`transport (${transport})`, () => { + runActorConnTests({ + ...driverTestConfig, + transport, + }); - runActorVarsTests(driverTestConfig); + runActorConnStateTests({ ...driverTestConfig, transport }); + }); + } - runActorMetadataTests(driverTestConfig); + runActorHandleTests(driverTestConfig); - runActorErrorHandlingTests(driverTestConfig); + runActionFeaturesTests(driverTestConfig); + + runActorVarsTests(driverTestConfig); + + runActorMetadataTests(driverTestConfig); + + runActorErrorHandlingTests(driverTestConfig); + }); + } } /** @@ -144,5 +166,9 @@ export async function createTestRuntime( await driverCleanup?.(); }; - return { endpoint: `http://127.0.0.1:${port}`, cleanup }; + return { + endpoint: `http://127.0.0.1:${port}`, + inlineClientDriver: topology.clientDriver, + cleanup, + }; } diff --git a/packages/actor/src/driver-test-suite/tests/actor-error-handling.ts b/packages/actor/src/driver-test-suite/tests/actor-error-handling.ts index f59c0c5d4..e10786255 100644 --- a/packages/actor/src/driver-test-suite/tests/actor-error-handling.ts +++ b/packages/actor/src/driver-test-suite/tests/actor-error-handling.ts @@ -1,6 +1,6 @@ -import { describe, test, expect, vi } from "vitest"; +import { describe, test, expect } from "vitest"; import type { DriverTestConfig } from "../mod"; -import { setupDriverTest, waitFor } from "../utils"; +import { setupDriverTest } from "../utils"; import { ERROR_HANDLING_APP_PATH, type ErrorHandlingApp } from "../test-apps"; export function runActorErrorHandlingTests(driverTestConfig: DriverTestConfig) { diff --git a/packages/actor/src/driver-test-suite/utils.ts b/packages/actor/src/driver-test-suite/utils.ts index f0c455665..4d2a0dfa9 100644 --- a/packages/actor/src/driver-test-suite/utils.ts +++ b/packages/actor/src/driver-test-suite/utils.ts @@ -1,7 +1,10 @@ import type { ActorCoreApp } from "@/mod"; import { type TestContext, vi } from "vitest"; import { createClient, type Client } from "@/client/mod"; -import type { DriverTestConfig, } from "./mod"; +import type { DriverTestConfig } from "./mod"; +import { assertUnreachable } from "@/actor/utils"; +import { createInlineClientDriver } from "@/app/inline-client-driver"; +import { createClientWithDriver } from "@/client/client"; // Must use `TestContext` since global hooks do not work when running concurrently export async function setupDriverTest>( @@ -16,13 +19,24 @@ export async function setupDriverTest>( } // Build drivers - const { endpoint, cleanup } = await driverTestConfig.start(appPath); + const { endpoint, inlineClientDriver, cleanup } = + await driverTestConfig.start(appPath); c.onTestFinished(cleanup); - // Create client - const client = createClient(endpoint, { - transport: driverTestConfig.transport, - }); + let client: Client; + if (driverTestConfig.clientType === "http") { + // Create client + client = createClient(endpoint, { + transport: driverTestConfig.transport, + }); + } else if (driverTestConfig.clientType === "inline") { + // Use inline client from driver + client = createClientWithDriver(inlineClientDriver); + } else { + assertUnreachable(driverTestConfig.clientType); + } + + // Cleanup client if (!driverTestConfig.HACK_skipCleanupNet) { c.onTestFinished(async () => await client.dispose()); } diff --git a/packages/actor/src/manager/driver.ts b/packages/actor/src/manager/driver.ts index 714893881..d95713f89 100644 --- a/packages/actor/src/manager/driver.ts +++ b/packages/actor/src/manager/driver.ts @@ -1,6 +1,6 @@ import type { ActorKey } from "@/common/utils"; import type { ManagerInspector } from "@/inspector/manager"; -import type { Env, Context as HonoContext } from "hono"; +import type { Env, Context as HonoContext, HonoRequest } from "hono"; export interface ManagerDriver { getForId(input: GetForIdInput): Promise; @@ -11,18 +11,18 @@ export interface ManagerDriver { inspector?: ManagerInspector; } export interface GetForIdInput { - c?: HonoContext; + req?: HonoRequest | undefined; actorId: string; } export interface GetWithKeyInput { - c?: HonoContext; + req?: HonoRequest | undefined; name: string; key: ActorKey; } export interface GetOrCreateWithKeyInput { - c?: HonoContext; + req?: HonoRequest | undefined; name: string; key: ActorKey; input?: unknown; @@ -30,7 +30,7 @@ export interface GetOrCreateWithKeyInput { } export interface CreateInput { - c?: HonoContext; + req?: HonoRequest | undefined; name: string; key: ActorKey; input?: unknown; diff --git a/packages/actor/src/manager/router.ts b/packages/actor/src/manager/router.ts index 18fa24a9a..4c1ef2105 100644 --- a/packages/actor/src/manager/router.ts +++ b/packages/actor/src/manager/router.ts @@ -49,39 +49,11 @@ import { } from "./protocol/query"; import type { ActorQuery } from "./protocol/query"; import { VERSION } from "@/utils"; - -type ProxyMode = - | { - inline: { - handlers: ConnectionHandlers; - }; - } - | { - custom: { - onProxyRequest: OnProxyRequest; - onProxyWebSocket: OnProxyWebSocket; - }; - }; - -export type BuildProxyEndpoint = (c: HonoContext, actorId: string) => string; - -export type OnProxyRequest = ( - c: HonoContext, - actorRequest: Request, - actorId: string, - meta?: unknown, -) => Promise; - -export type OnProxyWebSocket = ( - c: HonoContext, - path: string, - actorId: string, - meta?: unknown, -) => Promise; +import { ConnRoutingHandler } from "@/actor/conn-routing-handler"; type ManagerRouterHandler = { onConnectInspector?: ManagerInspectorConnHandler; - proxyMode: ProxyMode; + routingHandler: ConnRoutingHandler; }; const OPENAPI_ENCODING = z.string().openapi({ @@ -408,14 +380,14 @@ export async function queryActor( let actorOutput: { actorId: string; meta?: unknown }; if ("getForId" in query) { const output = await driver.getForId({ - c, + req: c.req, actorId: query.getForId.actorId, }); if (!output) throw new errors.ActorNotFound(query.getForId.actorId); actorOutput = output; } else if ("getForKey" in query) { const existingActor = await driver.getWithKey({ - c, + req: c.req, name: query.getForKey.name, key: query.getForKey.key, }); @@ -427,7 +399,7 @@ export async function queryActor( actorOutput = existingActor; } else if ("getOrCreateForKey" in query) { const getOrCreateOutput = await driver.getOrCreateWithKey({ - c, + req: c.req, name: query.getOrCreateForKey.name, key: query.getOrCreateForKey.key, input: query.getOrCreateForKey.input, @@ -439,7 +411,7 @@ export async function queryActor( }; } else if ("create" in query) { const createOutput = await driver.createActor({ - c, + req: c.req, name: query.create.name, key: query.create.key, input: query.create.input, @@ -496,17 +468,17 @@ async function handleSseConnectRequest( logger().debug("sse connection to actor", { actorId, meta }); // Handle based on mode - if ("inline" in handler.proxyMode) { + if ("inline" in handler.routingHandler) { logger().debug("using inline proxy mode for sse connection"); // Use the shared SSE handler return await handleSseConnect( c, appConfig, driverConfig, - handler.proxyMode.inline.handlers.onConnectSse, + handler.routingHandler.inline.handlers.onConnectSse, actorId, ); - } else if ("custom" in handler.proxyMode) { + } else if ("custom" in handler.routingHandler) { logger().debug("using custom proxy mode for sse connection"); const url = new URL("http://actor/connect/sse"); const proxyRequest = new Request(url, c.req.raw); @@ -514,14 +486,14 @@ async function handleSseConnectRequest( if (params.data.connParams) { proxyRequest.headers.set(HEADER_CONN_PARAMS, params.data.connParams); } - return await handler.proxyMode.custom.onProxyRequest( + return await handler.routingHandler.custom.proxyRequest( c, proxyRequest, actorId, meta, ); } else { - assertUnreachable(handler.proxyMode); + assertUnreachable(handler.routingHandler); } } catch (error) { // If we receive an error during setup, we send the error and close the socket immediately @@ -616,15 +588,15 @@ async function handleWebSocketConnectRequest( logger().debug("found actor for websocket connection", { actorId, meta }); invariant(actorId, "missing actor id"); - if ("inline" in handler.proxyMode) { + if ("inline" in handler.routingHandler) { logger().debug("using inline proxy mode for websocket connection"); invariant( - handler.proxyMode.inline.handlers.onConnectWebSocket, + handler.routingHandler.inline.handlers.onConnectWebSocket, "onConnectWebSocket not provided", ); const onConnectWebSocket = - handler.proxyMode.inline.handlers.onConnectWebSocket; + handler.routingHandler.inline.handlers.onConnectWebSocket; return upgradeWebSocket((c) => { return handleWebSocketConnect( c, @@ -634,16 +606,16 @@ async function handleWebSocketConnectRequest( actorId, )(); })(c, noopNext()); - } else if ("custom" in handler.proxyMode) { + } else if ("custom" in handler.routingHandler) { logger().debug("using custom proxy mode for websocket connection"); - return await handler.proxyMode.custom.onProxyWebSocket( + return await handler.routingHandler.custom.proxyWebSocket( c, `/connect/websocket?encoding=${params.data.encoding}`, actorId, meta, ); } else { - assertUnreachable(handler.proxyMode); + assertUnreachable(handler.routingHandler); } } catch (error) { // If we receive an error during setup, we send the error and close the socket immediately @@ -715,18 +687,18 @@ async function handleMessageRequest( const { actorId, connId, encoding, connToken } = params.data; // Handle based on mode - if ("inline" in handler.proxyMode) { + if ("inline" in handler.routingHandler) { logger().debug("using inline proxy mode for connection message"); // Use shared connection message handler with direct parameters return handleConnectionMessage( c, appConfig, - handler.proxyMode.inline.handlers.onConnMessage, + handler.routingHandler.inline.handlers.onConnMessage, connId, connToken as string, actorId, ); - } else if ("custom" in handler.proxyMode) { + } else if ("custom" in handler.routingHandler) { logger().debug("using custom proxy mode for connection message"); const url = new URL(`http://actor/connections/message`); @@ -735,13 +707,13 @@ async function handleMessageRequest( proxyRequest.headers.set(HEADER_CONN_ID, connId); proxyRequest.headers.set(HEADER_CONN_TOKEN, connToken); - return await handler.proxyMode.custom.onProxyRequest( + return await handler.routingHandler.custom.proxyRequest( c, proxyRequest, actorId, ); } else { - assertUnreachable(handler.proxyMode); + assertUnreachable(handler.routingHandler); } } catch (error) { logger().error("error proxying connection message", { error }); @@ -788,31 +760,34 @@ async function handleActionRequest( invariant(actorId, "Missing actor ID"); // Handle based on mode - if ("inline" in handler.proxyMode) { + if ("inline" in handler.routingHandler) { logger().debug("using inline proxy mode for action call"); // Use shared action handler with direct parameter return handleAction( c, appConfig, driverConfig, - handler.proxyMode.inline.handlers.onAction, + handler.routingHandler.inline.handlers.onAction, actionName, actorId, ); - } else if ("custom" in handler.proxyMode) { + } else if ("custom" in handler.routingHandler) { logger().debug("using custom proxy mode for action call"); + + // TODO: Encoding + // TODO: Parameters const url = new URL( `http://actor/action/${encodeURIComponent(actionName)}`, ); const proxyRequest = new Request(url, c.req.raw); - return await handler.proxyMode.custom.onProxyRequest( + return await handler.routingHandler.custom.proxyRequest( c, proxyRequest, actorId, meta, ); } else { - assertUnreachable(handler.proxyMode); + assertUnreachable(handler.routingHandler); } } catch (error) { logger().error("error in action handler", { error }); diff --git a/packages/actor/src/topologies/coordinate/router/sse.ts b/packages/actor/src/topologies/coordinate/router/sse.ts index 456c3e220..712a410a7 100644 --- a/packages/actor/src/topologies/coordinate/router/sse.ts +++ b/packages/actor/src/topologies/coordinate/router/sse.ts @@ -4,9 +4,9 @@ import { encodeDataToString, serialize } from "@/actor/protocol/serde"; import type { CoordinateDriver } from "../driver"; import { RelayConn } from "../conn/mod"; import type { ActorDriver } from "@/actor/driver"; -import type { ConnectSseOpts, ConnectSseOutput } from "@/actor/router"; import { DriverConfig } from "@/driver-helpers/config"; import { AppConfig } from "@/app/config"; +import { ConnectSseOpts, ConnectSseOutput } from "@/actor/router-endpoints"; export async function serveSse( appConfig: AppConfig, @@ -15,7 +15,7 @@ export async function serveSse( CoordinateDriver: CoordinateDriver, globalState: GlobalState, actorId: string, - { encoding, params: params }: ConnectSseOpts, + { encoding, params }: ConnectSseOpts, ): Promise { let conn: RelayConn | undefined; return { diff --git a/packages/actor/src/topologies/coordinate/router/websocket.ts b/packages/actor/src/topologies/coordinate/router/websocket.ts index a5c7fb15d..618f02b79 100644 --- a/packages/actor/src/topologies/coordinate/router/websocket.ts +++ b/packages/actor/src/topologies/coordinate/router/websocket.ts @@ -8,9 +8,9 @@ import type { CoordinateDriver } from "../driver"; import { RelayConn } from "../conn/mod"; import { publishMessageToLeader } from "../node/message"; import type { ActorDriver } from "@/actor/driver"; -import type { ConnectWebSocketOpts, ConnectWebSocketOutput } from "@/actor/router"; -import { DriverConfig } from "@/driver-helpers/config"; -import { AppConfig } from "@/app/config"; +import type { DriverConfig } from "@/driver-helpers/config"; +import type { AppConfig } from "@/app/config"; +import { ConnectWebSocketOpts, ConnectWebSocketOutput } from "@/actor/router-endpoints"; export async function serveWebSocket( appConfig: AppConfig, @@ -65,7 +65,7 @@ export async function serveWebSocket( }, }, }, - req.raw.signal, + req?.raw.signal, ); }, onClose: async () => { diff --git a/packages/actor/src/topologies/coordinate/topology.ts b/packages/actor/src/topologies/coordinate/topology.ts index 09b2b6a9b..13b3410fc 100644 --- a/packages/actor/src/topologies/coordinate/topology.ts +++ b/packages/actor/src/topologies/coordinate/topology.ts @@ -1,5 +1,3 @@ -import { serveSse } from "./router/sse"; -import { serveWebSocket } from "./router/websocket"; import { Node } from "./node/mod"; import type { ActorPeer } from "./actor-peer"; import * as errors from "@/actor/errors"; @@ -7,7 +5,6 @@ import * as events from "node:events"; import { publishMessageToLeader } from "./node/message"; import type { RelayConn } from "./conn/mod"; import { Hono } from "hono"; -import { createActorRouter } from "@/actor/router"; import { handleRouteError, handleRouteNotFound } from "@/common/router"; import type { DriverConfig } from "@/driver-helpers/config"; import type { AppConfig } from "@/app/config"; @@ -22,6 +19,12 @@ import type { ActionOutput, ConnectionHandlers, } from "@/actor/router-endpoints"; +import invariant from "invariant"; +import { createInlineClientDriver } from "@/app/inline-client-driver"; +import { serveWebSocket } from "./router/websocket"; +import { serveSse } from "./router/sse"; +import { ClientDriver } from "@/client/client"; +import { ConnRoutingHandler } from "@/actor/conn-routing-handler"; export interface GlobalState { nodeId: string; @@ -34,6 +37,7 @@ export interface GlobalState { } export class CoordinateTopology { + public readonly clientDriver: ClientDriver; public readonly router: Hono; constructor(appConfig: AppConfig, driverConfig: DriverConfig) { @@ -110,18 +114,23 @@ export class CoordinateTopology { }, }, }, - opts.req.raw.signal, + opts.req?.raw.signal, ); }, }; + const routingHandler: ConnRoutingHandler = { + inline: { handlers: connectionHandlers }, + }; + + // Create driver + const managerDriver = driverConfig.drivers.manager; + invariant(managerDriver, "missing manager driver"); + this.clientDriver = createInlineClientDriver(managerDriver, routingHandler); + // Build manager router const managerRouter = createManagerRouter(appConfig, driverConfig, { - proxyMode: { - inline: { - handlers: connectionHandlers, - }, - }, + routingHandler, onConnectInspector: () => { throw new errors.Unsupported("inspect"); }, diff --git a/packages/actor/src/topologies/partition/actor-router.ts b/packages/actor/src/topologies/partition/actor-router.ts new file mode 100644 index 000000000..3502b9170 --- /dev/null +++ b/packages/actor/src/topologies/partition/actor-router.ts @@ -0,0 +1,188 @@ +import { Hono, type Context as HonoContext } from "hono"; +import { logger } from "./log"; +import { cors } from "hono/cors"; +import { + handleRouteError, + handleRouteNotFound, + loggerMiddleware, +} from "@/common/router"; +import type { DriverConfig } from "@/driver-helpers/config"; +import type { AppConfig } from "@/app/config"; +import { + type ActorInspectorConnHandler, + createActorInspectorRouter, +} from "@/inspector/actor"; +import { + type ConnectWebSocketOpts, + type ConnectWebSocketOutput, + type ConnectSseOpts, + type ConnectSseOutput, + type ActionOpts, + type ActionOutput, + type ConnsMessageOpts, + type ConnectionHandlers, + handleWebSocketConnect, + handleSseConnect, + handleAction, + handleConnectionMessage, + HEADER_CONN_TOKEN, + HEADER_CONN_ID, + ALL_HEADERS, +} from "@/actor/router-endpoints"; + +export type { + ConnectWebSocketOpts, + ConnectWebSocketOutput, + ConnectSseOpts, + ConnectSseOutput, + ActionOpts, + ActionOutput, + ConnsMessageOpts, +}; + +export interface ActorRouterHandler { + getActorId: () => Promise; + + // Connection handlers as a required subobject + connectionHandlers: ConnectionHandlers; + + onConnectInspector?: ActorInspectorConnHandler; +} + +/** + * Creates a router that runs on the partitioned instance. + */ +export function createActorRouter( + appConfig: AppConfig, + driverConfig: DriverConfig, + handler: ActorRouterHandler, +): Hono { + const app = new Hono(); + + const upgradeWebSocket = driverConfig.getUpgradeWebSocket?.(app); + + app.use("*", loggerMiddleware(logger())); + + // Apply CORS middleware if configured + // + //This is only relevant if the actor is exposed directly publicly + if (appConfig.cors) { + const corsConfig = appConfig.cors; + + app.use("*", async (c, next) => { + const path = c.req.path; + + // Don't apply to WebSocket routes, see https://hono.dev/docs/helpers/websocket#upgradewebsocket + if (path === "/connect/websocket" || path === "/inspect") { + return next(); + } + + return cors({ + ...corsConfig, + allowHeaders: [...(appConfig.cors?.allowHeaders ?? []), ...ALL_HEADERS], + })(c, next); + }); + } + + app.get("/", (c) => { + return c.text( + "This is an ActorCore server.\n\nLearn more at https://actorcore.org", + ); + }); + + app.get("/health", (c) => { + return c.text("ok"); + }); + + // Use the handlers from connectionHandlers + const handlers = handler.connectionHandlers; + + if (upgradeWebSocket && handlers.onConnectWebSocket) { + app.get( + "/connect/websocket", + upgradeWebSocket(async (c) => { + const actorId = await handler.getActorId(); + return handleWebSocketConnect( + c as HonoContext, + appConfig, + driverConfig, + handlers.onConnectWebSocket!, + actorId, + )(); + }), + ); + } else { + app.get("/connect/websocket", (c) => { + return c.text( + "WebSockets are not enabled for this driver. Use SSE instead.", + 400, + ); + }); + } + + app.get("/connect/sse", async (c) => { + if (!handlers.onConnectSse) { + throw new Error("onConnectSse handler is required"); + } + const actorId = await handler.getActorId(); + return handleSseConnect( + c, + appConfig, + driverConfig, + handlers.onConnectSse, + actorId, + ); + }); + + app.post("/action/:action", async (c) => { + if (!handlers.onAction) { + throw new Error("onAction handler is required"); + } + const actionName = c.req.param("action"); + const actorId = await handler.getActorId(); + return handleAction( + c, + appConfig, + driverConfig, + handlers.onAction, + actionName, + actorId, + ); + }); + + app.post("/connections/message", async (c) => { + if (!handlers.onConnMessage) { + throw new Error("onConnMessage handler is required"); + } + const connId = c.req.header(HEADER_CONN_ID); + const connToken = c.req.header(HEADER_CONN_TOKEN); + const actorId = await handler.getActorId(); + if (!connId || !connToken) { + throw new Error("Missing required parameters"); + } + return handleConnectionMessage( + c, + appConfig, + handlers.onConnMessage, + connId, + connToken, + actorId, + ); + }); + + if (appConfig.inspector.enabled) { + app.route( + "/inspect", + createActorInspectorRouter( + upgradeWebSocket, + handler.onConnectInspector, + appConfig.inspector, + ), + ); + } + + app.notFound(handleRouteNotFound); + app.onError(handleRouteError); + + return app; +} diff --git a/packages/actor/src/topologies/partition/mod.ts b/packages/actor/src/topologies/partition/mod.ts index 588ad2de3..ca533cc9a 100644 --- a/packages/actor/src/topologies/partition/mod.ts +++ b/packages/actor/src/topologies/partition/mod.ts @@ -1 +1 @@ -export { PartitionTopologyManager, PartitionTopologyActor } from "./toplogy"; +export { PartitionTopologyManager, PartitionTopologyActor } from "./topology"; diff --git a/packages/actor/src/topologies/partition/toplogy.ts b/packages/actor/src/topologies/partition/topology.ts similarity index 86% rename from packages/actor/src/topologies/partition/toplogy.ts rename to packages/actor/src/topologies/partition/topology.ts index 78cca6622..9c5427827 100644 --- a/packages/actor/src/topologies/partition/toplogy.ts +++ b/packages/actor/src/topologies/partition/topology.ts @@ -1,5 +1,5 @@ -import { Hono } from "hono"; -import { createActorRouter } from "@/actor/router"; +import { Hono, HonoRequest } from "hono"; +import { createActorRouter } from "@/topologies/partition/actor-router"; import type { AnyActorInstance } from "@/actor/instance"; import * as errors from "@/actor/errors"; import { @@ -24,11 +24,7 @@ import type { ActorKey } from "@/common/utils"; import type { DriverConfig } from "@/driver-helpers/config"; import type { AppConfig } from "@/app/config"; import type { ActorInspectorConnection } from "@/inspector/actor"; -import { - createManagerRouter, - OnProxyWebSocket, - type OnProxyRequest, -} from "@/manager/router"; +import { createManagerRouter } from "@/manager/router"; import type { ManagerInspectorConnection } from "@/inspector/manager"; import type { ConnectWebSocketOpts, @@ -39,22 +35,49 @@ import type { ConnectSseOutput, ActionOutput, } from "@/actor/router-endpoints"; +import { ClientDriver } from "@/client/client"; +import { ToServer } from "@/actor/protocol/message/to-server"; +import { ActorQuery } from "@/manager/protocol/query"; +import { Encoding } from "@/mod"; +import { EventSource } from "eventsource"; +import { createInlineClientDriver } from "@/app/inline-client-driver"; +import { + ConnRoutingHandler, + ConnRoutingHandlerCustom, +} from "@/actor/conn-routing-handler"; +import invariant from "invariant"; + +export type SendRequestHandler = ( + actorRequest: Request, + actorId: string, + meta?: unknown, +) => Promise; + +export type OpenWebSocketHandler = ( + path: string, + actorId: string, + meta?: unknown, +) => Promise; export class PartitionTopologyManager { + clientDriver: ClientDriver; router: Hono; constructor( appConfig: AppConfig, driverConfig: DriverConfig, - proxyCustomConfig: { - onProxyRequest: OnProxyRequest; - onProxyWebSocket: OnProxyWebSocket; - }, + customRoutingHandlers: ConnRoutingHandlerCustom, ) { + const routingHandler: ConnRoutingHandler = { + custom: customRoutingHandlers, + }; + + const managerDriver = driverConfig.drivers.manager; + invariant(managerDriver, "missing manager driver"); + this.clientDriver = createInlineClientDriver(managerDriver, routingHandler); + this.router = createManagerRouter(appConfig, driverConfig, { - proxyMode: { - custom: proxyCustomConfig, - }, + routingHandler, onConnectInspector: async () => { const inspector = driverConfig.drivers?.manager?.inspector; if (!inspector) throw new errors.Unsupported("inspector"); @@ -127,7 +150,7 @@ export class PartitionTopologyActor { const connId = generateConnId(); const connToken = generateConnToken(); - const connState = await actor.prepareConn(opts.params, opts.req.raw); + const connState = await actor.prepareConn(opts.params, opts.req?.raw); let conn: AnyConn | undefined; return { @@ -177,7 +200,7 @@ export class PartitionTopologyActor { const connId = generateConnId(); const connToken = generateConnToken(); - const connState = await actor.prepareConn(opts.params, opts.req.raw); + const connState = await actor.prepareConn(opts.params, opts.req?.raw); let conn: AnyConn | undefined; return { @@ -217,7 +240,7 @@ export class PartitionTopologyActor { // Create conn const connState = await actor.prepareConn( opts.params, - opts.req.raw, + opts.req?.raw, ); conn = await actor.createConn( generateConnId(), diff --git a/packages/actor/src/topologies/standalone/topology.ts b/packages/actor/src/topologies/standalone/topology.ts index dc6e824df..6b7308176 100644 --- a/packages/actor/src/topologies/standalone/topology.ts +++ b/packages/actor/src/topologies/standalone/topology.ts @@ -32,6 +32,10 @@ import type { ActionOutput, ConnectionHandlers, } from "@/actor/router-endpoints"; +import { createInlineClientDriver } from "@/app/inline-client-driver"; +import invariant from "invariant"; +import { ClientDriver } from "@/client/client"; +import { ConnRoutingHandler } from "@/actor/conn-routing-handler"; class ActorHandler { /** Will be undefined if not yet loaded. */ @@ -48,10 +52,8 @@ class ActorHandler { * Manages actors in a single instance without distributed coordination. */ export class StandaloneTopology { - /** - * The router instance. - */ - readonly router: Hono; + clientDriver: ClientDriver; + router: Hono; #appConfig: AppConfig; #driverConfig: DriverConfig; @@ -139,7 +141,7 @@ export class StandaloneTopology { const connId = generateConnId(); const connToken = generateConnToken(); - const connState = await actor.prepareConn(opts.params, opts.req.raw); + const connState = await actor.prepareConn(opts.params, opts.req?.raw); let conn: AnyConn | undefined; return { @@ -181,7 +183,7 @@ export class StandaloneTopology { const connId = generateConnId(); const connToken = generateConnToken(); - const connState = await actor.prepareConn(opts.params, opts.req.raw); + const connState = await actor.prepareConn(opts.params, opts.req?.raw); let conn: AnyConn | undefined; return { @@ -214,7 +216,7 @@ export class StandaloneTopology { const { actor } = await this.#getActor(opts.actorId); // Create conn - const connState = await actor.prepareConn(opts.params, opts.req.raw); + const connState = await actor.prepareConn(opts.params, opts.req?.raw); conn = await actor.createConn( generateConnId(), generateConnToken(), @@ -259,13 +261,18 @@ export class StandaloneTopology { }, }; + const routingHandler: ConnRoutingHandler = { + inline: { handlers: sharedConnectionHandlers }, + }; + + // Build client driver + const managerDriver = this.#driverConfig.drivers.manager; + invariant(managerDriver, "missing manager driver"); + this.clientDriver = createInlineClientDriver(managerDriver, routingHandler); + // Build manager router const managerRouter = createManagerRouter(appConfig, driverConfig, { - proxyMode: { - inline: { - handlers: sharedConnectionHandlers, - }, - }, + routingHandler, onConnectInspector: async () => { const inspector = driverConfig.drivers?.manager?.inspector; if (!inspector) throw new errors.Unsupported("inspector");