Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ async function createPRForIssue(
c.state.github.prInfo = await github.createPullRequest(
c,
`${title}`, // Just use the issue title
`Closes ${issueFriendlyId}\n\nImplements changes requested in Linear issue.\n\n${summary}\n\n*Authored by ActorCore Coding Agent*`, // Include "Closes" keyword
`Closes ${issueFriendlyId}\n\nImplements changes requested in Linear issue.\n\n${summary}\n\n*Authored by RivetKit Coding Agent*`, // Include "Closes" keyword
c.state.github.branchName,
c.state.github.baseBranch,
);
Expand Down Expand Up @@ -713,4 +713,4 @@ export async function getIssueStatus(
console.error(`[LINEAR] Failed to get issue status:`, error);
return null;
}
}
}
22 changes: 15 additions & 7 deletions packages/actor/scripts/dump-openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { OpenAPIHono } from "@hono/zod-openapi";
import { VERSION } from "@/utils";
import * as fs from "node:fs/promises";
import { resolve } from "node:path";
import type { ClientDriver } from "@/client/client";

function main() {
const appConfig: AppConfig = AppConfigSchema.parse({ actors: {} });
Expand Down Expand Up @@ -40,13 +41,20 @@ function main() {
},
};

const managerRouter = createManagerRouter(appConfig, driverConfig, {
proxyMode: {
inline: {
handlers: sharedConnectionHandlers,
},
},
}) as unknown as OpenAPIHono;
const clientDriver: ClientDriver = {
action: unimplemented,
resolveActorId: unimplemented,
connectWebSocket: unimplemented,
connectSse: unimplemented,
sendHttpMessage: unimplemented,
};

const managerRouter = createManagerRouter(
appConfig,
driverConfig,
clientDriver,
{},
) as unknown as OpenAPIHono;

const openApiDoc = managerRouter.getOpenAPIDocument({
openapi: "3.0.0",
Expand Down
82 changes: 1 addition & 81 deletions packages/actor/src/actor/router-endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export interface ConnectSseOutput {
}

export interface ActionOpts {
req: HonoRequest;
req?: HonoRequest;
params: unknown;
actionName: string;
actionArgs: unknown[];
Expand Down Expand Up @@ -266,86 +266,6 @@ export async function handleSseConnect(
});
}

/**
* Creates an action handler
*/
export async function handleAction(
c: HonoContext,
appConfig: AppConfig,
driverConfig: DriverConfig,
handler: (opts: ActionOpts) => Promise<ActionOutput>,
actionName: string,
actorId: string,
) {
const encoding = getRequestEncoding(c.req, false);
const parameters = getRequestConnParams(c.req, appConfig, driverConfig);

logger().debug("handling action", { actionName, encoding });

// Validate incoming request
let actionArgs: unknown[];
if (encoding === "json") {
try {
actionArgs = await c.req.json();
} catch (err) {
throw new errors.InvalidActionRequest("Invalid JSON");
}

if (!Array.isArray(actionArgs)) {
throw new errors.InvalidActionRequest("Action arguments must be an array");
}
} else if (encoding === "cbor") {
try {
const value = await c.req.arrayBuffer();
const uint8Array = new Uint8Array(value);
const deserialized = await deserialize(
uint8Array as unknown as InputData,
encoding,
);

// Validate using the action schema
const result = protoHttpAction.ActionRequestSchema.safeParse(deserialized);
if (!result.success) {
throw new errors.InvalidActionRequest("Invalid action request format");
}

actionArgs = result.data.a;
} catch (err) {
throw new errors.InvalidActionRequest(
`Invalid binary format: ${stringifyError(err)}`,
);
}
} else {
return assertUnreachable(encoding);
}

// Invoke the action
const result = await handler({
req: c.req,
params: parameters,
actionName: actionName,
actionArgs: actionArgs,
actorId,
});

// Encode the response
if (encoding === "json") {
return c.json(result.output as Record<string, unknown>);
} else if (encoding === "cbor") {
// Use serialize from serde.ts instead of custom encoder
const responseData = {
o: result.output, // Use the format expected by ResponseOkSchema
};
const serialized = serialize(responseData, encoding);

return c.body(serialized as Uint8Array, 200, {
"Content-Type": "application/octet-stream",
});
} else {
return assertUnreachable(encoding);
}
}

/**
* Create a connection message handler
*/
Expand Down
178 changes: 178 additions & 0 deletions packages/actor/src/app/inline-client-driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import * as errors from "@/actor/errors";
import { logger } from "./log";
import type { EventSource } from "eventsource";
import type * as wsToServer from "@/actor/protocol/message/to-server";
import { type Encoding, serialize } from "@/actor/protocol/serde";
import { type ConnectionHandlers } from "@/actor/router-endpoints";
import { HonoRequest, type Context as HonoContext, type Next } from "hono";
import invariant from "invariant";
import { ClientDriver } from "@/client/client";
import { ManagerDriver } from "@/manager/driver";
import { ActorQuery } from "@/manager/protocol/query";

/**
* Client driver that calls the manager driver inline.
*
* This driver can access private resources.
*
* This driver serves a double purpose as:
* - Providing the client for the internal requests
* - Provide the driver for the manager HTTP router (see manager/router.ts)
*/
export function createInlineClientDriver(
managerDriver: ManagerDriver,
connHandlers: ConnectionHandlers,
): ClientDriver {
//// Lazily import the dynamic imports so we don't have to turn `createClient` in to an aysnc fn
//const dynamicImports = (async () => {
// // Import dynamic dependencies
// const [WebSocket, EventSource] = await Promise.all([
// importWebSocket(),
// importEventSource(),
// ]);
// return {
// WebSocket,
// EventSource,
// };
//})();

const driver: ClientDriver = {
action: async <Args extends Array<unknown> = unknown[], Response = unknown>(
req: HonoRequest | undefined,
actorQuery: ActorQuery,
encoding: Encoding,
params: unknown,
actionName: string,
...args: Args
): Promise<Response> => {
// Get the actor ID and meta
const { actorId, meta } = await queryActor(
req,
actorQuery,
managerDriver,
);
logger().debug("found actor for action", { actorId, meta });
invariant(actorId, "Missing actor ID");

logger().debug("handling action", { actionName, encoding });

// Invoke the action
const { output } = await connHandlers.onAction({
req,
params,
actionName,
actionArgs: args,
actorId,
});

return output as Response;
},

resolveActorId: async (
req: HonoRequest | undefined,
actorQuery: ActorQuery,
_encodingKind: Encoding,
): Promise<string> => {
// Get the actor ID and meta
const { actorId } = await queryActor(req, actorQuery, managerDriver);
logger().debug("resolved actor", { actorId });
invariant(actorId, "Missing actor ID");

return actorId;
},

connectWebSocket: async (
req: HonoRequest | undefined,
actorQuery: ActorQuery,
encodingKind: Encoding,
): Promise<WebSocket> => {
throw "UNIMPLEMENTED";
},

connectSse: async (
req: HonoRequest | undefined,
actorQuery: ActorQuery,
encodingKind: Encoding,
params: unknown,
): Promise<EventSource> => {
throw "UNIMPLEMENTED";
},

sendHttpMessage: async (
req: HonoRequest | undefined,
actorId: string,
encoding: Encoding,
connectionId: string,
connectionToken: string,
message: wsToServer.ToServer,
): Promise<Response> => {
throw "UNIMPLEMENTED";
},
};

return driver;
}

/**
* Query the manager driver to get or create an actor based on the provided query
*/
export async function queryActor(
req: HonoRequest | undefined,
query: ActorQuery,
driver: ManagerDriver,
): Promise<{ actorId: string; meta?: unknown }> {
logger().debug("querying actor", { query });
let actorOutput: { actorId: string; meta?: unknown };
if ("getForId" in query) {
const output = await driver.getForId({
req,
actorId: query.getForId.actorId,
});
if (!output) throw new errors.ActorNotFound(query.getForId.actorId);
actorOutput = output;
} else if ("getForKey" in query) {
const existingActor = await driver.getWithKey({
req,
name: query.getForKey.name,
key: query.getForKey.key,
});
if (!existingActor) {
throw new errors.ActorNotFound(
`${query.getForKey.name}:${JSON.stringify(query.getForKey.key)}`,
);
}
actorOutput = existingActor;
} else if ("getOrCreateForKey" in query) {
const getOrCreateOutput = await driver.getOrCreateWithKey({
req,
name: query.getOrCreateForKey.name,
key: query.getOrCreateForKey.key,
input: query.getOrCreateForKey.input,
region: query.getOrCreateForKey.region,
});
actorOutput = {
actorId: getOrCreateOutput.actorId,
meta: getOrCreateOutput.meta,
};
} else if ("create" in query) {
const createOutput = await driver.createActor({
req,
name: query.create.name,
key: query.create.key,
input: query.create.input,
region: query.create.region,
});
actorOutput = {
actorId: createOutput.actorId,
meta: createOutput.meta,
};
} else {
throw new errors.InvalidRequest("Invalid query format");
}

logger().debug("actor query result", {
actorId: actorOutput.actorId,
meta: actorOutput.meta,
});
return { actorId: actorOutput.actorId, meta: actorOutput.meta };
}
7 changes: 7 additions & 0 deletions packages/actor/src/app/log.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { getLogger } from "@/common//log";

export const LOGGER_NAME = "actor-app";

export function logger() {
return getLogger(LOGGER_NAME);
}
16 changes: 3 additions & 13 deletions packages/actor/src/client/actor-conn.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import type { AnyActorDefinition } from "@/actor/definition";
import type { Transport } from "@/actor/protocol/message/mod";
import type * as wsToClient from "@/actor/protocol/message/to-client";
import type * as wsToServer from "@/actor/protocol/message/to-server";
import type { Encoding } from "@/actor/protocol/serde";
import { importEventSource } from "@/common/eventsource";
import { MAX_CONN_PARAMS_SIZE } from "@/common/network";
import { httpUserAgent } from "@/utils";
import { assertUnreachable, stringifyError } from "@/common/utils";
import { importWebSocket } from "@/common/websocket";
import type { ActorQuery } from "@/manager/protocol/query";
import * as cbor from "cbor-x";
import pRetry from "p-retry";
Expand All @@ -20,14 +15,6 @@ import {
import * as errors from "./errors";
import { logger } from "./log";
import { type WebSocketMessage as ConnMessage, messageLength, serializeWithEncoding } from "./utils";
import {
HEADER_ACTOR_ID,
HEADER_ACTOR_QUERY,
HEADER_CONN_ID,
HEADER_CONN_TOKEN,
HEADER_ENCODING,
HEADER_CONN_PARAMS,
} from "@/actor/router-endpoints";
import type { EventSource } from "eventsource";
import { ActorDefinitionActions } from "./actor-common";

Expand Down Expand Up @@ -251,6 +238,7 @@ enc

async #connectWebSocket() {
const ws = await this.#driver.connectWebSocket(
undefined,
this.#actorQuery,
this.#encodingKind,
);
Expand Down Expand Up @@ -281,6 +269,7 @@ enc

async #connectSse() {
const eventSource = await this.#driver.connectSse(
undefined,
this.#actorQuery,
this.#encodingKind,
this.#params,
Expand Down Expand Up @@ -649,6 +638,7 @@ enc
throw new errors.InternalError("Missing connection ID or token.");

const res = await this.#driver.sendHttpMessage(
undefined,
this.#actorId,
this.#encodingKind,
this.#connectionId,
Expand Down
Loading
Loading