fix(rivetkit): properly handle msgIndex for hibernatable websocket reconnection #3401
base: 11-08-chore_rivetkit_rename_onstart_onstop_-_onwake_onsleep
Conversation
Force-pushed from cf4dd44 to 8f43e55
Code Review for PR #3401

Summary

This PR fixes hibernatable WebSocket reconnection by properly handling msgIndex.
Positive Aspects

1. Improved Separation of Concerns

2. Proper msgIndex Initialization

// src/drivers/engine/actor-driver.ts:191-195
const lastMsgIndex =
existingWs.msgIndex >= 0n
? Number(existingWs.msgIndex)
: undefined;

Good handling of the sentinel value.

3. Message Sequence Gap Detection

// src/drivers/engine/actor-driver.ts:652-668
if (event.rivetMessageIndex > currentEntry.messageIndex + 1) {
logger().warn({
msg: "detected gap in websocket message sequence",
// ...
});
}

Excellent defensive programming - this will help detect message loss or ordering issues early.

4. Comprehensive Logging
Issues & Concerns

1. CRITICAL: Potential Race Condition with msgIndex Updates

The msgIndex is now updated directly in the message event handler (actor-driver.ts:633) without any synchronization:

hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);

However, the actor's persist data is accessed from multiple places.
Concern: If a persistence operation reads the hibernatableWebSockets array while a message handler is updating msgIndex, there could be race conditions. TypeScript/JavaScript is single-threaded, but async operations could interleave.

Recommendation: Consider if the msgIndex updates should be queued and applied during the next persistence cycle, or if explicit locking/coordination is needed.

2. Missing Event Listener Cleanup in handleRawWebSocket()

The simplified close handler:

const onSocketClosed = (_event: any) => {
this.#activeRawWebSockets.delete(websocket);
this.#resetSleepTimer();
};
try {
websocket.addEventListener("close", onSocketClosed);
websocket.addEventListener("error", onSocketClosed);
} catch {}

Issue: The event listeners are never explicitly removed. While they'll be garbage collected when the websocket is closed, the old code explicitly removed them.

Recommendation: Add cleanup in the finally block or ensure the listeners are removed on close:

const onSocketClosed = (_event: any) => {
try {
websocket.removeEventListener("close", onSocketClosed);
websocket.removeEventListener("error", onSocketClosed);
} catch {}
this.#activeRawWebSockets.delete(websocket);
this.#resetSleepTimer();
};

3. Inconsistent Type Handling

In hibernatableWebSockets:

bareData.hibernatableWebSockets.map(
(ws: any) => ({
requestId: ws.requestId,
// ...
}),
),

Issue: Using any here loses type safety.

Recommendation: Import and use the proper type from the BARE schema definitions.

4. Potential Memory Leak in Ack Queue

The #hibernatableWebSocketAckQueue holds pending acknowledgements between flush intervals.
Issue: If a websocket closes between flush intervals, and the close event doesn't trigger for some reason, the entry could leak.

Recommendation: Explicitly remove from ack queue in the WebSocket close handler, before flushing.

5. Missing msgIndex Update on WebSocket Close

When a hibernatable WebSocket closes, the code removes it from the tracking array but doesn't verify the msgIndex is up-to-date.

Consider: Should the close handler flush pending acks before removing from hibernatableWebSockets array? The current order (line 699) flushes acks, but by that point, if the actor has been destroyed, the msgIndex might not be saved.

6. Lack of Test Coverage

No test files were modified/added. Given the critical nature of msgIndex tracking for message ordering and the complexity of hibernation:

Recommendation: Add tests covering msgIndex persistence across hibernation, reconnection with the correct lastMsgIndex, gap detection, and cleanup on WebSocket close.
Minor Issues

7. Method Rename May Break External Code

The rename changes an externally visible name.

Impact: Any external code calling the old name would need to be updated.

8. Unclear Comment About "-1" in Logging

// actor-driver.ts:199
lastMsgIndex: lastMsgIndex ?? -1,

This logs -1 when lastMsgIndex is undefined, which is ambiguous.

Recommendation: Log the undefined case explicitly (or document the -1 sentinel) so it cannot be mistaken for a real index.

Performance Considerations

Positive: Acknowledgements are batched and flushed on an interval rather than written per message.

Concern: Each incoming message does a linear search over the hibernatableWebSockets array.

Security Considerations

No obvious security issues. The changes are mostly refactoring and don't introduce new attack surfaces. Proper input validation (rivetRequestId, rivetMessageIndex) is in place.

Summary of Recommendations

High Priority: address the potential msgIndex race condition (issue 1).

Medium Priority: restore event listener cleanup (issue 2), clean up the ack queue on close (issues 4 and 5), and add test coverage (issue 6).

Low Priority: type handling (issue 3) and the "-1" logging ambiguity (issue 8).

Overall, this is a solid refactoring that improves code organization and fixes the msgIndex tracking issue. The main concerns are around potential race conditions and missing cleanup logic. With the recommended fixes, this should be good to merge.
Code Review: Fix hibernatable WebSocket msgIndex handling

Summary

This PR fixes a critical issue with message index tracking for hibernatable WebSocket reconnections. The changes move msgIndex management from the actor instance layer to the engine driver layer, ensuring proper message sequencing during WebSocket hibernation/restoration.

✅ Positive Aspects
🐛 Potential Issues

1. Race Condition in Message Handler (Medium Priority)

The message event handler looks up the actor and hibernatable WebSocket on every message:

const actorHandler = this.#actors.get(actorId);
if (actorHandler?.actor) {
const hibernatableWs = actorHandler.actor[PERSIST_SYMBOL]
.hibernatableWebSockets.find((ws: any) => ...);

Issue: If the actor is unloaded/hibernated between messages, this lookup could fail even for valid hibernatable WebSockets. The warning at line 687-694 suggests this scenario is considered, but it might be expected behavior rather than an error condition.

Suggestion: Consider documenting when this warning is expected vs. when it indicates a real problem, or adjust the log level.

2. Missing WebSocket Cleanup Logic (High Priority)

The old code (removed in this PR) had logic to remove hibernatable WebSockets from the persist array when they closed:

// Old code (removed):
const wsIndex = this.#persist.hibernatableWebSocket.findIndex(...);
const removed = this.#persist.hibernatableWebSocket.splice(wsIndex, 1);

Issue: The new code does not appear to remove closed hibernatable WebSockets from the persist array.
Suggestion: Add cleanup logic in the close handler or implement a TTL-based cleanup mechanism. The lastSeenTimestamp field could support a TTL-based sweep; a rough sketch appears at the end of this review.

3. Type Safety Issue (Low Priority)

.hibernatableWebSockets.find((ws: any) => ...);

Using any here loses type safety.

⚡ Performance Considerations
🔒 Security Concerns

No significant security issues identified. The code properly validates request identifiers and compares them with ArrayBuffer equality checks.
Minor suggestion: Consider sanitizing or truncating values included in log output.

🧪 Test Coverage

CRITICAL ISSUE: No test coverage found for hibernatable WebSocket functionality.

Required Tests: msgIndex persistence and restoration across hibernation, gap/duplicate detection, and cleanup on WebSocket close.
Recommendation: Add tests covering the scenarios above.

📝 Additional Observations
📋 Recommendations

Before Merge: add WebSocket cleanup logic and basic test coverage.
Nice to Have: tighten the any typings noted above.

Verdict

The core fix is solid and addresses the msgIndex tracking issue correctly. However, the missing cleanup logic is a significant concern that should be addressed before merge. The lack of test coverage for this critical functionality is also concerning given the complexity of hibernatable WebSocket state management.

Recommendation: Request changes to add cleanup logic and basic test coverage before merging.
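For the TTL idea mentioned under issue 2, a minimal sketch of a periodic sweep over the persisted entries. The entry shape (requestId, msgIndex, lastSeenTimestamp as bigints) mirrors the snippets in this PR; the TTL value and where the sweep would be invoked are assumptions, not the PR's actual implementation.

// Hypothetical sketch: prune hibernatable WebSocket entries whose
// lastSeenTimestamp is older than a TTL. Field names follow the PR's snippets;
// the TTL constant and call site are assumptions.
interface HibernatableWebSocketEntry {
	requestId: ArrayBuffer;
	msgIndex: bigint;
	lastSeenTimestamp: bigint;
}

const HIBERNATABLE_WS_TTL_MS = 30n * 60n * 1000n; // assumed 30-minute TTL

function sweepStaleHibernatableWebSockets(
	entries: HibernatableWebSocketEntry[],
	now: bigint = BigInt(Date.now()),
): HibernatableWebSocketEntry[] {
	// Keep only entries that have been seen within the TTL window.
	return entries.filter(
		(entry) => now - entry.lastSeenTimestamp <= HIBERNATABLE_WS_TTL_MS,
	);
}

A sweep like this could run on the same timer that flushes acks, so stale entries are pruned even if a close event is never delivered.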
Force-pushed from 8f43e55 to c7a52ec
PR Review: Fix hibernatable WebSocket msgIndex handling

Summary

This PR fixes a critical bug in the handling of msgIndex for hibernatable WebSocket reconnections.

Positive Observations ✅

1. Correct Architecture Changes
2. Naming Consistency
3. Good Diagnostics

The new logging and sequence checking is excellent:

if (event.rivetMessageIndex !== previousIndex + 1) {
logger().warn({
msg: "websocket message index out of sequence",
sequenceType: event.rivetMessageIndex < previousIndex ? "regressed"
: event.rivetMessageIndex === previousIndex ? "duplicate"
: "gap/skipped",
gap: event.rivetMessageIndex > previousIndex
? event.rivetMessageIndex - previousIndex - 1
: 0,
});
}

4. Proper Initial State Handling

The conversion of the -1n sentinel to undefined is handled correctly:

const lastMsgIndex = existingWs.msgIndex >= 0n
? Number(existingWs.msgIndex)
: undefined;

Issues & Concerns 🔴

1. Critical: Race Condition Risk

The msgIndex update happens in the message event handler, but there's no guarantee it will be persisted before actor hibernation:

hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);
hibernatableWs.lastSeenTimestamp = BigInt(Date.now());

Problem: These updates directly modify the persisted state, but nothing guarantees a persistence write happens before the actor hibernates.

Scenario: messages arrive and msgIndex is advanced in memory, the actor hibernates before the next persistence cycle, and on wake the stale persisted msgIndex causes those messages to be replayed.
Recommendation: Ensure a persistence write is scheduled whenever msgIndex is updated, or flush pending updates before hibernation (a rough sketch follows).
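A rough sketch of the queue-and-apply-at-persist approach suggested here and in the earlier review. The class name, the hex-encoded request-id key, and the integration point are assumptions for illustration only, not the PR's API.

// Hypothetical sketch: buffer msgIndex updates and fold them into persisted
// state during the next persistence cycle. Names are illustrative, not the PR's.
class MsgIndexUpdateQueue {
	#pending = new Map<string, bigint>(); // requestId (hex) -> latest msgIndex

	record(requestIdHex: string, msgIndex: bigint): void {
		const prev = this.#pending.get(requestIdHex);
		// Only keep the highest index observed for each request id.
		if (prev === undefined || msgIndex > prev) {
			this.#pending.set(requestIdHex, msgIndex);
		}
	}

	// Called right before the persisted state is written, so the write always
	// carries the latest indices seen by the message handler.
	applyTo(entries: { requestIdHex: string; msgIndex: bigint }[]): void {
		for (const entry of entries) {
			const pending = this.#pending.get(entry.requestIdHex);
			if (pending !== undefined && pending > entry.msgIndex) {
				entry.msgIndex = pending;
			}
		}
		this.#pending.clear();
	}
}

Under this shape, the message handler would call record() instead of writing msgIndex directly, and the persistence cycle would call applyTo() before serializing.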
2. Medium: WebSocket Close Cleanup Missing

The old code in instance.ts removed the entry from the persist array when the socket closed:

// OLD CODE (removed):
const wsIndex = this.#persist.hibernatableWebSocket.findIndex((ws) =>
arrayBuffersEqual(ws.requestId, rivetRequestIdLocal)
);
const removed = this.#persist.hibernatableWebSocket.splice(wsIndex, 1);

The new code doesn't remove closed hibernatable WebSockets anywhere.

Problem: The hibernatableWebSockets array in persisted state can grow without bound as connections close and are never pruned.

Recommendation: Re-add removal in the close handler (and drop the matching ack queue entry); a rough sketch follows.
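A minimal sketch of that close-time cleanup, combining removal from the persisted array with dropping any queued ack. arrayBuffersEqual mirrors the helper used in the PR's snippets; the array and ack-queue shapes and the hex key are assumptions.

// Hypothetical sketch: on WebSocket close, remove the persisted entry and any
// pending ack for the same request id. Shapes are assumed from the PR's snippets.
function arrayBuffersEqual(a: ArrayBuffer, b: ArrayBuffer): boolean {
	if (a.byteLength !== b.byteLength) return false;
	const va = new Uint8Array(a);
	const vb = new Uint8Array(b);
	for (let i = 0; i < va.length; i++) {
		if (va[i] !== vb[i]) return false;
	}
	return true;
}

function removeHibernatableWebSocket(
	hibernatableWebSockets: { requestId: ArrayBuffer }[],
	ackQueue: Map<string, unknown>,
	requestId: ArrayBuffer,
	requestIdHex: string,
): void {
	const index = hibernatableWebSockets.findIndex((ws) =>
		arrayBuffersEqual(ws.requestId, requestId),
	);
	if (index !== -1) {
		hibernatableWebSockets.splice(index, 1);
	}
	// Drop the queued ack so the flush timer cannot leak entries for closed sockets.
	ackQueue.delete(requestIdHex);
}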
3. Medium: Missing Test Coverage

The existing WebSocket tests don't exercise hibernatable WebSocket behavior.
Recommendation: Add tests for hibernatable WebSocket behavior, especially:

test("should properly restore msgIndex on reconnection", async (c) => {
// 1. Connect WebSocket
// 2. Send messages 0-4
// 3. Force hibernation
// 4. Reconnect
// 5. Verify messages 5+ arrive without duplicates of 0-4
});4. Low: Inconsistent Error HandlingLocation: The warning when actor is not found should probably prevent the ack from being queued, but it still allows execution to continue: } else {
logger().warn({
msg: "received websocket message but actor not found for hibernatable tracking",
// ...
});
}
// Code continues, but no ack is queued - is this intentional?

Recommendation: Either document this as intentional (and consider lowering the log level), or handle the missing actor explicitly instead of continuing silently.
Minor Suggestions 💡

1. Type Safety

Line 626:

const hibernatableWs = actorHandler.actor[PERSIST_SYMBOL]
.hibernatableWebSockets.find((ws: PersistedHibernatableWebSocket) =>
arrayBuffersEqual(ws.requestId, requestIdBuf)
);

2. Magic Number

Line 291:

const INITIAL_MSG_INDEX = -1n; // Indicates no messages processed yet

3. Logging Consistency

Following CLAUDE.md conventions, log messages should be lowercase unless mentioning code symbols. Most are correct, but verify all instances.

Performance Considerations

1. Ack Queue Flushing

The 1-second flush interval is reasonable given the 30-second gateway timeout. Consider making this configurable if needed for different deployment scenarios.

2. Linear Search

The per-message find() over hibernatableWebSockets is a linear scan; this is likely fine for typical connection counts, but an index keyed by request id would avoid it (see the sketch at the end of this review).

Security Considerations

No security issues identified. The changes maintain proper isolation and don't introduce new attack vectors.

Verdict

Approve with reservations. The core architecture change is sound, but the race condition risk (#1) and missing cleanup logic (#2) need to be addressed or explicitly documented as acceptable trade-offs.

Required Changes: address the persistence timing for msgIndex updates (#1) and restore cleanup of closed hibernatable WebSockets (#2).
Recommended Changes: add test coverage for hibernation/reconnection and tighten the typing noted in the minor suggestions.
Great work on identifying and fixing this issue! The diagnostic logging additions will be invaluable for debugging production issues. Just need to close the loop on persistence timing and cleanup.
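For the linear-search note in the performance section above, a sketch of keeping a request-id index alongside the persisted array. The hex encoding and the points at which the index would be rebuilt are assumptions, not the PR's design.

// Hypothetical sketch: index hibernatable entries by request id so the message
// handler can do an O(1) lookup instead of a per-message find().
function requestIdToHex(requestId: ArrayBuffer): string {
	return Array.from(new Uint8Array(requestId))
		.map((b) => b.toString(16).padStart(2, "0"))
		.join("");
}

class HibernatableWebSocketIndex<T extends { requestId: ArrayBuffer }> {
	#byRequestId = new Map<string, T>();

	// Rebuild whenever the persisted array changes (restore, add, remove).
	rebuild(entries: T[]): void {
		this.#byRequestId.clear();
		for (const entry of entries) {
			this.#byRequestId.set(requestIdToHex(entry.requestId), entry);
		}
	}

	get(requestId: ArrayBuffer): T | undefined {
		return this.#byRequestId.get(requestIdToHex(requestId));
	}
}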
Force-pushed from c7a52ec to 7255d47
PR Review: Hibernatable WebSocket msgIndex Handling

Summary

This PR fixes critical issues with message index (msgIndex) handling for hibernatable WebSocket reconnection.

Code Quality ✅

Strengths:

Naming Convention:

Architecture Improvements 🎯

Better Design:

Potential Issues
Force-pushed from 7255d47 to 22a431f
Code Review

Overview

This PR fixes the handling of msgIndex for hibernatable WebSocket reconnection.

Key Changes

✅ Positive Changes
Issues & Concerns

🔴 Critical Issues
].hibernatableWebSockets.find((ws: any) =>

Using any here loses type safety.

Recommendation:

].hibernatableWebSockets.find((ws: PersistedHibernatableWebSocket) =>
Current:

hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);

Concern: No locking mechanism ensures atomic updates. Consider if concurrent message handling could cause issues.
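One minimal way to serialize the update-plus-persist steps per socket without real locks, given single-threaded JavaScript, is to chain them on a promise. This is an illustrative pattern only, not how the PR implements it; persistState is a placeholder name.

// Hypothetical sketch: run each msgIndex update + persist as one chained task
// per socket, so an in-memory write can't interleave with an in-flight persist.
class SerialTaskQueue {
	#tail: Promise<void> = Promise.resolve();

	run(task: () => Promise<void>): Promise<void> {
		// Start the task after the previous one settles, even if it rejected.
		const next = this.#tail.then(task, task);
		this.#tail = next.catch(() => {});
		return next;
	}
}

// Usage sketch (persistState is a placeholder for whatever write the driver does):
// queue.run(async () => {
//   hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);
//   await persistState();
// });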
Force-pushed from 22a431f to b540cc6
Code Review: Hibernatable WebSocket msgIndex Handling

Summary

This PR fixes the handling of msgIndex for hibernatable WebSocket reconnection.

Positive Aspects
Issues & Concerns

1. Critical: Potential msgIndex Race Condition

const lastMsgIndex = existingWs.msgIndex >= 0n
? Number(existingWs.msgIndex)
: undefined;
hibernationConfig = {
enabled: true,
lastMsgIndex,
};

Issue: The msgIndex is read during WebSocket open, but could be updated by concurrent message handlers before the WebSocket is fully established. This could lead to messages being replayed or skipped.

Recommendation: Consider adding a lock or using atomic operations to ensure msgIndex is consistent during WebSocket restoration.

2. Bug: Duplicate lastSeenTimestamp Updates

The code updates lastSeenTimestamp in both the open handler and the message handler.
Issue: For an existing WebSocket that reconnects, both code paths execute, causing redundant updates.

Recommendation: The open handler should only update timestamp for new WebSockets, not existing ones that already have their timestamp updated by message handlers.

3. Potential Memory Leak: Incomplete Cleanup

Issue: If an actor is destroyed or crashes while WebSockets are active, the cleanup logic only runs on socket close/error events. If the actor terminates abnormally, hibernatable WebSocket entries may remain in memory and the ack queue.

Recommendation: Add cleanup in actor shutdown/destroy path to ensure all hibernatable WebSocket state is cleared (see the sketch after issue 4).

4. Missing Test Coverage

Issue: No test files were modified, and grep shows no existing tests for hibernatable WebSocket functionality.

Recommendation: Add tests covering msgIndex persistence across hibernation, reconnection with the correct lastMsgIndex, gap/duplicate detection, and cleanup on close.
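For issue 3's recommendation, a rough sketch of clearing hibernatable WebSocket state on actor shutdown. The state shape is assumed from names that appear in this PR; the real driver fields may differ.

// Hypothetical sketch: clear all hibernatable WebSocket bookkeeping when the
// actor is destroyed, so nothing survives an abnormal shutdown.
interface HibernatableWebSocketState {
	hibernatableWebSockets: { requestId: ArrayBuffer }[];
	ackQueue: Map<string, unknown>;
	activeRawWebSockets: Set<unknown>;
}

function cleanupHibernatableState(state: HibernatableWebSocketState): void {
	// Drop queued acks first so the flush timer has nothing left to send.
	state.ackQueue.clear();
	state.hibernatableWebSockets.length = 0;
	state.activeRawWebSockets.clear();
}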
5. Inconsistent Error Handling

When the actor is not found, the code logs a warning and continues.
Issue: These warnings provide debugging info but don't handle the actual problem. The ack queue could accumulate entries for non-existent actors.

Recommendation: Consider removing stale ack queue entries when actor not found, not just logging.

6. Type Safety: Unsafe toString() on msgIndex

removedMsgIndex: removed[0]?.msgIndex?.toString() ?? "unknown",

Issue: Using optional chaining with an "unknown" fallback hides whether the entry was actually found.

Minor: This is a minor issue but affects debugging clarity.

7. Schema Version Migration

The migration from v1 to v2 initializes the new hibernatable WebSocket state.

Question: What happens to existing v1 data that might have had hibernatable connections tracked elsewhere? Is there a migration path or is this a breaking change?

Recommendation: Add a comment explaining the migration strategy or confirm this is intentional.

Performance Considerations
Security Concerns

No immediate security issues identified. The code properly validates requestId presence and uses ArrayBuffer equality checks.

Style & Convention Issues
Recommendations Summary

Must Fix: add test coverage and cleanup on actor shutdown (issues 3 and 4).

Should Fix: the duplicate lastSeenTimestamp updates and stale ack queue entries (issues 2 and 5).

Nice to Have: the toString() ambiguity and a migration-strategy comment (issues 6 and 7).
Verdict

The PR makes good architectural improvements by centralizing msgIndex tracking, but needs test coverage and has some edge case handling issues that should be addressed before merging.

Rating: Needs work

Generated by Claude Code
Force-pushed from b540cc6 to 26c1b12
Code Review

Summary

This PR fixes the handling of msgIndex for hibernatable WebSocket reconnection. The changes focus on moving msgIndex tracking into the engine driver, adding sequence diagnostics, and batching acknowledgements.
Positive Observations

1. Proper Separation of Concerns ✅
2. Better Error Handling ✅
3. Correct msgIndex Initialization ✅
4. Consistent Naming ✅
Issues & Recommendations

🔴 Critical: Message Index Update Timing Issue

In actor-driver.ts:631-633, the msgIndex is updated on the message event:

hibernatableWs.msgIndex = BigInt(event.rivetMessageIndex);

However, this happens before the actor processes the message. If the actor crashes or hibernates immediately after receiving the message but before processing it, the saved msgIndex will point to an unprocessed message. On reconnection, that message would be skipped.

Recommendation: Only update msgIndex after successful message processing, or use an acknowledgment mechanism to ensure messages are processed before updating the index (a rough sketch follows the next item).

🟡 Moderate: Race Condition in Actor Lookup

In actor-driver.ts:622-624:

const actorHandler = this.#actors.get(actorId);
if (actorHandler?.actor) {

The actor could be removed from #actors between checking and using it. While the current code handles this with warnings (lines 686-695), consider whether this is expected behavior (and lower the log level) or a genuine error that should be handled explicitly.
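For the critical timing item above, a rough sketch of advancing msgIndex (and queueing the ack) only after the actor's handler has finished. The handler and ack functions are placeholders, not the PR's API.

// Hypothetical sketch: process first, then advance the persisted index and ack.
// If processing throws, the index is not advanced and the message can be redelivered.
async function handleHibernatableMessage(
	entry: { msgIndex: bigint },
	messageIndex: number,
	payload: unknown,
	processMessage: (payload: unknown) => Promise<void>,
	queueAck: (messageIndex: number) => void,
): Promise<void> {
	await processMessage(payload);
	entry.msgIndex = BigInt(messageIndex);
	queueAck(messageIndex);
}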
🟡 Moderate: Potential Memory Leak in Ack Queue

The #hibernatableWebSocketAckQueue is only cleared in a limited set of paths.
Issue: If a WebSocket is tracked as hibernatable but never receives messages, it won't be added to the ack queue. However, if it does receive messages but then the cleanup fails (e.g., actor already destroyed), the entry could persist in the ack queue.

Recommendation: Add defensive cleanup to ensure ack queue entries are removed when hibernatable WebSocket entries are removed from hibernatableWebSockets array.

🟡 Moderate: Inconsistent Error Handling

In actor-driver.ts:686-695 and actor-driver.ts:764-773, the code warns when an actor isn't found but continues execution. Consider whether these cases should be errors, or whether the warning is expected during normal actor unload and should be downgraded.
🟢 Minor: Code Clarity

Line 192-194 in actor-driver.ts:

const lastMsgIndex =
existingWs.msgIndex >= 0n
? Number(existingWs.msgIndex)
: undefined;

The comment says "treating -1 as undefined" but the condition checks >= 0n. This means -1n → undefined (correct), but it's clearer to explicitly check for -1n:

const lastMsgIndex = existingWs.msgIndex === -1n
? undefined
: Number(existingWs.msgIndex);

🟢 Minor: Log Message Clarity

Line 749 in actor-driver.ts could be more descriptive. Consider separate messages for close vs error events for better log filtering.

🟢 Minor: Removed Code Verification

The removed code in instance.ts (lines 1675-1766) tracked socket lifecycle events. Verify that all functionality was preserved.
Testing Concerns

❗ No test coverage visible - This is a complex change to hibernation logic. Consider adding tests for msgIndex persistence across hibernation, reconnection with the correct lastMsgIndex, out-of-sequence detection, and cleanup on close.
Performance Considerations

✅ Ack Queue Batching - The 1-second flush interval (line 316) is well-reasoned given the 30s gateway timeout.

Security Considerations

✅ No security issues identified - proper validation of rivetRequestId and rivetMessageIndex with invariant checks.

Summary

This is a solid refactoring that improves code organization and observability. The main concern is the message index update timing which could cause message loss during failures.

Recommendations: only advance msgIndex after the message is processed (or ack after processing), and add test coverage for the hibernation paths.
Overall Assessment: Approve with changes requested - the msgIndex timing issue should be addressed before merging.
Force-pushed from 26c1b12 to 5eb1e45
PR Review: Fix hibernatable WebSocket msgIndex handling

Overall, this is a solid fix that properly addresses msgIndex tracking for hibernatable WebSocket reconnections. The refactoring improves code organization and fixes several critical issues. Here are my findings:

✅ Strengths
