Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { ConnDriver } from "../driver";
import { DriverReadyState } from "../driver";

/**
* Creates a raw HTTP connection driver.
*
* This driver is used for raw HTTP connections that don't use the RivetKit protocol.
* Unlike the standard HTTP driver, this provides connection lifecycle management
* for tracking the HTTP request through the actor's onRequest handler.
*/
export function createRawHttpSocket(): ConnDriver {
return {
requestId: crypto.randomUUID(),
requestIdBuf: undefined,
hibernatable: false,

disconnect: async () => {
// Noop
},

getConnectionReadyState: (): DriverReadyState | undefined => {
// HTTP connections are always considered open until the request completes
return DriverReadyState.OPEN;
},
};
}
44 changes: 33 additions & 11 deletions rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import {
import {
arrayBuffersEqual,
bufferToArrayBuffer,
idToStr,
promiseWithResolvers,
} from "@/utils";
import { createHttpSocket } from "./conn/drivers/http";
import { createRawHttpSocket } from "./conn/drivers/raw-http";
import { createRawWebSocketSocket } from "./conn/drivers/raw-websocket";
import { createWebSocketSocket } from "./conn/drivers/websocket";
import type { ActorDriver } from "./driver";
Expand Down Expand Up @@ -394,22 +396,18 @@ export async function handleRawWebSocketHandler(
return {
onOpen: async (evt: any, ws: any) => {
// Extract rivetRequestId provided by engine runner
const rivetRequestId = evt?.rivetRequestId;
const isHibernatable =
!!rivetRequestId &&
!!requestIdBuf &&
actor[
ACTOR_INSTANCE_PERSIST_SYMBOL
].hibernatableConns.findIndex((conn) =>
arrayBuffersEqual(
conn.hibernatableRequestId,
rivetRequestId,
),
arrayBuffersEqual(conn.hibernatableRequestId, requestIdBuf),
) !== -1;

// Wrap the Hono WebSocket in our adapter
const adapter = new HonoWebSocketAdapter(
ws,
rivetRequestId,
requestIdBuf,
isHibernatable,
);

Expand All @@ -434,13 +432,13 @@ export async function handleRawWebSocketHandler(

try {
// Create connection using actor.createConn - this handles deduplication for hibernatable connections
const requestId = rivetRequestId
? String(rivetRequestId)
const requestIdStr = requestIdBuf
? idToStr(requestIdBuf)
: crypto.randomUUID();
const conn = await actor.createConn(
createRawWebSocketSocket(
requestId,
rivetRequestId,
requestIdStr,
requestIdBuf,
isHibernatable,
adapter,
closePromiseResolvers.promise,
Expand Down Expand Up @@ -550,6 +548,30 @@ export function getRequestConnParams(req: HonoRequest): unknown {
}
}

export async function handleRawHttpHandler(
req: Request,
actorDriver: ActorDriver,
actorId: string,
): Promise<Response> {
const actor = await actorDriver.loadActor(actorId);

// Track connection outside of scope for cleanup
let createdConn: AnyConn | undefined;

try {
const conn = await actor.createConn(createRawHttpSocket(), {}, req);

createdConn = conn;

return await actor.handleRawRequest(req, {});
} finally {
// Clean up the connection after the request completes
if (createdConn) {
actor.connDisconnected(createdConn, true);
}
}
}

/**
* Truncase the PATH_WEBSOCKET_PREFIX path prefix in order to pass a clean
* path to the onWebSocket handler.
Expand Down
20 changes: 7 additions & 13 deletions rivetkit-typescript/packages/rivetkit/src/actor/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type ConnectWebSocketOutput,
type ConnsMessageOpts,
handleAction,
handleRawHttpHandler,
handleRawWebSocketHandler,
handleWebSocketConnect,
} from "@/actor/router-endpoints";
Expand All @@ -31,7 +32,6 @@ import { isInspectorEnabled, secureInspector } from "@/inspector/utils";
import type { RunnerConfig } from "@/registry/run-config";
import { CONN_DRIVER_SYMBOL, generateConnRequestId } from "./conn/mod";
import type { ActorDriver } from "./driver";
import { InternalError } from "./errors";
import { loggerWithoutContext } from "./log";

export type {
Expand Down Expand Up @@ -173,12 +173,10 @@ export function createActorRouter(

// Raw HTTP endpoints - /request/*
router.all("/request/*", async (c) => {
const actor = await actorDriver.loadActor(c.env.actorId);

// TODO: This is not a clean way of doing this since `/http/` might exist mid-path
// Strip the /http prefix from the URL to get the original path
const url = new URL(c.req.url);
const originalPath = url.pathname.replace(/^\/raw\/http/, "") || "/";
const originalPath = url.pathname.replace(/^\/request/, "") || "/";

// Create a new request with the corrected URL
const correctedUrl = new URL(originalPath + url.search, url.origin);
Expand All @@ -195,15 +193,11 @@ export function createActorRouter(
to: correctedRequest.url,
});

// Call the actor's onRequest handler - it will throw appropriate errors
const response = await actor.handleRawRequest(correctedRequest, {});

// This should never happen now since handleFetch throws errors
if (!response) {
throw new InternalError("handleFetch returned void unexpectedly");
}

return response;
return await handleRawHttpHandler(
correctedRequest,
actorDriver,
c.env.actorId,
);
});

// Raw WebSocket endpoint - /websocket/*
Expand Down
Loading