Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions packages/actor/src/client/actor-common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,46 +35,3 @@ export type ActorDefinitionActions<AD extends AnyActorDefinition> =
}
: never;

/**
* Resolves an actor ID from a query by making a request to the /actors/resolve endpoint
*
* @param {string} endpoint - The manager endpoint URL
* @param {ActorQuery} actorQuery - The query to resolve
* @param {Encoding} encodingKind - The encoding to use (json or cbor)
* @returns {Promise<string>} - A promise that resolves to the actor's ID
*/
export async function resolveActorId(
endpoint: string,
actorQuery: ActorQuery,
encodingKind: Encoding,
): Promise<string> {
logger().debug("resolving actor ID", { query: actorQuery });

try {
const result = await sendHttpRequest<
Record<never, never>,
protoHttpResolve.ResolveResponse
>({
url: `${endpoint}/actors/resolve`,
method: "POST",
headers: {
[HEADER_ENCODING]: encodingKind,
[HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery),
},
body: {},
encoding: encodingKind,
});

logger().debug("resolved actor ID", { actorId: result.i });
return result.i;
} catch (error) {
logger().error("failed to resolve actor ID", { error });
if (error instanceof errors.ActorError) {
throw error;
} else {
throw new errors.InternalError(
`Failed to resolve actor ID: ${String(error)}`,
);
}
}
}
190 changes: 65 additions & 125 deletions packages/actor/src/client/actor-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ import { importWebSocket } from "@/common/websocket";
import type { ActorQuery } from "@/manager/protocol/query";
import * as cbor from "cbor-x";
import pRetry from "p-retry";
import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client";
import {
ACTOR_CONNS_SYMBOL,
ClientDriver,
type ClientRaw,
TRANSPORT_SYMBOL,
} from "./client";
import * as errors from "./errors";
import { logger } from "./log";
import { type WebSocketMessage as ConnMessage, messageLength } from "./utils";
import { type WebSocketMessage as ConnMessage, messageLength, serializeWithEncoding } from "./utils";
import {
HEADER_ACTOR_ID,
HEADER_ACTOR_QUERY,
Expand Down Expand Up @@ -51,19 +56,14 @@ export type EventUnsubscribe = () => void;
*/
export type ActorErrorCallback = (error: errors.ActorError) => void;

interface SendOpts {
export interface SendHttpMessageOpts {
ephemeral: boolean;
}

export type ConnTransport = { websocket: WebSocket } | { sse: EventSource };

export const CONNECT_SYMBOL = Symbol("connect");

interface DynamicImports {
WebSocket: typeof WebSocket;
EventSource: typeof EventSource;
}

/**
* Provides underlying functions for {@link ActorConn}. See {@link ActorConn} for using type-safe remote procedure calls.
*
Expand Down Expand Up @@ -102,46 +102,38 @@ export class ActorConnRaw {
*/
#keepNodeAliveInterval: NodeJS.Timeout;

/** Promise used to indicate the required properties for using this class have loaded. Currently just #dynamicImports */
#onConstructedPromise: Promise<void>;

/** Promise used to indicate the socket has connected successfully. This will be rejected if the connection fails. */
#onOpenPromise?: PromiseWithResolvers<undefined>;

// TODO: ws message queue
#client: ClientRaw;
#driver: ClientDriver;
#params: unknown;
#encodingKind: Encoding;
#actorQuery: ActorQuery;

// External imports
#dynamicImports!: DynamicImports;
// TODO: ws message queue

/**
* Do not call this directly.
*
* Creates an instance of ActorConnRaw.
*
* @param {string} endpoint - The endpoint to connect to.
*
* @protected
*/
public constructor(
private readonly client: ClientRaw,
private readonly endpoint: string,
private readonly params: unknown,
private readonly encodingKind: Encoding,
private readonly actorQuery: ActorQuery,
private client: ClientRaw,
private driver: ClientDriver,
private params: unknown,
private encodingKind: Encoding,
private actorQuery: ActorQuery,
) {
this.#keepNodeAliveInterval = setInterval(() => 60_000);
this.#client = client;
this.#driver = driver;
this.#params = params;
this.#encodingKind = encodingKind;
this.#actorQuery = actorQuery;

this.#onConstructedPromise = (async () => {
// Import dynamic dependencies
const [WebSocket, EventSource] = await Promise.all([
importWebSocket(),
importEventSource(),
]);
this.#dynamicImports = {
WebSocket,
EventSource,
};
})();
this.#keepNodeAliveInterval = setInterval(() => 60_000);
}

/**
Expand All @@ -158,8 +150,6 @@ export class ActorConnRaw {
name: string,
...args: Args
): Promise<Response> {
await this.#onConstructedPromise;

logger().debug("action", { name, args });

// If we have an active connection, use the websockactionId
Expand Down Expand Up @@ -238,20 +228,18 @@ enc

async #connectAndWait() {
try {
await this.#onConstructedPromise;

// Create promise for open
if (this.#onOpenPromise)
throw new Error("#onOpenPromise already defined");
this.#onOpenPromise = Promise.withResolvers();

// Connect transport
if (this.client[TRANSPORT_SYMBOL] === "websocket") {
this.#connectWebSocket();
} else if (this.client[TRANSPORT_SYMBOL] === "sse") {
this.#connectSse();
if (this.#client[TRANSPORT_SYMBOL] === "websocket") {
await this.#connectWebSocket();
} else if (this.#client[TRANSPORT_SYMBOL] === "sse") {
await this.#connectSse();
} else {
assertUnreachable(this.client[TRANSPORT_SYMBOL]);
assertUnreachable(this.#client[TRANSPORT_SYMBOL]);
}

// Wait for result
Expand All @@ -261,35 +249,19 @@ enc
}
}

#connectWebSocket() {
const { WebSocket } = this.#dynamicImports;

const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery));
const endpoint = this.endpoint
.replace(/^http:/, "ws:")
.replace(/^https:/, "wss:");
const url = `${endpoint}/actors/connect/websocket?encoding=${this.encodingKind}&query=${actorQueryStr}`;

logger().debug("connecting to websocket", { url });
const ws = new WebSocket(url);
if (this.encodingKind === "cbor") {
ws.binaryType = "arraybuffer";
} else if (this.encodingKind === "json") {
// HACK: Bun bug prevents changing binary type, so we ignore the error https://github.com/oven-sh/bun/issues/17005
try {
ws.binaryType = "blob";
} catch (error) {}
} else {
assertUnreachable(this.encodingKind);
}
async #connectWebSocket() {
const ws = await this.#driver.connectWebSocket(
this.#actorQuery,
this.#encodingKind,
);
this.#transport = { websocket: ws };
ws.onopen = () => {
logger().debug("websocket open");

// Set init message
this.#sendMessage(
{
b: { i: { p: this.params } },
b: { i: { p: this.#params } },
},
{ ephemeral: true },
);
Expand All @@ -307,28 +279,12 @@ enc
};
}

#connectSse() {
const { EventSource } = this.#dynamicImports;

const url = `${this.endpoint}/actors/connect/sse`;

logger().debug("connecting to sse", { url });
const eventSource = new EventSource(url, {
fetch: (input, init) => {
return fetch(input, {
...init,
headers: {
...init?.headers,
"User-Agent": httpUserAgent(),
[HEADER_ENCODING]: this.encodingKind,
[HEADER_ACTOR_QUERY]: JSON.stringify(this.actorQuery),
...(this.params !== undefined
? { [HEADER_CONN_PARAMS]: JSON.stringify(this.params) }
: {}),
},
});
},
});
async #connectSse() {
const eventSource = await this.#driver.connectSse(
this.#actorQuery,
this.#encodingKind,
this.#params,
);
this.#transport = { sse: eventSource };
eventSource.onopen = () => {
logger().debug("eventsource open");
Expand All @@ -338,7 +294,7 @@ enc
this.#handleOnMessage(ev);
};
eventSource.onerror = (ev) => {
if (eventSource.readyState === EventSource.CLOSED) {
if (eventSource.readyState === eventSource.CLOSED) {
// This error indicates a close event
this.#handleOnClose(ev);
} else {
Expand Down Expand Up @@ -635,7 +591,7 @@ enc
};
}

#sendMessage(message: wsToServer.ToServer, opts?: SendOpts) {
#sendMessage(message: wsToServer.ToServer, opts?: SendHttpMessageOpts) {
if (this.#disposed) {
throw new errors.ActorConnDisposed();
}
Expand All @@ -645,10 +601,12 @@ enc
// No transport connected yet
queueMessage = true;
} else if ("websocket" in this.#transport) {
const { WebSocket } = this.#dynamicImports;
if (this.#transport.websocket.readyState === WebSocket.OPEN) {
if (this.#transport.websocket.readyState === 1) {
try {
const messageSerialized = this.#serialize(message);
const messageSerialized = serializeWithEncoding(
this.#encodingKind,
message,
);
this.#transport.websocket.send(messageSerialized);
logger().debug("sent websocket message", {
message: message,
Expand All @@ -666,9 +624,7 @@ enc
queueMessage = true;
}
} else if ("sse" in this.#transport) {
const { EventSource } = this.#dynamicImports;

if (this.#transport.sse.readyState === EventSource.OPEN) {
if (this.#transport.sse.readyState === 1) {
// Spawn in background since #sendMessage cannot be async
this.#sendHttpMessage(message, opts);
} else {
Expand All @@ -684,25 +640,21 @@ enc
}
}

async #sendHttpMessage(message: wsToServer.ToServer, opts?: SendOpts) {
async #sendHttpMessage(
message: wsToServer.ToServer,
opts?: SendHttpMessageOpts,
) {
try {
if (!this.#actorId || !this.#connectionId || !this.#connectionToken)
throw new errors.InternalError("Missing connection ID or token.");

// TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently.
// TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests
const messageSerialized = this.#serialize(message);
const res = await fetch(`${this.endpoint}/actors/message`, {
method: "POST",
headers: {
"User-Agent": httpUserAgent(),
[HEADER_ENCODING]: this.encodingKind,
[HEADER_ACTOR_ID]: this.#actorId,
[HEADER_CONN_ID]: this.#connectionId,
[HEADER_CONN_TOKEN]: this.#connectionToken,
},
body: messageSerialized,
});
const res = await this.#driver.sendHttpMessage(
this.#actorId,
this.#encodingKind,
this.#connectionId,
this.#connectionToken,
message,
);

if (!res.ok) {
throw new errors.InternalError(
Expand All @@ -729,12 +681,12 @@ enc
}

async #parse(data: ConnMessage): Promise<unknown> {
if (this.encodingKind === "json") {
if (this.#encodingKind === "json") {
if (typeof data !== "string") {
throw new Error("received non-string for json parse");
}
return JSON.parse(data);
} else if (this.encodingKind === "cbor") {
} else if (this.#encodingKind === "cbor") {
if (!this.#transport) {
// Do thing
throw new Error("Cannot parse message when no transport defined");
Expand Down Expand Up @@ -769,17 +721,7 @@ enc
);
}
} else {
assertUnreachable(this.encodingKind);
}
}

#serialize(value: unknown): ConnMessage {
if (this.encodingKind === "json") {
return JSON.stringify(value);
} else if (this.encodingKind === "cbor") {
return cbor.encode(value);
} else {
assertUnreachable(this.encodingKind);
assertUnreachable(this.#encodingKind);
}
}

Expand All @@ -789,8 +731,6 @@ enc
* @returns {Promise<void>} A promise that resolves when the socket is gracefully closed.
*/
async dispose(): Promise<void> {
await this.#onConstructedPromise;

// Internally, this "disposes" the connection

if (this.#disposed) {
Expand All @@ -808,7 +748,7 @@ enc
this.#abortController.abort();

// Remove from registry
this.client[ACTOR_CONNS_SYMBOL].delete(this);
this.#client[ACTOR_CONNS_SYMBOL].delete(this);

// Disconnect transport cleanly
if (!this.#transport) {
Expand Down
Loading
Loading