Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/chat-room-python/src/workers/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ export const chatRoom = worker({
// Create and export the app
export const registry = setup({
workers: { chatRoom },
cors: {
origin: "*", // Allow all origins
allowMethods: ["GET", "POST", "OPTIONS"], // Allow specific methods
allowHeaders: ["Content-Type", "Authorization", "User-Agent"], // Allow specific headers
},
});

// Export type for client type checking
Expand Down
8 changes: 5 additions & 3 deletions examples/chat-room/scripts/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ async function main() {

// connect to chat room - now accessed via property
// can still pass parameters like room
const chatRoom = client.chatRoom.getOrCreate(room, {
params: { room },
}).connect();
const chatRoom = client.chatRoom
.getOrCreate(room, {
params: { room },
})
.connect();

// fetch history
const history = await chatRoom.getHistory();
Expand Down
14 changes: 11 additions & 3 deletions packages/core/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ export interface ClientOptions {
export interface QueryOptions {
/** Parameters to pass to the connection. */
params?: unknown;
/** Signal to abort the request. */
signal?: AbortSignal;
}

/**
Expand Down Expand Up @@ -160,25 +162,29 @@ export interface ClientDriver {
encoding: Encoding,
params: unknown,
name: string,
...args: Args
args: Args,
opts: { signal?: AbortSignal } | undefined,
): Promise<Response>;
resolveWorkerId(
c: HonoContext | undefined,
workerQuery: WorkerQuery,
encodingKind: Encoding,
params: unknown,
opts: { signal?: AbortSignal } | undefined,
): Promise<string>;
connectWebSocket(
c: HonoContext | undefined,
workerQuery: WorkerQuery,
encodingKind: Encoding,
params: unknown,
opts: { signal?: AbortSignal } | undefined,
): Promise<WebSocket>;
connectSse(
c: HonoContext | undefined,
workerQuery: WorkerQuery,
encodingKind: Encoding,
params: unknown,
opts: { signal?: AbortSignal } | undefined,
): Promise<EventSource>;
sendHttpMessage(
c: HonoContext | undefined,
Expand All @@ -187,6 +193,7 @@ export interface ClientDriver {
connectionId: string,
connectionToken: string,
message: wsToServer.ToServer,
opts: { signal?: AbortSignal } | undefined,
): Promise<Response>;
}

Expand Down Expand Up @@ -360,6 +367,7 @@ export class ClientRaw {
createQuery,
this.#encodingKind,
opts?.params,
opts?.signal ? { signal: opts.signal } : undefined,
);
logger().debug("created worker with ID", {
name,
Expand Down Expand Up @@ -541,7 +549,7 @@ function createWorkerProxy<AD extends AnyWorkerDefinition>(

let method = methodCache.get(prop);
if (!method) {
method = (...args: unknown[]) => target.action(prop, ...args);
method = (...args: unknown[]) => target.action({ name: prop, args });
methodCache.set(prop, method);
}
return method;
Expand Down Expand Up @@ -580,7 +588,7 @@ function createWorkerProxy<AD extends AnyWorkerDefinition>(
configurable: true,
enumerable: false,
writable: false,
value: (...args: unknown[]) => target.action(prop, ...args),
value: (...args: unknown[]) => target.action({ name: prop, args }),
};
}
return undefined;
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/client/http-client-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver {
encoding: Encoding,
params: unknown,
name: string,
...args: Args
args: Args,
opts: { signal?: AbortSignal } | undefined,
): Promise<Response> => {
logger().debug("worker handle action", {
name,
Expand All @@ -73,6 +74,7 @@ export function createHttpClientDriver(managerEndpoint: string): ClientDriver {
},
body: { a: args } satisfies ActionRequest,
encoding: encoding,
signal: opts?.signal,
},
);

Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/client/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface HttpRequestOpts<Body> {
body?: Body;
encoding: Encoding;
skipParseResponse?: boolean;
signal?: AbortSignal;
customFetch?: (req: Request) => Promise<Response>;
}

Expand Down Expand Up @@ -75,6 +76,7 @@ export async function sendHttpRequest<
"User-Agent": httpUserAgent(),
},
body: bodyData,
signal: opts.signal,
}),
);
} catch (error) {
Expand Down
29 changes: 18 additions & 11 deletions packages/core/src/client/worker-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
type WebSocketMessage as ConnMessage,
messageLength,
serializeWithEncoding,
WebSocketMessage,
} from "./utils";
import {
HEADER_WORKER_ID,
Expand Down Expand Up @@ -64,6 +63,7 @@ export type WorkerErrorCallback = (error: errors.WorkerError) => void;

export interface SendHttpMessageOpts {
ephemeral: boolean;
signal?: AbortSignal;
}

export type ConnTransport = { websocket: WebSocket } | { sse: EventSource };
Expand Down Expand Up @@ -152,26 +152,30 @@ export class WorkerConnRaw {
* @param {...Args} args - The arguments to pass to the action function.
* @returns {Promise<Response>} - A promise that resolves to the response of the action function.
*/
async action<Args extends Array<unknown> = unknown[], Response = unknown>(
name: string,
...args: Args
): Promise<Response> {
logger().debug("action", { name, args });
async action<
Args extends Array<unknown> = unknown[],
Response = unknown,
>(opts: {
name: string;
args: Args;
signal?: AbortSignal;
}): Promise<Response> {
logger().debug("action", { name: opts.name, args: opts.args });

// If we have an active connection, use the websockactionId
const actionId = this.#actionIdCounter;
this.#actionIdCounter += 1;

const { promise, resolve, reject } =
Promise.withResolvers<wsToClient.ActionResponse>();
this.#actionsInFlight.set(actionId, { name, resolve, reject });
this.#actionsInFlight.set(actionId, { name: opts.name, resolve, reject });

this.#sendMessage({
b: {
ar: {
i: actionId,
n: name,
a: args,
n: opts.name,
a: opts.args,
},
},
} satisfies wsToServer.ToServer);
Expand Down Expand Up @@ -255,12 +259,13 @@ enc
}
}

async #connectWebSocket() {
async #connectWebSocket({ signal }: { signal?: AbortSignal } = {}) {
const ws = await this.#driver.connectWebSocket(
undefined,
this.#workerQuery,
this.#encodingKind,
this.#params,
signal ? { signal } : undefined,
);
this.#transport = { websocket: ws };
ws.onopen = () => {
Expand All @@ -277,12 +282,13 @@ enc
};
}

async #connectSse() {
async #connectSse({ signal }: { signal?: AbortSignal } = {}) {
const eventSource = await this.#driver.connectSse(
undefined,
this.#workerQuery,
this.#encodingKind,
this.#params,
signal ? { signal } : undefined,
);
this.#transport = { sse: eventSource };
eventSource.onopen = () => {
Expand Down Expand Up @@ -659,6 +665,7 @@ enc
this.#connectionId,
this.#connectionToken,
message,
opts?.signal ? { signal: opts.signal } : undefined,
);

if (!res.ok) {
Expand Down
23 changes: 13 additions & 10 deletions packages/core/src/client/worker-handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,23 @@ export class WorkerHandleRaw {
* @see {@link WorkerHandle}
* @template Args - The type of arguments to pass to the action function.
* @template Response - The type of the response returned by the action function.
* @param {string} name - The name of the action function to call.
* @param {...Args} args - The arguments to pass to the action function.
* @returns {Promise<Response>} - A promise that resolves to the response of the action function.
*/
async action<Args extends Array<unknown> = unknown[], Response = unknown>(
name: string,
...args: Args
): Promise<Response> {
async action<
Args extends Array<unknown> = unknown[],
Response = unknown,
>(opts: {
name: string;
args: Args;
signal?: AbortSignal;
}): Promise<Response> {
return await this.#driver.action<Args, Response>(
undefined,
this.#workerQuery,
this.#encodingKind,
this.#params,
name,
...args,
opts.name,
opts.args,
{ signal: opts.signal },
);
}

Expand Down Expand Up @@ -99,7 +101,7 @@ export class WorkerHandleRaw {
*
* @returns {Promise<string>} - A promise that resolves to the worker's ID
*/
async resolve(): Promise<string> {
async resolve({ signal }: { signal?: AbortSignal } = {}): Promise<string> {
if (
"getForKey" in this.#workerQuery ||
"getOrCreateForKey" in this.#workerQuery
Expand All @@ -110,6 +112,7 @@ export class WorkerHandleRaw {
this.#workerQuery,
this.#encodingKind,
this.#params,
signal ? { signal } : undefined,
);
this.#workerQuery = { getForId: { workerId } };
return workerId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ export function createTestInlineClientDriver(
encoding: Encoding,
params: unknown,
name: string,
...args: Args
args: Args
): Promise<Response> => {
return makeInlineRequest<Response>(
endpoint,
encoding,
transport,
"action",
[undefined, workerQuery, encoding, params, name, ...args],
[undefined, workerQuery, encoding, params, name, args],
);
},

Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/inline-client-driver/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ export function createInlineClientDriver(
encoding: Encoding,
params: unknown,
actionName: string,
...args: Args
args: Args,
opts: { signal?: AbortSignal },
): Promise<Response> => {
try {
// Get the worker ID
Expand Down Expand Up @@ -115,6 +116,7 @@ export function createInlineClientDriver(
undefined,
workerId,
),
signal: opts?.signal,
});

return responseData.o as Response;
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/manager/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ export function createManagerRouter(
workerQuery,
encodingKind,
params,
undefined,
);

// Store a reference to the resolved WebSocket
Expand Down
3 changes: 0 additions & 3 deletions packages/drivers/file-system/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ export class FileSystemManagerDriver implements ManagerDriver {
workerId,
name: state.name,
key: state.key,
meta: undefined,
};
} catch (error) {
logger().error("failed to read worker state", { workerId, error });
Expand All @@ -66,7 +65,6 @@ export class FileSystemManagerDriver implements ManagerDriver {
workerId: worker.id,
name,
key: worker.key,
meta: undefined,
};
}

Expand Down Expand Up @@ -102,7 +100,6 @@ export class FileSystemManagerDriver implements ManagerDriver {
workerId,
name,
key,
meta: undefined,
};
}
}
8 changes: 4 additions & 4 deletions packages/drivers/memory/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ export class MemoryManagerDriver implements ManagerDriver {
this.#state = state;
}

async getForId({ workerId }: GetForIdInput): Promise<WorkerOutput | undefined> {
async getForId({
workerId,
}: GetForIdInput): Promise<WorkerOutput | undefined> {
// Validate the worker exists
const worker = this.#state.getWorker(workerId);
if (!worker) {
Expand All @@ -41,7 +43,6 @@ export class MemoryManagerDriver implements ManagerDriver {
workerId: worker.id,
name: worker.name,
key: worker.key,
meta: undefined,
};
}

Expand Down Expand Up @@ -75,7 +76,6 @@ export class MemoryManagerDriver implements ManagerDriver {
workerId: worker.id,
name,
key: worker.key,
meta: undefined,
};
}

Expand Down Expand Up @@ -105,6 +105,6 @@ export class MemoryManagerDriver implements ManagerDriver {

this.inspector.onWorkersChange(this.#state.getAllWorkers());

return { workerId, name, key, meta: undefined };
return { workerId, name, key };
}
}
2 changes: 0 additions & 2 deletions packages/drivers/redis/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ export class RedisManagerDriver implements ManagerDriver {
workerId,
name,
key,
meta: undefined,
};
}

Expand Down Expand Up @@ -134,7 +133,6 @@ export class RedisManagerDriver implements ManagerDriver {
workerId,
name,
key,
meta: undefined,
};
}

Expand Down
Loading
Loading