
Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 11, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project | Deployment | Preview | Comments | Updated (UTC)
rivet-inspector | Ready | Preview | Comment | Nov 11, 2025 1:46am
rivetkit-serverless | Ready | Preview | Comment | Nov 11, 2025 1:46am

2 Skipped Deployments

Project | Deployment | Preview | Comments | Updated (UTC)
rivet-cloud | Ignored | | | Nov 11, 2025 1:46am
rivet-site | Ignored | Preview | | Nov 11, 2025 1:46am

Contributor Author

MasterPtato commented Nov 11, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup abstraction to properly track and wait for WebSocket and analytics tasks during guard shutdown. This is an important fix for ensuring graceful shutdown behavior.
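For context, the TaskGroup type referenced throughout this review appears to have roughly the following shape (a sketch inferred from the snippets below, not the actual contents of task_group.rs):

    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;

    use tokio::sync::Notify;

    /// Tracks spawned background tasks so shutdown can wait for them to finish.
    pub struct TaskGroup {
        /// Number of tasks currently running.
        running_count: AtomicUsize,
        /// Signalled when the running count drops to zero.
        notify: Notify,
    }

    impl TaskGroup {
        /// Returns an Arc so the group can be shared with spawned tasks.
        pub fn new() -> Arc<Self> {
            Arc::new(Self {
                running_count: AtomicUsize::new(0),
                notify: Notify::new(),
            })
        }
    }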

Code Quality & Best Practices ✅

Positives:

  • Well-structured TaskGroup implementation with clear separation of concerns
  • Follows Rust idioms with proper use of Arc, atomics, and async patterns
  • Good use of tracing instrumentation throughout
  • Consistent with project conventions (hard tabs, structured logging)
  • Clean integration into existing codebase

Minor Issues:

  1. Missing Default trait (task_group.rs:13-19)
    The TaskGroup::new() method could benefit from an accompanying Default implementation. Note that Default::default() must return Self rather than Arc<Self>, so it would build the bare struct (callers can wrap it in Arc as needed):

    impl Default for TaskGroup {
        fn default() -> Self {
            Self {
                running_count: AtomicUsize::new(0),
                notify: Notify::new(),
            }
        }
    }
  2. Comment consistency (config/runtime.rs:11-16)
    Good fix changing // to /// for doc comments, but these should be complete sentences:

    /// Time (in seconds) to allow guard to wait for pending requests after receiving SIGTERM.
    /// Defaults to 1 hour.
    

Potential Bugs or Issues ⚠️

  1. Race condition in wait_idle (task_group.rs:42-55)
    There's a potential race condition between lines 44 and 50: all tasks could complete (and call notify_waiters()) after the initial check but before notified() registers a waiter, so the notification is missed and the function may wait for a wakeup that never arrives. Consider this pattern:

    pub async fn wait_idle(&self) {
        loop {
            // Register the waiter before checking the count so a completion
            // in the gap still wakes this loop.
            let notified = self.notify.notified();
            if self.running_count.load(Ordering::Acquire) == 0 {
                return;
            }
            notified.await;
        }
    }

    This keeps the fast path (an immediate return when no tasks are running) while closing the gap, since the Notified future is created before the count is checked.

  2. Missing spawn result handling (task_group.rs:28-38)
    The tokio::spawn result (a JoinHandle) is being dropped. While this is intentional for fire-and-forget tasks, consider documenting this behavior or tracking panics:

    let handle = tokio::spawn(/* ... */);
    // Explicitly drop to make intent clear
    drop(handle);
  3. Moved async task (proxy_service.rs:814-819)
    The release_in_flight task was moved from a defer-like macro to after the response. This changes timing - the counter is now released after the response is sent rather than when the function scope ends. This could affect rate limiting accuracy during high load. Was this intentional? Consider documenting the reasoning.

Performance Considerations 🚀

  1. Memory ordering (task_group.rs:25,33)

    • Line 25 uses Ordering::Relaxed for increment, which is fine
    • Line 33 uses Ordering::AcqRel for decrement, which is appropriate
  • Consider using Ordering::Release on the increment for consistency (the AcqRel decrement already provides Acquire), though the current implementation is correct
  2. Notify overhead (task_group.rs:34)
    notify_waiters() is called on every task completion when count hits zero. This is efficient, but in high-churn scenarios (many tasks starting/stopping), consider if this could cause contention. Current implementation is likely fine for guard's use case.

  3. TaskGroup allocation (proxy_service.rs:382)
    TaskGroup::new() returns an Arc directly, which is ergonomic but means ProxyState holds an Arc<TaskGroup> (line 354). This is fine but adds one extra indirection. Could consider:

    tasks: TaskGroup,  // and adjust new() to not wrap in Arc

    But current design is more flexible for sharing.

Security Concerns 🔒

No security issues identified. The changes improve security by ensuring proper cleanup during shutdown, preventing potential resource leaks.

Test Coverage 📊

Missing:

  • No unit tests for TaskGroup
  • No integration tests verifying shutdown behavior with active WebSocket connections
  • No tests for the race condition scenarios

Recommendations:

  1. Add unit tests for TaskGroup:

    use std::time::Duration;

    #[tokio::test]
    async fn test_wait_idle_with_no_tasks() {
        let tg = TaskGroup::new();
        tg.wait_idle().await; // Should return immediately
    }

    #[tokio::test]
    async fn test_wait_idle_with_tasks() {
        let tg = TaskGroup::new();
        tg.spawn(async { tokio::time::sleep(Duration::from_millis(100)).await });
        tg.wait_idle().await; // Should wait until the task finishes
    }
  2. Add integration test verifying shutdown waits for WebSocket tasks to complete
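    A full integration test would drive a real WebSocket connection through guard during shutdown; as a narrower sketch (assuming the spawn/wait_idle API shown above), a test could at least assert that wait_idle() blocks until spawned work finishes:

    #[tokio::test]
    async fn test_wait_idle_blocks_until_task_completes() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::time::Duration;

        let tg = TaskGroup::new();
        let done = Arc::new(AtomicBool::new(false));
        let done2 = done.clone();

        tg.spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            done2.store(true, Ordering::SeqCst);
        });

        tg.wait_idle().await;
        assert!(done.load(Ordering::SeqCst), "task must finish before wait_idle returns");
    }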

Additional Notes

  1. Frontend changes (frontend/src/routeTree.gen.ts)
    This appears to be auto-generated code with only formatting changes. Ensure the generator is deterministic to avoid noise in future PRs.

  2. Version bump (openapi.json)
    Version bumped to 2.0.24-rc.1 - appropriate for a bug fix.

  3. Unused variable cleanup (service-manager/src/lib.rs:163, gasoline/src/worker.rs:326)
    Good cleanup of unused variables and improved logging.

Verdict

Overall: Approve with suggestions

This is a solid fix for an important shutdown issue. The TaskGroup abstraction is well-designed and the integration is clean. The main concerns are:

  1. The potential race condition in wait_idle (should be addressed)
  2. Missing test coverage (should be added before merge)
  3. Verify the timing change for release_in_flight is intentional

The code quality is good and follows project conventions. With tests added and the race condition addressed, this will be a strong contribution.

@pkg-pr-new

pkg-pr-new bot commented Nov 11, 2025

More templates

@rivetkit/actor

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/actor@3448

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3448

@rivetkit/core

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/core@3448

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3448

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3448

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3448

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3448

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3448

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3448

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3448

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3448

commit: a353c1b

@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup abstraction to track WebSocket and other background tasks during guard shutdown, ensuring they complete gracefully before the service terminates. The implementation is solid and addresses a real shutdown race condition.

Positive Aspects

  • Clean abstraction: The TaskGroup implementation is well-designed and focused
  • Proper shutdown sequencing: The server now waits for both requests (graceful.shutdown()) and background tasks (wait_idle())
  • Atomic operations: Uses appropriate memory orderings for the counter
  • Tracing improvements: Better structured logging (e.g., ?remaining_workflows instead of inline formatting; see the example after this list)
  • Code cleanup: Removed the complex defer! macro in favor of simpler explicit spawning
  • Documentation fixes: Fixed comment syntax (// to ///) in runtime.rs:12-13
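As an example of the structured-logging point above (a generic tracing snippet, not the actual line from worker.rs):

let remaining_workflows: usize = 3; // placeholder value for illustration
// Inline formatting bakes the value into the message string:
tracing::info!("waiting for {} remaining workflows", remaining_workflows);
// A structured field records it as a queryable `remaining_workflows` field:
tracing::info!(?remaining_workflows, "waiting for remaining workflows");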

Issues & Recommendations

🔴 Critical: Panic Handling (task_group.rs:27)

The TODO comment indicates panics aren't handled. If a spawned task panics, the counter will never decrement, causing wait_idle() to hang indefinitely during shutdown.

Recommendation: Wrap the future execution to catch panics:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
	F: Future<Output = O> + Send + 'static,
{
	self.running_count.fetch_add(1, Ordering::Relaxed);

	let self2 = self.clone();
	tokio::spawn(
		async move {
			let _guard = scopeguard::guard((), |_| {
				if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
					self2.notify.notify_waiters();
				}
			});
			fut.await;
		}
		.in_current_span(),
	);
}

Or use AssertUnwindSafe + catch_unwind if you need panic details.
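If the panic payload is needed, a sketch of that alternative, replacing the tokio::spawn call in the snippet above (it assumes the futures crate is available for FutureExt::catch_unwind; fut, self2, and Ordering come from the surrounding spawn function):

use std::panic::AssertUnwindSafe;

use futures::FutureExt;
use tracing::Instrument;

tokio::spawn(
	async move {
		// Catch the panic so the counter is always decremented and the
		// payload can be logged.
		if let Err(payload) = AssertUnwindSafe(fut).catch_unwind().await {
			let msg = payload
				.downcast_ref::<&str>()
				.map(|s| s.to_string())
				.or_else(|| payload.downcast_ref::<String>().cloned())
				.unwrap_or_else(|| "non-string panic payload".to_string());
			tracing::error!(%msg, "task in TaskGroup panicked");
		}
		if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
			self2.notify.notify_waiters();
		}
	}
	.in_current_span(),
);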

⚠️ Medium: Potential Race in wait_idle() (task_group.rs:43-56)

There's a subtle race condition around lines 51-52:

self.notify.notified().await;
if self.running_count.load(Ordering::Acquire) == 0 {

If the last task completes and calls notify_waiters() between a count check and the next notified().await, no waiter is registered at that moment, so the notification is lost and wait_idle() may block on a wakeup that never comes.

Recommendation: Use a more robust pattern:

pub async fn wait_idle(&self) {
	loop {
		// Register the waiter before checking the count.
		let notified = self.notify.notified();
		if self.running_count.load(Ordering::Acquire) == 0 {
			return;
		}
		notified.await;
	}
}

This creates the Notified future before checking the count, so a notification sent in the gap still wakes the waiter.

🟡 Minor: Memory Ordering Inconsistency (task_group.rs:25,34)

  • Line 25: fetch_add(1, Ordering::Relaxed)
  • Line 34: fetch_sub(1, Ordering::AcqRel)

While likely correct, the asymmetry is unusual. Since you're using Acquire in wait_idle(), consider using Ordering::Release for the increment to establish proper happens-before relationships.

Recommendation:

self.running_count.fetch_add(1, Ordering::Release);

🟡 Minor: Unused release_in_flight Task (proxy_service.rs:814-819)

The spawned task for release_in_flight is no longer tracked by TaskGroup. This is probably fine since it's a cleanup operation, but consider documenting this decision or adding it to the task group for consistency.

🟡 Minor: Default Implementation

TaskGroup::new() could implement Default for consistency with Rust conventions.

Testing Concerns

⚠️ The existing test (simple_websocket_test2.rs) doesn't validate the shutdown behavior. Consider adding a test that:

  1. Spawns WebSocket tasks via TaskGroup
  2. Initiates shutdown
  3. Verifies all tasks complete before wait_idle() returns
  4. Tests timeout behavior with the configured guard_shutdown_duration

Security & Performance

✅ No security concerns identified
✅ The atomic counter approach is efficient and lock-free
✅ Proper use of tokio::spawn with tracing context preservation

Code Quality

The code follows the repository's conventions well:

  • Hard tabs for indentation ✅
  • Proper error handling patterns ✅
  • Structured logging ✅
  • Workspace dependencies ✅

Verdict

Approve with recommendations - This is a valuable fix for graceful shutdown. The critical panic handling issue should be addressed before merging, but the overall approach is sound.


Review generated with assistance from Claude Code

@MasterPtato MasterPtato force-pushed the 11-10-fix_guard_handle_websocket_tasks_during_shutdown branch from b7ad73c to a353c1b on November 11, 2025 01:44
@MasterPtato MasterPtato force-pushed the 11-10-fix_improve_sigterm_handling_for_the_entire_runtime branch from 7aff577 to 9f71238 on November 11, 2025 01:44
@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR improves the guard service's shutdown handling by properly tracking and waiting for background WebSocket and analytics tasks to complete. The implementation adds a new TaskGroup primitive to coordinate graceful shutdown.


Code Quality & Architecture

Strengths:

  • Clean, focused implementation with clear separation of concerns
  • The TaskGroup abstraction is well-designed and reusable
  • Proper use of atomic operations for lock-free task counting
  • Follows the project's error handling and logging patterns

Issues Found:

1. Race Condition in TaskGroup (Critical)

engine/packages/guard-core/src/task_group.rs:50-54

There's a potential race condition in wait_idle():

loop {
    self.notify.notified().await;
    if self.running_count.load(Ordering::Acquire) == 0 {
        break;
    }
}

Problem: If all tasks complete between the initial fast-path check (line 45) and calling notified() (line 51), the notification will be missed, causing wait_idle() to hang indefinitely.

Solution: Subscribe to notifications BEFORE checking the count:

pub async fn wait_idle(&self) {
    loop {
        let notified = self.notify.notified();
        if self.running_count.load(Ordering::Acquire) == 0 {
            return;
        }
        notified.await;
    }
}

This ensures we're listening for notifications before checking the count, preventing the race.

2. Missing Panic Handling (High Priority)

engine/packages/guard-core/src/task_group.rs:27

The TODO comment indicates panic handling is missing. If a spawned task panics:

  • The task counter will never decrement
  • wait_idle() will hang forever during shutdown
  • The guard service will not shut down gracefully

Recommendation: Ensure the counter is always decremented even if the future panics, for example with a scopeguard-style drop guard:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
    F: Future<Output = O> + Send + 'static,
{
    self.running_count.fetch_add(1, Ordering::Relaxed);
    
    let self2 = self.clone();
    tokio::spawn(
        async move {
            let _guard = scopeguard::guard((), |_| {
                if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                    self2.notify.notify_waiters();
                }
            });
            fut.await;
        }
        .in_current_span(),
    );
}

Alternatively, use AssertUnwindSafe with catch_unwind (from the futures crate) if the panic payload also needs to be inspected or logged.

3. Ordering Consistency (Low Priority)

engine/packages/guard-core/src/task_group.rs:25,34

The increment uses Relaxed ordering while decrement uses AcqRel. For consistency and to ensure visibility of the count changes, consider using AcqRel for both operations, or at minimum Release for increment and Acquire for decrement.

4. Removed defer! Macro Without Cleanup

engine/packages/guard-core/src/proxy_service.rs:2407-2429

The PR removes the defer! macro entirely but doesn't clean up its definition location. The macro was defined in this file but is no longer needed after moving the cleanup to a post-request spawn.

Good: The change to spawn cleanup after the request completes is correct - it prevents blocking the request response.

Action needed: The old macro definition should be removed if it's no longer used anywhere.


Performance Considerations

Positive:

  • Lock-free atomic operations minimize contention
  • The fast-path check in wait_idle() avoids unnecessary async operations
  • Moving release_in_flight after response is correct - prevents blocking the response

Concern:

  • Each spawned task allocates a new tokio task. For high-frequency WebSocket connections, this could add overhead. However, this is acceptable for the guard service's use case.

Security Concerns

No security issues identified. The changes don't introduce new attack surfaces or bypass existing security measures.


Test Coverage

Missing tests for:

  1. TaskGroup race conditions (especially the one identified above)
  2. Panic handling in spawned tasks
  3. Shutdown with pending WebSocket tasks
  4. Timeout behavior during shutdown with hanging tasks

Recommendation: Add integration tests in engine/packages/guard-core/tests/ to verify:

  • Multiple concurrent tasks complete before wait_idle() returns
  • Tasks spawned during wait_idle() are handled correctly
  • Shutdown completes within the configured timeout
  • Panicking tasks don't prevent shutdown
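For the panic case specifically, a small regression test along these lines could be added once panic handling is in place (with the current TODO in spawn() it would hang):

#[tokio::test]
async fn test_wait_idle_returns_after_task_panic() {
    let tg = TaskGroup::new();
    tg.spawn(async {
        panic!("boom");
    });
    // Must not hang: the counter should be decremented even when the task panics.
    tg.wait_idle().await;
}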

Minor Issues

  1. Comment formatting (engine/packages/config/src/config/runtime.rs:12,14) - Good catch fixing the comment syntax from // to ///
  2. Logging consistency (engine/packages/gasoline/src/worker.rs:326) - Good improvement using structured logging
  3. Frontend route file (frontend/src/routeTree.gen.ts) - This is auto-generated; changes are expected

Recommendations

Must Fix:

  1. ✅ Fix the race condition in TaskGroup::wait_idle()
  2. ✅ Implement panic handling for spawned tasks

Should Fix:
3. Add test coverage for TaskGroup
4. Consider adding a timeout mechanism inside TaskGroup itself (see the sketch after this list)
5. Document the memory ordering guarantees in comments

Optional:
6. Add metrics for tracked task count (useful for debugging)
7. Consider adding task naming/identification for better observability
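
On item 4, a minimal sketch of a bounded wait layered on top of wait_idle() (the method name is hypothetical):

use std::time::Duration;

impl TaskGroup {
    /// Wait for all tracked tasks, giving up after `dur`.
    /// Returns true if the group became idle within the timeout.
    pub async fn wait_idle_with_timeout(&self, dur: Duration) -> bool {
        tokio::time::timeout(dur, self.wait_idle()).await.is_ok()
    }
}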


Verdict

This is a solid improvement to shutdown handling, but the race condition and missing panic handling are blockers that must be addressed before merging. Once those critical issues are fixed, this will significantly improve the reliability of the guard service's shutdown process.

The overall architecture is sound and follows best practices. Great work on identifying and solving the WebSocket task tracking problem!

@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR addresses a critical issue where WebSocket and other spawned tasks were not being properly tracked during guard shutdown, potentially leading to abrupt termination of in-flight operations. The solution introduces a TaskGroup utility to track spawned tasks and ensure graceful shutdown.


Code Quality & Best Practices ✅

Strengths:

  • Clean abstraction: The TaskGroup implementation is well-designed and reusable
  • Proper separation of concerns: Task tracking logic is isolated in its own module
  • Consistent with project style: Uses hard tabs, follows Rust naming conventions
  • Good tracing integration: Uses .in_current_span() to preserve trace context across spawned tasks
  • Documentation fixes: Corrected comment syntax from // to /// in runtime.rs

Minor improvements:

  1. Comment formatting: config/runtime.rs:12,14 - Good catch fixing the doc comment syntax
  2. Structured logging: gasoline/worker.rs:326 - Correctly uses ?remaining_workflows instead of the verbose form
  3. Import organization: server.rs:7-16 - Imports now properly grouped with stdlib first, then external crates, then internal modules

Potential Issues & Bugs 🔍

1. Race condition in TaskGroup::wait_idle() - CRITICAL

Location: task_group.rs:43-56

There's a potential race condition between lines 45-46 (fast path check) and lines 50-54 (notification loop). If the running tasks all complete and call notify_waiters() after a count check but before notified().await registers a waiter, the notification is missed and wait_idle() can hang.

Suggested fix:

pub async fn wait_idle(&self) {
    loop {
        // Subscribe to notifications *before* checking the count
        let notified = self.notify.notified();
        
        if self.running_count.load(Ordering::Acquire) == 0 {
            return;
        }
        
        notified.await;
    }
}

2. Memory ordering inconsistency

Location: task_group.rs:25,34

Using Relaxed for increment but AcqRel for decrement is asymmetric. Consider using Release for the increment as well:

self.running_count.fetch_add(1, Ordering::Release);

3. Panic handling TODO - CRITICAL

Location: task_group.rs:27

If a spawned future panics, the counter won't be decremented, causing wait_idle() to hang indefinitely. Consider using scopeguard or similar pattern to ensure cleanup.

4. Defer macro removal

Location: proxy_service.rs:2420-2432 (removed)

Please verify no other files use this macro before removing it.


Performance Considerations ⚡

Positive:

  • Minimal overhead using atomic operations
  • Smart fast path avoids unnecessary notification setup
  • Moved cleanup task spawn after metrics update (more logical)

Potential concerns:

  • Consider adding impl Drop for TaskGroup that warns if tasks are still running when dropped
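A sketch of such a guard, assuming the running_count field from the snippets above and the project's structured-logging style:

use std::sync::atomic::Ordering;

impl Drop for TaskGroup {
    fn drop(&mut self) {
        let remaining = self.running_count.load(Ordering::Acquire);
        if remaining > 0 {
            tracing::warn!(?remaining, "TaskGroup dropped while tasks are still running");
        }
    }
}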

Security Concerns 🔒

No critical security issues identified

The existing shutdown timeout mechanism properly prevents indefinite hangs. The changes ensure WebSocket connections are properly tracked during shutdown.


Test Coverage 🧪

Missing: No unit tests for TaskGroup or integration tests for graceful shutdown behavior.

Recommended:

  • Test wait_idle() with multiple concurrent tasks
  • Test panic handling (will currently fail)
  • Test race conditions

Additional Observations

  1. Frontend changes: routeTree.gen.ts (+471/-470) appears to be auto-generated reformatting
  2. Service manager cleanup: Removed unused res variable - good cleanup
  3. Newline fix: Added missing EOF newline in Cargo.toml

Recommendation

Overall assessment: Well-implemented fix for a real issue. Core logic is sound.

Action items before merge:

  1. ✅ Fix race condition in wait_idle() (critical)
  2. ✅ Implement panic handling (critical)
  3. ✅ Consider stronger memory ordering (recommended)
  4. ⚠️ Add unit tests for TaskGroup (recommended)
  5. ⚠️ Verify no other code uses removed defer! macro (recommended)

Great work on identifying and addressing this shutdown issue! 🎉

