Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit 12d05fe

Browse files
committed
chore: refactor client to support ClientDriver
1 parent 1a1a340 commit 12d05fe

File tree

8 files changed

+372
-256
lines changed

8 files changed

+372
-256
lines changed

packages/actor/src/client/actor-common.ts

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,46 +35,3 @@ export type ActorDefinitionActions<AD extends AnyActorDefinition> =
3535
}
3636
: never;
3737

38-
/**
39-
* Resolves an actor ID from a query by making a request to the /actors/resolve endpoint
40-
*
41-
* @param {string} endpoint - The manager endpoint URL
42-
* @param {ActorQuery} actorQuery - The query to resolve
43-
* @param {Encoding} encodingKind - The encoding to use (json or cbor)
44-
* @returns {Promise<string>} - A promise that resolves to the actor's ID
45-
*/
46-
export async function resolveActorId(
47-
endpoint: string,
48-
actorQuery: ActorQuery,
49-
encodingKind: Encoding,
50-
): Promise<string> {
51-
logger().debug("resolving actor ID", { query: actorQuery });
52-
53-
try {
54-
const result = await sendHttpRequest<
55-
Record<never, never>,
56-
protoHttpResolve.ResolveResponse
57-
>({
58-
url: `${endpoint}/actors/resolve`,
59-
method: "POST",
60-
headers: {
61-
[HEADER_ENCODING]: encodingKind,
62-
[HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery),
63-
},
64-
body: {},
65-
encoding: encodingKind,
66-
});
67-
68-
logger().debug("resolved actor ID", { actorId: result.i });
69-
return result.i;
70-
} catch (error) {
71-
logger().error("failed to resolve actor ID", { error });
72-
if (error instanceof errors.ActorError) {
73-
throw error;
74-
} else {
75-
throw new errors.InternalError(
76-
`Failed to resolve actor ID: ${String(error)}`,
77-
);
78-
}
79-
}
80-
}

packages/actor/src/client/actor-conn.ts

Lines changed: 65 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,15 @@ import { importWebSocket } from "@/common/websocket";
1111
import type { ActorQuery } from "@/manager/protocol/query";
1212
import * as cbor from "cbor-x";
1313
import pRetry from "p-retry";
14-
import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client";
14+
import {
15+
ACTOR_CONNS_SYMBOL,
16+
ClientDriver,
17+
type ClientRaw,
18+
TRANSPORT_SYMBOL,
19+
} from "./client";
1520
import * as errors from "./errors";
1621
import { logger } from "./log";
17-
import { type WebSocketMessage as ConnMessage, messageLength } from "./utils";
22+
import { type WebSocketMessage as ConnMessage, messageLength, serializeWithEncoding } from "./utils";
1823
import {
1924
HEADER_ACTOR_ID,
2025
HEADER_ACTOR_QUERY,
@@ -51,19 +56,14 @@ export type EventUnsubscribe = () => void;
5156
*/
5257
export type ActorErrorCallback = (error: errors.ActorError) => void;
5358

54-
interface SendOpts {
59+
export interface SendHttpMessageOpts {
5560
ephemeral: boolean;
5661
}
5762

5863
export type ConnTransport = { websocket: WebSocket } | { sse: EventSource };
5964

6065
export const CONNECT_SYMBOL = Symbol("connect");
6166

62-
interface DynamicImports {
63-
WebSocket: typeof WebSocket;
64-
EventSource: typeof EventSource;
65-
}
66-
6767
/**
6868
* Provides underlying functions for {@link ActorConn}. See {@link ActorConn} for using type-safe remote procedure calls.
6969
*
@@ -102,46 +102,38 @@ export class ActorConnRaw {
102102
*/
103103
#keepNodeAliveInterval: NodeJS.Timeout;
104104

105-
/** Promise used to indicate the required properties for using this class have loaded. Currently just #dynamicImports */
106-
#onConstructedPromise: Promise<void>;
107-
108105
/** Promise used to indicate the socket has connected successfully. This will be rejected if the connection fails. */
109106
#onOpenPromise?: PromiseWithResolvers<undefined>;
110107

111-
// TODO: ws message queue
108+
#client: ClientRaw;
109+
#driver: ClientDriver;
110+
#params: unknown;
111+
#encodingKind: Encoding;
112+
#actorQuery: ActorQuery;
112113

113-
// External imports
114-
#dynamicImports!: DynamicImports;
114+
// TODO: ws message queue
115115

116116
/**
117117
* Do not call this directly.
118118
*
119119
* Creates an instance of ActorConnRaw.
120120
*
121-
* @param {string} endpoint - The endpoint to connect to.
122-
*
123121
* @protected
124122
*/
125123
public constructor(
126-
private readonly client: ClientRaw,
127-
private readonly endpoint: string,
128-
private readonly params: unknown,
129-
private readonly encodingKind: Encoding,
130-
private readonly actorQuery: ActorQuery,
124+
private client: ClientRaw,
125+
private driver: ClientDriver,
126+
private params: unknown,
127+
private encodingKind: Encoding,
128+
private actorQuery: ActorQuery,
131129
) {
132-
this.#keepNodeAliveInterval = setInterval(() => 60_000);
130+
this.#client = client;
131+
this.#driver = driver;
132+
this.#params = params;
133+
this.#encodingKind = encodingKind;
134+
this.#actorQuery = actorQuery;
133135

134-
this.#onConstructedPromise = (async () => {
135-
// Import dynamic dependencies
136-
const [WebSocket, EventSource] = await Promise.all([
137-
importWebSocket(),
138-
importEventSource(),
139-
]);
140-
this.#dynamicImports = {
141-
WebSocket,
142-
EventSource,
143-
};
144-
})();
136+
this.#keepNodeAliveInterval = setInterval(() => 60_000);
145137
}
146138

147139
/**
@@ -158,8 +150,6 @@ export class ActorConnRaw {
158150
name: string,
159151
...args: Args
160152
): Promise<Response> {
161-
await this.#onConstructedPromise;
162-
163153
logger().debug("action", { name, args });
164154

165155
// If we have an active connection, use the websockactionId
@@ -238,20 +228,18 @@ enc
238228

239229
async #connectAndWait() {
240230
try {
241-
await this.#onConstructedPromise;
242-
243231
// Create promise for open
244232
if (this.#onOpenPromise)
245233
throw new Error("#onOpenPromise already defined");
246234
this.#onOpenPromise = Promise.withResolvers();
247235

248236
// Connect transport
249-
if (this.client[TRANSPORT_SYMBOL] === "websocket") {
250-
this.#connectWebSocket();
251-
} else if (this.client[TRANSPORT_SYMBOL] === "sse") {
252-
this.#connectSse();
237+
if (this.#client[TRANSPORT_SYMBOL] === "websocket") {
238+
await this.#connectWebSocket();
239+
} else if (this.#client[TRANSPORT_SYMBOL] === "sse") {
240+
await this.#connectSse();
253241
} else {
254-
assertUnreachable(this.client[TRANSPORT_SYMBOL]);
242+
assertUnreachable(this.#client[TRANSPORT_SYMBOL]);
255243
}
256244

257245
// Wait for result
@@ -261,35 +249,19 @@ enc
261249
}
262250
}
263251

264-
#connectWebSocket() {
265-
const { WebSocket } = this.#dynamicImports;
266-
267-
const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery));
268-
const endpoint = this.endpoint
269-
.replace(/^http:/, "ws:")
270-
.replace(/^https:/, "wss:");
271-
const url = `${endpoint}/actors/connect/websocket?encoding=${this.encodingKind}&query=${actorQueryStr}`;
272-
273-
logger().debug("connecting to websocket", { url });
274-
const ws = new WebSocket(url);
275-
if (this.encodingKind === "cbor") {
276-
ws.binaryType = "arraybuffer";
277-
} else if (this.encodingKind === "json") {
278-
// HACK: Bun bug prevents changing binary type, so we ignore the error https://github.com/oven-sh/bun/issues/17005
279-
try {
280-
ws.binaryType = "blob";
281-
} catch (error) {}
282-
} else {
283-
assertUnreachable(this.encodingKind);
284-
}
252+
async #connectWebSocket() {
253+
const ws = await this.#driver.connectWebSocket(
254+
this.#actorQuery,
255+
this.#encodingKind,
256+
);
285257
this.#transport = { websocket: ws };
286258
ws.onopen = () => {
287259
logger().debug("websocket open");
288260

289261
// Set init message
290262
this.#sendMessage(
291263
{
292-
b: { i: { p: this.params } },
264+
b: { i: { p: this.#params } },
293265
},
294266
{ ephemeral: true },
295267
);
@@ -307,28 +279,12 @@ enc
307279
};
308280
}
309281

310-
#connectSse() {
311-
const { EventSource } = this.#dynamicImports;
312-
313-
const url = `${this.endpoint}/actors/connect/sse`;
314-
315-
logger().debug("connecting to sse", { url });
316-
const eventSource = new EventSource(url, {
317-
fetch: (input, init) => {
318-
return fetch(input, {
319-
...init,
320-
headers: {
321-
...init?.headers,
322-
"User-Agent": httpUserAgent(),
323-
[HEADER_ENCODING]: this.encodingKind,
324-
[HEADER_ACTOR_QUERY]: JSON.stringify(this.actorQuery),
325-
...(this.params !== undefined
326-
? { [HEADER_CONN_PARAMS]: JSON.stringify(this.params) }
327-
: {}),
328-
},
329-
});
330-
},
331-
});
282+
async #connectSse() {
283+
const eventSource = await this.#driver.connectSse(
284+
this.#actorQuery,
285+
this.#encodingKind,
286+
this.#params,
287+
);
332288
this.#transport = { sse: eventSource };
333289
eventSource.onopen = () => {
334290
logger().debug("eventsource open");
@@ -338,7 +294,7 @@ enc
338294
this.#handleOnMessage(ev);
339295
};
340296
eventSource.onerror = (ev) => {
341-
if (eventSource.readyState === EventSource.CLOSED) {
297+
if (eventSource.readyState === eventSource.CLOSED) {
342298
// This error indicates a close event
343299
this.#handleOnClose(ev);
344300
} else {
@@ -635,7 +591,7 @@ enc
635591
};
636592
}
637593

638-
#sendMessage(message: wsToServer.ToServer, opts?: SendOpts) {
594+
#sendMessage(message: wsToServer.ToServer, opts?: SendHttpMessageOpts) {
639595
if (this.#disposed) {
640596
throw new errors.ActorConnDisposed();
641597
}
@@ -645,10 +601,12 @@ enc
645601
// No transport connected yet
646602
queueMessage = true;
647603
} else if ("websocket" in this.#transport) {
648-
const { WebSocket } = this.#dynamicImports;
649-
if (this.#transport.websocket.readyState === WebSocket.OPEN) {
604+
if (this.#transport.websocket.readyState === 1) {
650605
try {
651-
const messageSerialized = this.#serialize(message);
606+
const messageSerialized = serializeWithEncoding(
607+
this.#encodingKind,
608+
message,
609+
);
652610
this.#transport.websocket.send(messageSerialized);
653611
logger().debug("sent websocket message", {
654612
message: message,
@@ -666,9 +624,7 @@ enc
666624
queueMessage = true;
667625
}
668626
} else if ("sse" in this.#transport) {
669-
const { EventSource } = this.#dynamicImports;
670-
671-
if (this.#transport.sse.readyState === EventSource.OPEN) {
627+
if (this.#transport.sse.readyState === 1) {
672628
// Spawn in background since #sendMessage cannot be async
673629
this.#sendHttpMessage(message, opts);
674630
} else {
@@ -684,25 +640,21 @@ enc
684640
}
685641
}
686642

687-
async #sendHttpMessage(message: wsToServer.ToServer, opts?: SendOpts) {
643+
async #sendHttpMessage(
644+
message: wsToServer.ToServer,
645+
opts?: SendHttpMessageOpts,
646+
) {
688647
try {
689648
if (!this.#actorId || !this.#connectionId || !this.#connectionToken)
690649
throw new errors.InternalError("Missing connection ID or token.");
691650

692-
// TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently.
693-
// TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests
694-
const messageSerialized = this.#serialize(message);
695-
const res = await fetch(`${this.endpoint}/actors/message`, {
696-
method: "POST",
697-
headers: {
698-
"User-Agent": httpUserAgent(),
699-
[HEADER_ENCODING]: this.encodingKind,
700-
[HEADER_ACTOR_ID]: this.#actorId,
701-
[HEADER_CONN_ID]: this.#connectionId,
702-
[HEADER_CONN_TOKEN]: this.#connectionToken,
703-
},
704-
body: messageSerialized,
705-
});
651+
const res = await this.#driver.sendHttpMessage(
652+
this.#actorId,
653+
this.#encodingKind,
654+
this.#connectionId,
655+
this.#connectionToken,
656+
message,
657+
);
706658

707659
if (!res.ok) {
708660
throw new errors.InternalError(
@@ -729,12 +681,12 @@ enc
729681
}
730682

731683
async #parse(data: ConnMessage): Promise<unknown> {
732-
if (this.encodingKind === "json") {
684+
if (this.#encodingKind === "json") {
733685
if (typeof data !== "string") {
734686
throw new Error("received non-string for json parse");
735687
}
736688
return JSON.parse(data);
737-
} else if (this.encodingKind === "cbor") {
689+
} else if (this.#encodingKind === "cbor") {
738690
if (!this.#transport) {
739691
// Do thing
740692
throw new Error("Cannot parse message when no transport defined");
@@ -769,17 +721,7 @@ enc
769721
);
770722
}
771723
} else {
772-
assertUnreachable(this.encodingKind);
773-
}
774-
}
775-
776-
#serialize(value: unknown): ConnMessage {
777-
if (this.encodingKind === "json") {
778-
return JSON.stringify(value);
779-
} else if (this.encodingKind === "cbor") {
780-
return cbor.encode(value);
781-
} else {
782-
assertUnreachable(this.encodingKind);
724+
assertUnreachable(this.#encodingKind);
783725
}
784726
}
785727

@@ -789,8 +731,6 @@ enc
789731
* @returns {Promise<void>} A promise that resolves when the socket is gracefully closed.
790732
*/
791733
async dispose(): Promise<void> {
792-
await this.#onConstructedPromise;
793-
794734
// Internally, this "disposes" the connection
795735

796736
if (this.#disposed) {
@@ -808,7 +748,7 @@ enc
808748
this.#abortController.abort();
809749

810750
// Remove from registry
811-
this.client[ACTOR_CONNS_SYMBOL].delete(this);
751+
this.#client[ACTOR_CONNS_SYMBOL].delete(this);
812752

813753
// Disconnect transport cleanly
814754
if (!this.#transport) {

0 commit comments

Comments
 (0)