Skip to content

Commit d307ae6

Browse files
committed
fix(rivetkit): properly handle msgIndex for hibernatable websocket reconnection
1 parent c298cf6 commit d307ae6

File tree

2 files changed

+168
-26
lines changed

2 files changed

+168
-26
lines changed

rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v2.bare

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,5 @@ type PersistedActor struct {
5151
state: data
5252
connections: list<PersistedConnection>
5353
scheduledEvents: list<PersistedScheduleEvent>
54-
hibernatableWebSocket: list<PersistedHibernatableWebSocket>
54+
hibernatableWebSockets: list<PersistedHibernatableWebSocket>
5555
}

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 167 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ export class EngineActorDriver implements ActorDriver {
7676
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers();
7777
#isRunnerStopped: boolean = false;
7878

79-
// WebSocket message acknowledgment debouncing
80-
#wsAckQueue: Map<
79+
// WebSocket message acknowledgment debouncing for hibernatable websockets
80+
#hibernatableWebSocketAckQueue: Map<
8181
string,
8282
{ requestIdBuf: ArrayBuffer; messageIndex: number }
8383
> = new Map();
@@ -176,22 +176,29 @@ export class EngineActorDriver implements ActorDriver {
176176
msg: "checking hibernatable websockets",
177177
requestId: idToStr(requestId),
178178
existingHibernatableWebSockets: hibernatableArray.length,
179+
actorId,
179180
});
181+
180182
const existingWs = hibernatableArray.find((conn) =>
181183
arrayBuffersEqual(conn.hibernatableRequestId, requestId),
182184
);
183185

184186
// Determine configuration for new WS
185187
let hibernationConfig: HibernationConfig;
186188
if (existingWs) {
189+
// Convert msgIndex to number, treating -1 as undefined (no messages processed yet)
190+
const lastMsgIndex =
191+
existingWs.msgIndex >= 0n
192+
? Number(existingWs.msgIndex)
193+
: undefined;
187194
logger().debug({
188195
msg: "found existing hibernatable websocket",
189196
requestId: idToStr(requestId),
190-
lastMsgIndex: existingWs.msgIndex,
197+
lastMsgIndex: lastMsgIndex ?? -1,
191198
});
192199
hibernationConfig = {
193200
enabled: true,
194-
lastMsgIndex: Number(existingWs.msgIndex),
201+
lastMsgIndex,
195202
};
196203
} else {
197204
logger().debug({
@@ -268,6 +275,7 @@ export class EngineActorDriver implements ActorDriver {
268275
logger().debug({
269276
msg: "updated existing hibernatable websocket timestamp",
270277
requestId: idToStr(requestId),
278+
currentMsgIndex: existingWs.msgIndex,
271279
});
272280
existingWs.lastSeenTimestamp = Date.now();
273281
} else if (path === PATH_CONNECT) {
@@ -277,7 +285,7 @@ export class EngineActorDriver implements ActorDriver {
277285
msg: "will create hibernatable conn when connection is created",
278286
requestId: idToStr(requestId),
279287
});
280-
// Note: The actual hibernatable connection is created in instance.ts
288+
// Note: The actual hibernatable connection is created in connection-manager.ts
281289
// when createConn is called with a hibernatable requestId
282290
}
283291

@@ -302,7 +310,10 @@ export class EngineActorDriver implements ActorDriver {
302310
//
303311
// Gateway timeout configured to 30s
304312
// https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/pegboard-gateway/src/shared_state.rs#L17
305-
this.#wsAckFlushInterval = setInterval(() => this.#flushWsAcks(), 1000);
313+
this.#wsAckFlushInterval = setInterval(
314+
() => this.#flushHibernatableWebSocketAcks(),
315+
1000,
316+
);
306317
}
307318

308319
async #loadActorHandler(actorId: string): Promise<ActorHandler> {
@@ -321,17 +332,17 @@ export class EngineActorDriver implements ActorDriver {
321332
return handler.actor;
322333
}
323334

324-
#flushWsAcks(): void {
325-
if (this.#wsAckQueue.size === 0) return;
335+
#flushHibernatableWebSocketAcks(): void {
336+
if (this.#hibernatableWebSocketAckQueue.size === 0) return;
326337

327338
for (const {
328339
requestIdBuf: requestId,
329340
messageIndex: index,
330-
} of this.#wsAckQueue.values()) {
341+
} of this.#hibernatableWebSocketAckQueue.values()) {
331342
this.#runner.sendWebsocketMessageAck(requestId, index);
332343
}
333344

334-
this.#wsAckQueue.clear();
345+
this.#hibernatableWebSocketAckQueue.clear();
335346
}
336347

337348
getContext(actorId: string): DriverContext {
@@ -608,39 +619,170 @@ export class EngineActorDriver implements ActorDriver {
608619
invariant(event.rivetRequestId, "missing rivetRequestId");
609620
invariant(event.rivetMessageIndex, "missing rivetMessageIndex");
610621

611-
// Track only the highest seen message index per request
612-
// Convert ArrayBuffer to string for Map key
613-
const currentEntry = this.#wsAckQueue.get(requestId);
614-
if (currentEntry) {
615-
if (event.rivetMessageIndex > currentEntry.messageIndex) {
616-
currentEntry.messageIndex = event.rivetMessageIndex;
617-
} else {
618-
logger().warn({
619-
msg: "received lower index than ack queue for message",
622+
// Handle hibernatable WebSockets:
623+
// - Save msgIndex for WS restoration
624+
// - Queue WS acks
625+
const actorHandler = this.#actors.get(actorId);
626+
if (actorHandler?.actor) {
627+
const hibernatableWs = actorHandler.actor[
628+
ACTOR_INSTANCE_PERSIST_SYMBOL
629+
].hibernatableConns.find((conn: any) =>
630+
arrayBuffersEqual(conn.hibernatableRequestId, requestIdBuf),
631+
);
632+
633+
if (hibernatableWs) {
634+
// Update msgIndex for next WebSocket open msgIndex restoration
635+
const oldMsgIndex = hibernatableWs.msgIndex;
636+
hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);
637+
hibernatableWs.lastSeenTimestamp = Date.now();
638+
639+
logger().debug({
640+
msg: "updated hibernatable websocket msgIndex in engine driver",
620641
requestId,
621-
queuedMessageIndex: currentEntry,
622-
eventMessageIndex: event.rivetMessageIndex,
642+
oldMsgIndex: oldMsgIndex.toString(),
643+
newMsgIndex: event.rivetMessageIndex,
644+
actorId,
623645
});
646+
647+
// Track msgIndex for sending acks
648+
const currentEntry =
649+
this.#hibernatableWebSocketAckQueue.get(requestId);
650+
if (currentEntry) {
651+
const previousIndex = currentEntry.messageIndex;
652+
653+
// Warn about any non-sequential message indices
654+
if (event.rivetMessageIndex !== previousIndex + 1) {
655+
logger().warn({
656+
msg: "websocket message index out of sequence",
657+
requestId,
658+
actorId,
659+
previousIndex,
660+
expectedIndex: previousIndex + 1,
661+
receivedIndex: event.rivetMessageIndex,
662+
sequenceType:
663+
event.rivetMessageIndex < previousIndex
664+
? "regressed"
665+
: event.rivetMessageIndex ===
666+
previousIndex
667+
? "duplicate"
668+
: "gap/skipped",
669+
gap:
670+
event.rivetMessageIndex > previousIndex
671+
? event.rivetMessageIndex -
672+
previousIndex -
673+
1
674+
: 0,
675+
});
676+
}
677+
678+
// Update to the highest seen index
679+
if (event.rivetMessageIndex > previousIndex) {
680+
currentEntry.messageIndex = event.rivetMessageIndex;
681+
}
682+
} else {
683+
this.#hibernatableWebSocketAckQueue.set(requestId, {
684+
requestIdBuf,
685+
messageIndex: event.rivetMessageIndex,
686+
});
687+
}
624688
}
625689
} else {
626-
this.#wsAckQueue.set(requestId, {
627-
requestIdBuf,
690+
// Warn if we receive a message for a hibernatable websocket but can't find the actor
691+
logger().warn({
692+
msg: "received websocket message but actor not found for hibernatable tracking",
693+
actorId,
694+
requestId,
628695
messageIndex: event.rivetMessageIndex,
696+
hasHandler: !!actorHandler,
697+
hasActor: !!actorHandler?.actor,
629698
});
630699
}
631700
});
632701

633702
websocket.addEventListener("close", (event) => {
634703
// Flush any pending acks before closing
635-
this.#flushWsAcks();
704+
this.#flushHibernatableWebSocketAcks();
705+
706+
// Clean up hibernatable WebSocket
707+
this.#cleanupHibernatableWebSocket(
708+
actorId,
709+
requestIdBuf,
710+
requestId,
711+
"close",
712+
event,
713+
);
714+
636715
wsHandlerPromise.then((x) => x.onClose?.(event, wsContext));
637716
});
638717

639718
websocket.addEventListener("error", (event) => {
719+
// Clean up hibernatable WebSocket on error
720+
this.#cleanupHibernatableWebSocket(
721+
actorId,
722+
requestIdBuf,
723+
requestId,
724+
"error",
725+
event,
726+
);
727+
640728
wsHandlerPromise.then((x) => x.onError?.(event, wsContext));
641729
});
642730
}
643731

732+
/**
733+
* Helper method to clean up hibernatable WebSocket entries
734+
* Eliminates duplication between close and error handlers
735+
*/
736+
#cleanupHibernatableWebSocket(
737+
actorId: string,
738+
requestIdBuf: ArrayBuffer,
739+
requestId: string,
740+
eventType: "close" | "error",
741+
event?: any,
742+
) {
743+
const actorHandler = this.#actors.get(actorId);
744+
if (actorHandler?.actor) {
745+
const hibernatableArray =
746+
actorHandler.actor[ACTOR_INSTANCE_PERSIST_SYMBOL].hibernatableConns;
747+
const wsIndex = hibernatableArray.findIndex((conn: any) =>
748+
arrayBuffersEqual(conn.hibernatableRequestId, requestIdBuf),
749+
);
750+
751+
if (wsIndex !== -1) {
752+
const removed = hibernatableArray.splice(wsIndex, 1);
753+
const logData: any = {
754+
msg: `removed hibernatable websocket on ${eventType}`,
755+
requestId,
756+
actorId,
757+
removedMsgIndex:
758+
removed[0]?.msgIndex?.toString() ?? "unknown",
759+
};
760+
// Add error context if this is an error event
761+
if (eventType === "error" && event) {
762+
logData.error = event;
763+
}
764+
logger().debug(logData);
765+
}
766+
} else {
767+
// Warn if actor not found during cleanup
768+
const warnData: any = {
769+
msg: `websocket ${eventType === "close" ? "closed" : "error"} but actor not found for hibernatable cleanup`,
770+
actorId,
771+
requestId,
772+
hasHandler: !!actorHandler,
773+
hasActor: !!actorHandler?.actor,
774+
};
775+
// Add error context if this is an error event
776+
if (eventType === "error" && event) {
777+
warnData.error = event;
778+
}
779+
logger().warn(warnData);
780+
}
781+
782+
// Also remove from ack queue
783+
this.#hibernatableWebSocketAckQueue.delete(requestId);
784+
}
785+
644786
startSleep(actorId: string) {
645787
this.#runner.sleepActor(actorId);
646788
}
@@ -700,7 +842,7 @@ export class EngineActorDriver implements ActorDriver {
700842
}
701843

702844
// Flush any remaining acks
703-
this.#flushWsAcks();
845+
this.#flushHibernatableWebSocketAcks();
704846

705847
await this.#runner.shutdown(immediate);
706848
}

0 commit comments

Comments
 (0)