Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit 7e56428

Browse files
committed
feat: add inline client
1 parent 12d05fe commit 7e56428

File tree

22 files changed

+688
-154
lines changed

22 files changed

+688
-154
lines changed

packages/actor/scripts/dump-openapi.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ function main() {
4141
};
4242

4343
const managerRouter = createManagerRouter(appConfig, driverConfig, {
44-
proxyMode: {
44+
routingHandler: {
4545
inline: {
4646
handlers: sharedConnectionHandlers,
4747
},
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import type { ConnectionHandlers as ConnHandlers } from "./router-endpoints";
2+
import type { Context as HonoContext, HonoRequest } from "hono";
3+
4+
/**
5+
* Deterines how requests to actors should be routed.
6+
*
7+
* Inline handlers calls the connection handlers directly.
8+
*
9+
* Custom will let a custom function handle the request. This usually will proxy the request to another location.
10+
*/
11+
export type ConnRoutingHandler =
12+
| {
13+
inline: {
14+
handlers: ConnHandlers;
15+
};
16+
}
17+
| {
18+
custom: ConnRoutingHandlerCustom;
19+
};
20+
21+
export interface ConnRoutingHandlerCustom {
22+
sendRequest: SendRequestHandler;
23+
openWebSocket: OpenWebSocketHandler;
24+
proxyRequest: ProxyRequestHandler;
25+
proxyWebSocket: ProxyWebSocketHandler;
26+
}
27+
28+
export type BuildProxyEndpoint = (c: HonoContext, actorId: string) => string;
29+
30+
export type SendRequestHandler = (
31+
actorId: string,
32+
meta: unknown | undefined,
33+
actorRequest: Request,
34+
) => Promise<Response>;
35+
36+
export type OpenWebSocketHandler = (
37+
actorId: string,
38+
meta?: unknown,
39+
) => Promise<WebSocket>;
40+
41+
export type ProxyRequestHandler = (
42+
c: HonoContext,
43+
actorRequest: Request,
44+
actorId: string,
45+
meta?: unknown,
46+
) => Promise<Response>;
47+
48+
export type ProxyWebSocketHandler = (
49+
c: HonoContext,
50+
path: string,
51+
actorId: string,
52+
meta?: unknown,
53+
) => Promise<Response>;

packages/actor/src/actor/router-endpoints.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import type { DriverConfig } from "@/driver-helpers/config";
2121
import invariant from "invariant";
2222

2323
export interface ConnectWebSocketOpts {
24-
req: HonoRequest;
24+
req?: HonoRequest;
2525
encoding: Encoding;
2626
params: unknown;
2727
actorId: string;
@@ -34,7 +34,7 @@ export interface ConnectWebSocketOutput {
3434
}
3535

3636
export interface ConnectSseOpts {
37-
req: HonoRequest;
37+
req?: HonoRequest;
3838
encoding: Encoding;
3939
params: unknown;
4040
actorId: string;
@@ -46,7 +46,7 @@ export interface ConnectSseOutput {
4646
}
4747

4848
export interface ActionOpts {
49-
req: HonoRequest;
49+
req?: HonoRequest;
5050
params: unknown;
5151
actionName: string;
5252
actionArgs: unknown[];
@@ -58,7 +58,7 @@ export interface ActionOutput {
5858
}
5959

6060
export interface ConnsMessageOpts {
61-
req: HonoRequest;
61+
req?: HonoRequest;
6262
connId: string;
6363
connToken: string;
6464
message: messageToServer.ToServer;
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
import * as errors from "@/actor/errors";
2+
import * as protoHttpAction from "@/actor/protocol/http/action";
3+
import { logger } from "./log";
4+
import type { EventSource } from "eventsource";
5+
import type * as wsToServer from "@/actor/protocol/message/to-server";
6+
import { type Encoding, serialize } from "@/actor/protocol/serde";
7+
import {
8+
HEADER_CONN_PARAMS,
9+
HEADER_ENCODING,
10+
type ConnectionHandlers,
11+
} from "@/actor/router-endpoints";
12+
import { HonoRequest, type Context as HonoContext, type Next } from "hono";
13+
import invariant from "invariant";
14+
import { ClientDriver } from "@/client/client";
15+
import { ManagerDriver } from "@/manager/driver";
16+
import { ActorQuery } from "@/manager/protocol/query";
17+
import { ConnRoutingHandler } from "@/actor/conn-routing-handler";
18+
import { sendHttpRequest, serializeWithEncoding } from "@/client/utils";
19+
import { ActionRequest, ActionResponse } from "@/actor/protocol/http/action";
20+
import { assertUnreachable } from "@/actor/utils";
21+
22+
/**
23+
* Client driver that calls the manager driver inline.
24+
*
25+
* This driver can access private resources.
26+
*
27+
* This driver serves a double purpose as:
28+
* - Providing the client for the internal requests
29+
* - Provide the driver for the manager HTTP router (see manager/router.ts)
30+
*/
31+
export function createInlineClientDriver(
32+
managerDriver: ManagerDriver,
33+
routingHandler: ConnRoutingHandler,
34+
): ClientDriver {
35+
//// Lazily import the dynamic imports so we don't have to turn `createClient` in to an aysnc fn
36+
//const dynamicImports = (async () => {
37+
// // Import dynamic dependencies
38+
// const [WebSocket, EventSource] = await Promise.all([
39+
// importWebSocket(),
40+
// importEventSource(),
41+
// ]);
42+
// return {
43+
// WebSocket,
44+
// EventSource,
45+
// };
46+
//})();
47+
48+
const driver: ClientDriver = {
49+
action: async <Args extends Array<unknown> = unknown[], Response = unknown>(
50+
req: HonoRequest | undefined,
51+
actorQuery: ActorQuery,
52+
encoding: Encoding,
53+
params: unknown,
54+
actionName: string,
55+
...args: Args
56+
): Promise<Response> => {
57+
// Get the actor ID and meta
58+
const { actorId, meta } = await queryActor(
59+
req,
60+
actorQuery,
61+
managerDriver,
62+
);
63+
logger().debug("found actor for action", { actorId, meta });
64+
invariant(actorId, "Missing actor ID");
65+
66+
// Invoke the action
67+
logger().debug("handling action", { actionName, encoding });
68+
if ("inline" in routingHandler) {
69+
const { output } = await routingHandler.inline.handlers.onAction({
70+
req,
71+
params,
72+
actionName,
73+
actionArgs: args,
74+
actorId,
75+
});
76+
return output as Response;
77+
} else if ("custom" in routingHandler) {
78+
const responseData = await sendHttpRequest<
79+
ActionRequest,
80+
ActionResponse
81+
>({
82+
url: `http://actor/action/${encodeURIComponent(actionName)}`,
83+
method: "POST",
84+
headers: {
85+
[HEADER_ENCODING]: encoding,
86+
...(params !== undefined
87+
? { [HEADER_CONN_PARAMS]: JSON.stringify(params) }
88+
: {}),
89+
},
90+
body: { a: args } satisfies ActionRequest,
91+
encoding: encoding,
92+
customFetch: routingHandler.custom.sendRequest.bind(
93+
undefined,
94+
actorId,
95+
meta,
96+
),
97+
});
98+
99+
return responseData.o as Response;
100+
} else {
101+
assertUnreachable(routingHandler);
102+
}
103+
},
104+
105+
resolveActorId: async (
106+
req: HonoRequest | undefined,
107+
actorQuery: ActorQuery,
108+
_encodingKind: Encoding,
109+
): Promise<string> => {
110+
// Get the actor ID and meta
111+
const { actorId } = await queryActor(req, actorQuery, managerDriver);
112+
logger().debug("resolved actor", { actorId });
113+
invariant(actorId, "missing actor ID");
114+
115+
return actorId;
116+
},
117+
118+
connectWebSocket: async (
119+
req: HonoRequest | undefined,
120+
actorQuery: ActorQuery,
121+
encodingKind: Encoding,
122+
): Promise<WebSocket> => {
123+
throw "UNIMPLEMENTED";
124+
},
125+
126+
connectSse: async (
127+
req: HonoRequest | undefined,
128+
actorQuery: ActorQuery,
129+
encodingKind: Encoding,
130+
params: unknown,
131+
): Promise<EventSource> => {
132+
throw "UNIMPLEMENTED";
133+
},
134+
135+
sendHttpMessage: async (
136+
req: HonoRequest | undefined,
137+
actorId: string,
138+
encoding: Encoding,
139+
connectionId: string,
140+
connectionToken: string,
141+
message: wsToServer.ToServer,
142+
): Promise<Response> => {
143+
throw "UNIMPLEMENTED";
144+
},
145+
};
146+
147+
return driver;
148+
}
149+
150+
/**
151+
* Query the manager driver to get or create an actor based on the provided query
152+
*/
153+
export async function queryActor(
154+
req: HonoRequest | undefined,
155+
query: ActorQuery,
156+
driver: ManagerDriver,
157+
): Promise<{ actorId: string; meta?: unknown }> {
158+
logger().debug("querying actor", { query });
159+
let actorOutput: { actorId: string; meta?: unknown };
160+
if ("getForId" in query) {
161+
const output = await driver.getForId({
162+
req,
163+
actorId: query.getForId.actorId,
164+
});
165+
if (!output) throw new errors.ActorNotFound(query.getForId.actorId);
166+
actorOutput = output;
167+
} else if ("getForKey" in query) {
168+
const existingActor = await driver.getWithKey({
169+
req,
170+
name: query.getForKey.name,
171+
key: query.getForKey.key,
172+
});
173+
if (!existingActor) {
174+
throw new errors.ActorNotFound(
175+
`${query.getForKey.name}:${JSON.stringify(query.getForKey.key)}`,
176+
);
177+
}
178+
actorOutput = existingActor;
179+
} else if ("getOrCreateForKey" in query) {
180+
const getOrCreateOutput = await driver.getOrCreateWithKey({
181+
req,
182+
name: query.getOrCreateForKey.name,
183+
key: query.getOrCreateForKey.key,
184+
input: query.getOrCreateForKey.input,
185+
region: query.getOrCreateForKey.region,
186+
});
187+
actorOutput = {
188+
actorId: getOrCreateOutput.actorId,
189+
meta: getOrCreateOutput.meta,
190+
};
191+
} else if ("create" in query) {
192+
const createOutput = await driver.createActor({
193+
req,
194+
name: query.create.name,
195+
key: query.create.key,
196+
input: query.create.input,
197+
region: query.create.region,
198+
});
199+
actorOutput = {
200+
actorId: createOutput.actorId,
201+
meta: createOutput.meta,
202+
};
203+
} else {
204+
throw new errors.InvalidRequest("Invalid query format");
205+
}
206+
207+
logger().debug("actor query result", {
208+
actorId: actorOutput.actorId,
209+
meta: actorOutput.meta,
210+
});
211+
return { actorId: actorOutput.actorId, meta: actorOutput.meta };
212+
}

packages/actor/src/app/log.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { getLogger } from "@/common//log";
2+
3+
export const LOGGER_NAME = "actor-app";
4+
5+
export function logger() {
6+
return getLogger(LOGGER_NAME);
7+
}

packages/actor/src/client/actor-conn.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ enc
251251

252252
async #connectWebSocket() {
253253
const ws = await this.#driver.connectWebSocket(
254+
undefined,
254255
this.#actorQuery,
255256
this.#encodingKind,
256257
);
@@ -281,6 +282,7 @@ enc
281282

282283
async #connectSse() {
283284
const eventSource = await this.#driver.connectSse(
285+
undefined,
284286
this.#actorQuery,
285287
this.#encodingKind,
286288
this.#params,
@@ -649,6 +651,7 @@ enc
649651
throw new errors.InternalError("Missing connection ID or token.");
650652

651653
const res = await this.#driver.sendHttpMessage(
654+
undefined,
652655
this.#actorId,
653656
this.#encodingKind,
654657
this.#connectionId,

packages/actor/src/client/actor-handle.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ export class ActorHandleRaw {
6161
...args: Args
6262
): Promise<Response> {
6363
return await this.#driver.action<Args, Response>(
64+
undefined,
6465
this.#actorQuery,
6566
this.#encodingKind,
6667
this.#params,
@@ -105,6 +106,7 @@ export class ActorHandleRaw {
105106
) {
106107
// TODO:
107108
const actorId = await this.#driver.resolveActorId(
109+
undefined,
108110
this.#actorQuery,
109111
this.#encodingKind,
110112
);

0 commit comments

Comments
 (0)