From 63e28e46527b2ddb73a9debc9dba137092adfe83 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Wed, 10 Sep 2025 11:58:27 -0700 Subject: [PATCH] fix(core): fix disconnecting sse connections --- packages/core/src/actor/router-endpoints.ts | 12 +++++------- packages/core/src/driver-test-suite/mod.ts | 2 +- .../src/driver-test-suite/tests/actor-conn.ts | 18 ++++++++++++------ 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/packages/core/src/actor/router-endpoints.ts b/packages/core/src/actor/router-endpoints.ts index 166d643c0..05dca2f64 100644 --- a/packages/core/src/actor/router-endpoints.ts +++ b/packages/core/src/actor/router-endpoints.ts @@ -360,16 +360,13 @@ export async function handleSseConnect( authData, ); - // HACK: This is required so the abort handler below works - // - // See https://github.com/honojs/hono/issues/1770#issuecomment-2461966225 - stream.onAbort(() => {}); - // Wait for close const abortResolver = Promise.withResolvers(); - c.req.raw.signal.addEventListener("abort", async () => { + + // Handle stream abort (when client closes the connection) + stream.onAbort(async () => { try { - logger().debug("sse shutting down"); + logger().debug("sse stream aborted"); // Cleanup if (connId) { @@ -384,6 +381,7 @@ export async function handleSseConnect( abortResolver.resolve(undefined); } catch (error) { logger().error("error closing sse connection", { error }); + abortResolver.resolve(undefined); } }); diff --git a/packages/core/src/driver-test-suite/mod.ts b/packages/core/src/driver-test-suite/mod.ts index 388a53f11..d0c2228b2 100644 --- a/packages/core/src/driver-test-suite/mod.ts +++ b/packages/core/src/driver-test-suite/mod.ts @@ -88,7 +88,7 @@ export function runDriverTests( // TODO: Add back SSE once fixed in Rivet driver & CF lifecycle // for (const transport of ["websocket", "sse"] as Transport[]) { - for (const transport of ["websocket"] as Transport[]) { + for (const transport of ["websocket", "sse"] as Transport[]) { describe(`transport (${transport})`, () => { runActorConnTests({ ...driverTestConfig, diff --git a/packages/core/src/driver-test-suite/tests/actor-conn.ts b/packages/core/src/driver-test-suite/tests/actor-conn.ts index 98c42eaee..a0e3c9cfb 100644 --- a/packages/core/src/driver-test-suite/tests/actor-conn.ts +++ b/packages/core/src/driver-test-suite/tests/actor-conn.ts @@ -84,8 +84,16 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) { // Set up event listener const receivedEvents: number[] = []; - connection.on("newCount", (count: number) => { - receivedEvents.push(count); + const receivedEventsPromise = new Promise((resolve) => { + connection.on("newCount", (count: number) => { + receivedEvents.push(count); + if ( + receivedEvents.includes(1) && + receivedEvents.includes(6) && + receivedEvents.includes(9) + ) + resolve(undefined); + }); }); // Send one RPC call over the connection to ensure it's open @@ -96,10 +104,8 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) { await handle.increment(5); await handle.increment(3); - // Verify events were received from both connection and handle calls - expect(receivedEvents).toContain(1); // From connection call - expect(receivedEvents).toContain(6); // From first handle call (1+5) - expect(receivedEvents).toContain(9); // From second handle call (6+3) + // Wait for all events to be received + await receivedEventsPromise; // Clean up await connection.dispose();