Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions packages/core/src/actor/router-endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,16 +360,13 @@ export async function handleSseConnect(
authData,
);

// HACK: This is required so the abort handler below works
//
// See https://github.com/honojs/hono/issues/1770#issuecomment-2461966225
stream.onAbort(() => {});

// Wait for close
const abortResolver = Promise.withResolvers();
c.req.raw.signal.addEventListener("abort", async () => {

// Handle stream abort (when client closes the connection)
stream.onAbort(async () => {
try {
logger().debug("sse shutting down");
logger().debug("sse stream aborted");

// Cleanup
if (connId) {
Expand All @@ -384,6 +381,7 @@ export async function handleSseConnect(
abortResolver.resolve(undefined);
} catch (error) {
logger().error("error closing sse connection", { error });
abortResolver.resolve(undefined);
}
});

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/driver-test-suite/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export function runDriverTests(

// TODO: Add back SSE once fixed in Rivet driver & CF lifecycle
// for (const transport of ["websocket", "sse"] as Transport[]) {
for (const transport of ["websocket"] as Transport[]) {
for (const transport of ["websocket", "sse"] as Transport[]) {
describe(`transport (${transport})`, () => {
runActorConnTests({
...driverTestConfig,
Expand Down
18 changes: 12 additions & 6 deletions packages/core/src/driver-test-suite/tests/actor-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,16 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) {

// Set up event listener
const receivedEvents: number[] = [];
connection.on("newCount", (count: number) => {
receivedEvents.push(count);
const receivedEventsPromise = new Promise((resolve) => {
connection.on("newCount", (count: number) => {
receivedEvents.push(count);
if (
receivedEvents.includes(1) &&
receivedEvents.includes(6) &&
receivedEvents.includes(9)
)
resolve(undefined);
});
});

// Send one RPC call over the connection to ensure it's open
Expand All @@ -96,10 +104,8 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) {
await handle.increment(5);
await handle.increment(3);

// Verify events were received from both connection and handle calls
expect(receivedEvents).toContain(1); // From connection call
expect(receivedEvents).toContain(6); // From first handle call (1+5)
expect(receivedEvents).toContain(9); // From second handle call (6+3)
// Wait for all events to be received
await receivedEventsPromise;

// Clean up
await connection.dispose();
Expand Down
Loading