From 989db0f82fe8ce8166bf7c704f86a032cba4f138 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 6 Nov 2025 17:22:45 -0800 Subject: [PATCH] chore(rivetkit): close websockets on out of sequence messages --- .../src/drivers/engine/actor-driver.ts | 71 +++++++++++-------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 55512ea142..2ae17efeb9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -614,12 +614,11 @@ export class EngineActorDriver implements ActorDriver { } websocket.addEventListener("message", (event: RivetMessageEvent) => { - wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext)); - invariant(event.rivetRequestId, "missing rivetRequestId"); invariant(event.rivetMessageIndex, "missing rivetMessageIndex"); // Handle hibernatable WebSockets: + // - Check for out of sequence messages // - Save msgIndex for WS restoration // - Queue WS acks const actorHandler = this.#actors.get(actorId); @@ -631,41 +630,39 @@ export class EngineActorDriver implements ActorDriver { ); if (hibernatableWs) { - // Update msgIndex for next WebSocket open msgIndex restoration - const oldMsgIndex = hibernatableWs.msgIndex; - hibernatableWs.msgIndex = event.rivetMessageIndex; - hibernatableWs.lastSeenTimestamp = Date.now(); - - logger().debug({ - msg: "updated hibernatable websocket msgIndex in engine driver", - requestId, - oldMsgIndex: oldMsgIndex.toString(), - newMsgIndex: event.rivetMessageIndex, - actorId, - }); - // Track msgIndex for sending acks const currentEntry = this.#hibernatableWebSocketAckQueue.get(requestId); if (currentEntry) { const previousIndex = currentEntry.messageIndex; - // Warn about any non-sequential message indices + // Check for out-of-sequence messages if (event.rivetMessageIndex !== previousIndex + 1) { + let closeReason: string; + let sequenceType: string; + + if (event.rivetMessageIndex < previousIndex) { + closeReason = "ws.message_index_regressed"; + sequenceType = "regressed"; + } else if ( + event.rivetMessageIndex === previousIndex + ) { + closeReason = "ws.message_index_duplicate"; + sequenceType = "duplicate"; + } else { + closeReason = "ws.message_index_skip"; + sequenceType = "gap/skipped"; + } + logger().warn({ - msg: "websocket message index out of sequence", + msg: "hibernatable websocket message index out of sequence, closing connection", requestId, actorId, previousIndex, expectedIndex: previousIndex + 1, receivedIndex: event.rivetMessageIndex, - sequenceType: - event.rivetMessageIndex < previousIndex - ? "regressed" - : event.rivetMessageIndex === - previousIndex - ? "duplicate" - : "gap/skipped", + sequenceType, + closeReason, gap: event.rivetMessageIndex > previousIndex ? event.rivetMessageIndex - @@ -673,18 +670,33 @@ export class EngineActorDriver implements ActorDriver { 1 : 0, }); - } - // Update to the highest seen index - if (event.rivetMessageIndex > previousIndex) { - currentEntry.messageIndex = event.rivetMessageIndex; + // Close the WebSocket and skip processing + wsContext.close(1008, closeReason); + return; } + + // Update to the next index + currentEntry.messageIndex = event.rivetMessageIndex; } else { this.#hibernatableWebSocketAckQueue.set(requestId, { requestIdBuf, messageIndex: event.rivetMessageIndex, }); } + + // Update msgIndex for next WebSocket open msgIndex restoration + const oldMsgIndex = hibernatableWs.msgIndex; + hibernatableWs.msgIndex = event.rivetMessageIndex; + hibernatableWs.lastSeenTimestamp = Date.now(); + + logger().debug({ + msg: "updated hibernatable websocket msgIndex in engine driver", + requestId, + oldMsgIndex: oldMsgIndex.toString(), + newMsgIndex: event.rivetMessageIndex, + actorId, + }); } } else { // Warn if we receive a message for a hibernatable websocket but can't find the actor @@ -697,6 +709,9 @@ export class EngineActorDriver implements ActorDriver { hasActor: !!actorHandler?.actor, }); } + + // Process the message after all hibernation logic and validation in case the message is out of order + wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext)); }); websocket.addEventListener("close", (event) => {