Skip to content

Commit 2368e4d

Browse files
committed
chore(rivetkit): close websockets on out of sequence messages
1 parent e5467bd commit 2368e4d

File tree

1 file changed

+43
-28
lines changed

1 file changed

+43
-28
lines changed

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -611,12 +611,11 @@ export class EngineActorDriver implements ActorDriver {
611611
}
612612

613613
websocket.addEventListener("message", (event: RivetMessageEvent) => {
614-
wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext));
615-
616614
invariant(event.rivetRequestId, "missing rivetRequestId");
617615
invariant(event.rivetMessageIndex, "missing rivetMessageIndex");
618616

619617
// Handle hibernatable WebSockets:
618+
// - Check for out of sequence messages
620619
// - Save msgIndex for WS restoration
621620
// - Queue WS acks
622621
const actorHandler = this.#actors.get(actorId);
@@ -628,60 +627,73 @@ export class EngineActorDriver implements ActorDriver {
628627
);
629628

630629
if (hibernatableWs) {
631-
// Update msgIndex for next WebSocket open msgIndex restoration
632-
const oldMsgIndex = hibernatableWs.msgIndex;
633-
hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);
634-
hibernatableWs.lastSeenTimestamp = BigInt(Date.now());
635-
636-
logger().debug({
637-
msg: "updated hibernatable websocket msgIndex in engine driver",
638-
requestId,
639-
oldMsgIndex: oldMsgIndex.toString(),
640-
newMsgIndex: event.rivetMessageIndex,
641-
actorId,
642-
});
643-
644630
// Track msgIndex for sending acks
645631
const currentEntry =
646632
this.#hibernatableWebSocketAckQueue.get(requestId);
647633
if (currentEntry) {
648634
const previousIndex = currentEntry.messageIndex;
649635

650-
// Warn about any non-sequential message indices
636+
// Check for out-of-sequence messages
651637
if (event.rivetMessageIndex !== previousIndex + 1) {
638+
let closeReason: string;
639+
let sequenceType: string;
640+
641+
if (event.rivetMessageIndex < previousIndex) {
642+
closeReason = "ws.message_index_regressed";
643+
sequenceType = "regressed";
644+
} else if (
645+
event.rivetMessageIndex === previousIndex
646+
) {
647+
closeReason = "ws.message_index_duplicate";
648+
sequenceType = "duplicate";
649+
} else {
650+
closeReason = "ws.message_index_skip";
651+
sequenceType = "gap/skipped";
652+
}
653+
652654
logger().warn({
653-
msg: "websocket message index out of sequence",
655+
msg: "hibernatable websocket message index out of sequence, closing connection",
654656
requestId,
655657
actorId,
656658
previousIndex,
657659
expectedIndex: previousIndex + 1,
658660
receivedIndex: event.rivetMessageIndex,
659-
sequenceType:
660-
event.rivetMessageIndex < previousIndex
661-
? "regressed"
662-
: event.rivetMessageIndex ===
663-
previousIndex
664-
? "duplicate"
665-
: "gap/skipped",
661+
sequenceType,
662+
closeReason,
666663
gap:
667664
event.rivetMessageIndex > previousIndex
668665
? event.rivetMessageIndex -
669666
previousIndex -
670667
1
671668
: 0,
672669
});
673-
}
674670

675-
// Update to the highest seen index
676-
if (event.rivetMessageIndex > previousIndex) {
677-
currentEntry.messageIndex = event.rivetMessageIndex;
671+
// Close the WebSocket and skip processing
672+
wsContext.close(1008, closeReason);
673+
return;
678674
}
675+
676+
// Update to the next index
677+
currentEntry.messageIndex = event.rivetMessageIndex;
679678
} else {
680679
this.#hibernatableWebSocketAckQueue.set(requestId, {
681680
requestIdBuf,
682681
messageIndex: event.rivetMessageIndex,
683682
});
684683
}
684+
685+
// Update msgIndex for next WebSocket open msgIndex restoration
686+
const oldMsgIndex = hibernatableWs.msgIndex;
687+
hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);
688+
hibernatableWs.lastSeenTimestamp = BigInt(Date.now());
689+
690+
logger().debug({
691+
msg: "updated hibernatable websocket msgIndex in engine driver",
692+
requestId,
693+
oldMsgIndex: oldMsgIndex.toString(),
694+
newMsgIndex: event.rivetMessageIndex,
695+
actorId,
696+
});
685697
}
686698
} else {
687699
// Warn if we receive a message for a hibernatable websocket but can't find the actor
@@ -694,6 +706,9 @@ export class EngineActorDriver implements ActorDriver {
694706
hasActor: !!actorHandler?.actor,
695707
});
696708
}
709+
710+
// Process the message after all hibernation logic and validation in case the message is out of order
711+
wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext));
697712
});
698713

699714
websocket.addEventListener("close", (event) => {

0 commit comments

Comments
 (0)