From d1c3287636d59cd6d4a8b3462be4babd9e058a88 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 9 Nov 2025 14:52:57 -0800 Subject: [PATCH] chore(rivetkit): add connection for raw http --- .../src/actor/conn/drivers/raw-http.ts | 26 +++++++++++ .../rivetkit/src/actor/router-endpoints.ts | 44 ++++++++++++++----- .../packages/rivetkit/src/actor/router.ts | 20 +++------ 3 files changed, 66 insertions(+), 24 deletions(-) create mode 100644 rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/raw-http.ts diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/raw-http.ts b/rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/raw-http.ts new file mode 100644 index 0000000000..91b2e1d193 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/conn/drivers/raw-http.ts @@ -0,0 +1,26 @@ +import type { ConnDriver } from "../driver"; +import { DriverReadyState } from "../driver"; + +/** + * Creates a raw HTTP connection driver. + * + * This driver is used for raw HTTP connections that don't use the RivetKit protocol. + * Unlike the standard HTTP driver, this provides connection lifecycle management + * for tracking the HTTP request through the actor's onRequest handler. + */ +export function createRawHttpSocket(): ConnDriver { + return { + requestId: crypto.randomUUID(), + requestIdBuf: undefined, + hibernatable: false, + + disconnect: async () => { + // Noop + }, + + getConnectionReadyState: (): DriverReadyState | undefined => { + // HTTP connections are always considered open until the request completes + return DriverReadyState.OPEN; + }, + }; +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts index 123421cd8e..6dcc77c2c9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts @@ -33,9 +33,11 @@ import { import { arrayBuffersEqual, bufferToArrayBuffer, + idToStr, promiseWithResolvers, } from "@/utils"; import { createHttpSocket } from "./conn/drivers/http"; +import { createRawHttpSocket } from "./conn/drivers/raw-http"; import { createRawWebSocketSocket } from "./conn/drivers/raw-websocket"; import { createWebSocketSocket } from "./conn/drivers/websocket"; import type { ActorDriver } from "./driver"; @@ -394,22 +396,18 @@ export async function handleRawWebSocketHandler( return { onOpen: async (evt: any, ws: any) => { // Extract rivetRequestId provided by engine runner - const rivetRequestId = evt?.rivetRequestId; const isHibernatable = - !!rivetRequestId && + !!requestIdBuf && actor[ ACTOR_INSTANCE_PERSIST_SYMBOL ].hibernatableConns.findIndex((conn) => - arrayBuffersEqual( - conn.hibernatableRequestId, - rivetRequestId, - ), + arrayBuffersEqual(conn.hibernatableRequestId, requestIdBuf), ) !== -1; // Wrap the Hono WebSocket in our adapter const adapter = new HonoWebSocketAdapter( ws, - rivetRequestId, + requestIdBuf, isHibernatable, ); @@ -434,13 +432,13 @@ export async function handleRawWebSocketHandler( try { // Create connection using actor.createConn - this handles deduplication for hibernatable connections - const requestId = rivetRequestId - ? String(rivetRequestId) + const requestIdStr = requestIdBuf + ? idToStr(requestIdBuf) : crypto.randomUUID(); const conn = await actor.createConn( createRawWebSocketSocket( - requestId, - rivetRequestId, + requestIdStr, + requestIdBuf, isHibernatable, adapter, closePromiseResolvers.promise, @@ -550,6 +548,30 @@ export function getRequestConnParams(req: HonoRequest): unknown { } } +export async function handleRawHttpHandler( + req: Request, + actorDriver: ActorDriver, + actorId: string, +): Promise { + const actor = await actorDriver.loadActor(actorId); + + // Track connection outside of scope for cleanup + let createdConn: AnyConn | undefined; + + try { + const conn = await actor.createConn(createRawHttpSocket(), {}, req); + + createdConn = conn; + + return await actor.handleRawRequest(req, {}); + } finally { + // Clean up the connection after the request completes + if (createdConn) { + actor.connDisconnected(createdConn, true); + } + } +} + /** * Truncase the PATH_WEBSOCKET_PREFIX path prefix in order to pass a clean * path to the onWebSocket handler. diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts index e64943e4a3..41dccd989d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts @@ -8,6 +8,7 @@ import { type ConnectWebSocketOutput, type ConnsMessageOpts, handleAction, + handleRawHttpHandler, handleRawWebSocketHandler, handleWebSocketConnect, } from "@/actor/router-endpoints"; @@ -31,7 +32,6 @@ import { isInspectorEnabled, secureInspector } from "@/inspector/utils"; import type { RunnerConfig } from "@/registry/run-config"; import { CONN_DRIVER_SYMBOL, generateConnRequestId } from "./conn/mod"; import type { ActorDriver } from "./driver"; -import { InternalError } from "./errors"; import { loggerWithoutContext } from "./log"; export type { @@ -173,12 +173,10 @@ export function createActorRouter( // Raw HTTP endpoints - /request/* router.all("/request/*", async (c) => { - const actor = await actorDriver.loadActor(c.env.actorId); - // TODO: This is not a clean way of doing this since `/http/` might exist mid-path // Strip the /http prefix from the URL to get the original path const url = new URL(c.req.url); - const originalPath = url.pathname.replace(/^\/raw\/http/, "") || "/"; + const originalPath = url.pathname.replace(/^\/request/, "") || "/"; // Create a new request with the corrected URL const correctedUrl = new URL(originalPath + url.search, url.origin); @@ -195,15 +193,11 @@ export function createActorRouter( to: correctedRequest.url, }); - // Call the actor's onRequest handler - it will throw appropriate errors - const response = await actor.handleRawRequest(correctedRequest, {}); - - // This should never happen now since handleFetch throws errors - if (!response) { - throw new InternalError("handleFetch returned void unexpectedly"); - } - - return response; + return await handleRawHttpHandler( + correctedRequest, + actorDriver, + c.env.actorId, + ); }); // Raw WebSocket endpoint - /websocket/*