Skip to content

Commit 989db0f

Browse files
committed
chore(rivetkit): close websockets on out of sequence messages
1 parent 406bd5a commit 989db0f

File tree

1 file changed

+43
-28
lines changed

1 file changed

+43
-28
lines changed

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -614,12 +614,11 @@ export class EngineActorDriver implements ActorDriver {
614614
}
615615

616616
websocket.addEventListener("message", (event: RivetMessageEvent) => {
617-
wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext));
618-
619617
invariant(event.rivetRequestId, "missing rivetRequestId");
620618
invariant(event.rivetMessageIndex, "missing rivetMessageIndex");
621619

622620
// Handle hibernatable WebSockets:
621+
// - Check for out of sequence messages
623622
// - Save msgIndex for WS restoration
624623
// - Queue WS acks
625624
const actorHandler = this.#actors.get(actorId);
@@ -631,60 +630,73 @@ export class EngineActorDriver implements ActorDriver {
631630
);
632631

633632
if (hibernatableWs) {
634-
// Update msgIndex for next WebSocket open msgIndex restoration
635-
const oldMsgIndex = hibernatableWs.msgIndex;
636-
hibernatableWs.msgIndex = event.rivetMessageIndex;
637-
hibernatableWs.lastSeenTimestamp = Date.now();
638-
639-
logger().debug({
640-
msg: "updated hibernatable websocket msgIndex in engine driver",
641-
requestId,
642-
oldMsgIndex: oldMsgIndex.toString(),
643-
newMsgIndex: event.rivetMessageIndex,
644-
actorId,
645-
});
646-
647633
// Track msgIndex for sending acks
648634
const currentEntry =
649635
this.#hibernatableWebSocketAckQueue.get(requestId);
650636
if (currentEntry) {
651637
const previousIndex = currentEntry.messageIndex;
652638

653-
// Warn about any non-sequential message indices
639+
// Check for out-of-sequence messages
654640
if (event.rivetMessageIndex !== previousIndex + 1) {
641+
let closeReason: string;
642+
let sequenceType: string;
643+
644+
if (event.rivetMessageIndex < previousIndex) {
645+
closeReason = "ws.message_index_regressed";
646+
sequenceType = "regressed";
647+
} else if (
648+
event.rivetMessageIndex === previousIndex
649+
) {
650+
closeReason = "ws.message_index_duplicate";
651+
sequenceType = "duplicate";
652+
} else {
653+
closeReason = "ws.message_index_skip";
654+
sequenceType = "gap/skipped";
655+
}
656+
655657
logger().warn({
656-
msg: "websocket message index out of sequence",
658+
msg: "hibernatable websocket message index out of sequence, closing connection",
657659
requestId,
658660
actorId,
659661
previousIndex,
660662
expectedIndex: previousIndex + 1,
661663
receivedIndex: event.rivetMessageIndex,
662-
sequenceType:
663-
event.rivetMessageIndex < previousIndex
664-
? "regressed"
665-
: event.rivetMessageIndex ===
666-
previousIndex
667-
? "duplicate"
668-
: "gap/skipped",
664+
sequenceType,
665+
closeReason,
669666
gap:
670667
event.rivetMessageIndex > previousIndex
671668
? event.rivetMessageIndex -
672669
previousIndex -
673670
1
674671
: 0,
675672
});
676-
}
677673

678-
// Update to the highest seen index
679-
if (event.rivetMessageIndex > previousIndex) {
680-
currentEntry.messageIndex = event.rivetMessageIndex;
674+
// Close the WebSocket and skip processing
675+
wsContext.close(1008, closeReason);
676+
return;
681677
}
678+
679+
// Update to the next index
680+
currentEntry.messageIndex = event.rivetMessageIndex;
682681
} else {
683682
this.#hibernatableWebSocketAckQueue.set(requestId, {
684683
requestIdBuf,
685684
messageIndex: event.rivetMessageIndex,
686685
});
687686
}
687+
688+
// Update msgIndex for next WebSocket open msgIndex restoration
689+
const oldMsgIndex = hibernatableWs.msgIndex;
690+
hibernatableWs.msgIndex = event.rivetMessageIndex;
691+
hibernatableWs.lastSeenTimestamp = Date.now();
692+
693+
logger().debug({
694+
msg: "updated hibernatable websocket msgIndex in engine driver",
695+
requestId,
696+
oldMsgIndex: oldMsgIndex.toString(),
697+
newMsgIndex: event.rivetMessageIndex,
698+
actorId,
699+
});
688700
}
689701
} else {
690702
// Warn if we receive a message for a hibernatable websocket but can't find the actor
@@ -697,6 +709,9 @@ export class EngineActorDriver implements ActorDriver {
697709
hasActor: !!actorHandler?.actor,
698710
});
699711
}
712+
713+
// Process the message after all hibernation logic and validation in case the message is out of order
714+
wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext));
700715
});
701716

702717
websocket.addEventListener("close", (event) => {

0 commit comments

Comments
 (0)