Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -614,12 +614,11 @@ export class EngineActorDriver implements ActorDriver {
}

websocket.addEventListener("message", (event: RivetMessageEvent) => {
wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext));

invariant(event.rivetRequestId, "missing rivetRequestId");
invariant(event.rivetMessageIndex, "missing rivetMessageIndex");

// Handle hibernatable WebSockets:
// - Check for out of sequence messages
// - Save msgIndex for WS restoration
// - Queue WS acks
const actorHandler = this.#actors.get(actorId);
Expand All @@ -631,60 +630,73 @@ export class EngineActorDriver implements ActorDriver {
);

if (hibernatableWs) {
// Update msgIndex for next WebSocket open msgIndex restoration
const oldMsgIndex = hibernatableWs.msgIndex;
hibernatableWs.msgIndex = event.rivetMessageIndex;
hibernatableWs.lastSeenTimestamp = Date.now();

logger().debug({
msg: "updated hibernatable websocket msgIndex in engine driver",
requestId,
oldMsgIndex: oldMsgIndex.toString(),
newMsgIndex: event.rivetMessageIndex,
actorId,
});

// Track msgIndex for sending acks
const currentEntry =
this.#hibernatableWebSocketAckQueue.get(requestId);
if (currentEntry) {
const previousIndex = currentEntry.messageIndex;

// Warn about any non-sequential message indices
// Check for out-of-sequence messages
if (event.rivetMessageIndex !== previousIndex + 1) {
let closeReason: string;
let sequenceType: string;

if (event.rivetMessageIndex < previousIndex) {
closeReason = "ws.message_index_regressed";
sequenceType = "regressed";
} else if (
event.rivetMessageIndex === previousIndex
) {
closeReason = "ws.message_index_duplicate";
sequenceType = "duplicate";
} else {
closeReason = "ws.message_index_skip";
sequenceType = "gap/skipped";
}

logger().warn({
msg: "websocket message index out of sequence",
msg: "hibernatable websocket message index out of sequence, closing connection",
requestId,
actorId,
previousIndex,
expectedIndex: previousIndex + 1,
receivedIndex: event.rivetMessageIndex,
sequenceType:
event.rivetMessageIndex < previousIndex
? "regressed"
: event.rivetMessageIndex ===
previousIndex
? "duplicate"
: "gap/skipped",
sequenceType,
closeReason,
gap:
event.rivetMessageIndex > previousIndex
? event.rivetMessageIndex -
previousIndex -
1
: 0,
});
}

// Update to the highest seen index
if (event.rivetMessageIndex > previousIndex) {
currentEntry.messageIndex = event.rivetMessageIndex;
// Close the WebSocket and skip processing
wsContext.close(1008, closeReason);
return;
}

// Update to the next index
currentEntry.messageIndex = event.rivetMessageIndex;
} else {
this.#hibernatableWebSocketAckQueue.set(requestId, {
requestIdBuf,
messageIndex: event.rivetMessageIndex,
});
}

// Update msgIndex for next WebSocket open msgIndex restoration
const oldMsgIndex = hibernatableWs.msgIndex;
hibernatableWs.msgIndex = event.rivetMessageIndex;
hibernatableWs.lastSeenTimestamp = Date.now();

logger().debug({
msg: "updated hibernatable websocket msgIndex in engine driver",
requestId,
oldMsgIndex: oldMsgIndex.toString(),
newMsgIndex: event.rivetMessageIndex,
actorId,
});
}
} else {
// Warn if we receive a message for a hibernatable websocket but can't find the actor
Expand All @@ -697,6 +709,9 @@ export class EngineActorDriver implements ActorDriver {
hasActor: !!actorHandler?.actor,
});
}

// Process the message after all hibernation logic and validation in case the message is out of order
wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext));
});

websocket.addEventListener("close", (event) => {
Expand Down
Loading