Skip to content

Commit 26c1b12

Browse files
committed
fix(rivetkit): properly handle msgIndex for hibernatable websocket reconnection
1 parent f014009 commit 26c1b12

File tree

8 files changed

+194
-151
lines changed

8 files changed

+194
-151
lines changed

rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v2.bare

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,5 @@ type PersistedActor struct {
5151
state: data
5252
connections: list<PersistedConnection>
5353
scheduledEvents: list<PersistedScheduleEvent>
54-
hibernatableWebSocket: list<PersistedHibernatableWebSocket>
54+
hibernatableWebSockets: list<PersistedHibernatableWebSocket>
5555
}

rivetkit-typescript/packages/rivetkit/src/actor/conn.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ export class Conn<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
137137
return false;
138138
}
139139
return (
140-
this.#actor[PERSIST_SYMBOL].hibernatableWebSocket.findIndex((x) =>
140+
this.#actor[PERSIST_SYMBOL].hibernatableWebSockets.findIndex((x) =>
141141
arrayBuffersEqual(
142142
x.requestId,
143143
this.__persist.hibernatableRequestId!,

rivetkit-typescript/packages/rivetkit/src/actor/instance.ts

Lines changed: 18 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -822,7 +822,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
822822
msg: "actor restoring",
823823
connections: persistData.connections.length,
824824
hibernatableWebSockets:
825-
persistData.hibernatableWebSocket.length,
825+
persistData.hibernatableWebSockets.length,
826826
});
827827

828828
// Set initial state
@@ -1222,7 +1222,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
12221222
// Check if this connection is for a hibernatable websocket
12231223
if (socket.requestIdBuf) {
12241224
const isHibernatable =
1225-
this.#persist.hibernatableWebSocket.findIndex((ws) =>
1225+
this.#persist.hibernatableWebSockets.findIndex((ws) =>
12261226
arrayBuffersEqual(ws.requestId, socket.requestIdBuf!),
12271227
) !== -1;
12281228

@@ -1654,7 +1654,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
16541654
/**
16551655
* Handles raw WebSocket connections to the actor.
16561656
*/
1657-
async handleWebSocket(
1657+
async handleRawWebSocket(
16581658
websocket: UniversalWebSocket,
16591659
opts: { request: Request },
16601660
): Promise<void> {
@@ -1672,116 +1672,13 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
16721672
this.#activeRawWebSockets.add(websocket);
16731673
this.#resetSleepTimer();
16741674

1675-
// Track hibernatable WebSockets
1676-
let rivetRequestId: ArrayBuffer | undefined;
1677-
let persistedHibernatableWebSocket:
1678-
| PersistedHibernatableWebSocket
1679-
| undefined;
1680-
1681-
const onSocketOpened = (event: any) => {
1682-
rivetRequestId = event?.rivetRequestId;
1683-
1684-
// Find hibernatable WS
1685-
if (rivetRequestId) {
1686-
const rivetRequestIdLocal = rivetRequestId;
1687-
persistedHibernatableWebSocket =
1688-
this.#persist.hibernatableWebSocket.find((ws) =>
1689-
arrayBuffersEqual(
1690-
ws.requestId,
1691-
rivetRequestIdLocal,
1692-
),
1693-
);
1694-
1695-
if (persistedHibernatableWebSocket) {
1696-
persistedHibernatableWebSocket.lastSeenTimestamp =
1697-
BigInt(Date.now());
1698-
}
1699-
}
1700-
1701-
this.#rLog.debug({
1702-
msg: "actor instance onSocketOpened",
1703-
rivetRequestId,
1704-
isHibernatable: !!persistedHibernatableWebSocket,
1705-
hibernationMsgIndex:
1706-
persistedHibernatableWebSocket?.msgIndex,
1707-
});
1708-
};
1709-
1710-
const onSocketMessage = (event: any) => {
1711-
// Update state of hibernatable WS
1712-
if (persistedHibernatableWebSocket) {
1713-
persistedHibernatableWebSocket.lastSeenTimestamp = BigInt(
1714-
Date.now(),
1715-
);
1716-
persistedHibernatableWebSocket.msgIndex = BigInt(
1717-
event.rivetMessageIndex,
1718-
);
1719-
}
1720-
1721-
this.#rLog.debug({
1722-
msg: "actor instance onSocketMessage",
1723-
rivetRequestId,
1724-
isHibernatable: !!persistedHibernatableWebSocket,
1725-
hibernationMsgIndex:
1726-
persistedHibernatableWebSocket?.msgIndex,
1727-
});
1728-
};
1729-
17301675
const onSocketClosed = (_event: any) => {
1731-
// Remove hibernatable WS
1732-
if (rivetRequestId) {
1733-
const rivetRequestIdLocal = rivetRequestId;
1734-
const wsIndex =
1735-
this.#persist.hibernatableWebSocket.findIndex((ws) =>
1736-
arrayBuffersEqual(
1737-
ws.requestId,
1738-
rivetRequestIdLocal,
1739-
),
1740-
);
1741-
1742-
const removed = this.#persist.hibernatableWebSocket.splice(
1743-
wsIndex,
1744-
1,
1745-
);
1746-
if (removed.length > 0) {
1747-
this.#rLog.debug({
1748-
msg: "removed hibernatable websocket",
1749-
rivetRequestId,
1750-
hibernationMsgIndex:
1751-
persistedHibernatableWebSocket?.msgIndex,
1752-
});
1753-
} else {
1754-
this.#rLog.warn({
1755-
msg: "could not find hibernatable websocket to remove",
1756-
rivetRequestId,
1757-
hibernationMsgIndex:
1758-
persistedHibernatableWebSocket?.msgIndex,
1759-
});
1760-
}
1761-
}
1762-
1763-
this.#rLog.debug({
1764-
msg: "actor instance onSocketMessage",
1765-
rivetRequestId,
1766-
isHibernatable: !!persistedHibernatableWebSocket,
1767-
hibernatableWebSocketCount:
1768-
this.#persist.hibernatableWebSocket.length,
1769-
});
1770-
1771-
// Remove listener and socket from tracking
1772-
try {
1773-
websocket.removeEventListener("open", onSocketOpened);
1774-
websocket.removeEventListener("message", onSocketMessage);
1775-
websocket.removeEventListener("close", onSocketClosed);
1776-
websocket.removeEventListener("error", onSocketClosed);
1777-
} catch {}
1676+
// Remove socket from tracking
17781677
this.#activeRawWebSockets.delete(websocket);
17791678
this.#resetSleepTimer();
17801679
};
17811680

17821681
try {
1783-
websocket.addEventListener("open", onSocketOpened);
1784-
websocket.addEventListener("message", onSocketMessage);
17851682
websocket.addEventListener("close", onSocketClosed);
17861683
websocket.addEventListener("error", onSocketClosed);
17871684
} catch {}
@@ -2288,11 +2185,13 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
22882185
},
22892186
},
22902187
})),
2291-
hibernatableWebSocket: persist.hibernatableWebSocket.map((ws) => ({
2292-
requestId: ws.requestId,
2293-
lastSeenTimestamp: ws.lastSeenTimestamp,
2294-
msgIndex: ws.msgIndex,
2295-
})),
2188+
hibernatableWebSockets: persist.hibernatableWebSockets.map(
2189+
(ws) => ({
2190+
requestId: ws.requestId,
2191+
lastSeenTimestamp: ws.lastSeenTimestamp,
2192+
msgIndex: ws.msgIndex,
2193+
}),
2194+
),
22962195
};
22972196
}
22982197

@@ -2326,11 +2225,13 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
23262225
},
23272226
},
23282227
})),
2329-
hibernatableWebSocket: bareData.hibernatableWebSocket.map((ws) => ({
2330-
requestId: ws.requestId,
2331-
lastSeenTimestamp: ws.lastSeenTimestamp,
2332-
msgIndex: ws.msgIndex,
2333-
})),
2228+
hibernatableWebSockets: bareData.hibernatableWebSockets.map(
2229+
(ws) => ({
2230+
requestId: ws.requestId,
2231+
lastSeenTimestamp: ws.lastSeenTimestamp,
2232+
msgIndex: ws.msgIndex,
2233+
}),
2234+
),
23342235
};
23352236
}
23362237
}

rivetkit-typescript/packages/rivetkit/src/actor/persisted.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export interface PersistedActor<S, CP, CS, I> {
55
state: S;
66
connections: PersistedConn<CP, CS>[];
77
scheduledEvents: PersistedScheduleEvent[];
8-
hibernatableWebSocket: PersistedHibernatableWebSocket[];
8+
hibernatableWebSockets: PersistedHibernatableWebSocket[];
99
}
1010

1111
/** Object representing connection that gets persisted to storage. */

rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ export async function handleWebSocketConnect(
188188
// Check if this is a hibernatable websocket
189189
const isHibernatable =
190190
!!requestIdBuf &&
191-
actor[PERSIST_SYMBOL].hibernatableWebSocket.findIndex(
191+
actor[PERSIST_SYMBOL].hibernatableWebSockets.findIndex(
192192
(ws) =>
193193
arrayBuffersEqual(ws.requestId, requestIdBuf),
194194
) !== -1;
@@ -616,7 +616,7 @@ export async function handleRawWebSocketHandler(
616616
// Extract rivetRequestId provided by engine runner
617617
const rivetRequestId = evt?.rivetRequestId;
618618
const isHibernatable =
619-
actor[PERSIST_SYMBOL].hibernatableWebSocket.findIndex((ws) =>
619+
actor[PERSIST_SYMBOL].hibernatableWebSockets.findIndex((ws) =>
620620
arrayBuffersEqual(ws.requestId, rivetRequestId),
621621
) !== -1;
622622

@@ -647,7 +647,7 @@ export async function handleRawWebSocketHandler(
647647
});
648648

649649
// Call the actor's onWebSocket handler with the adapted WebSocket
650-
actor.handleWebSocket(adapter, {
650+
actor.handleRawWebSocket(adapter, {
651651
request: newRequest,
652652
});
653653
},

rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export function serializeEmptyPersistData(
1515
state: bufferToArrayBuffer(cbor.encode(undefined)),
1616
connections: [],
1717
scheduledEvents: [],
18-
hibernatableWebSocket: [],
18+
hibernatableWebSockets: [],
1919
};
2020
return PERSISTED_ACTOR_VERSIONED.serializeWithEmbeddedVersion(persistData);
2121
}

0 commit comments

Comments
 (0)