From 1f9043ebf97ca67000b0a930f552b1be88b05329 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Tue, 11 Nov 2025 11:47:39 +0100 Subject: [PATCH] feat: make stabilization delay configurable --- ...oadcastAdapter.stabilization-delay.test.ts | 141 ++++++++++++++++++ .../integration/concurrent-operations.test.ts | 26 +++- .../integration/flow-lifecycle.test.ts | 48 ++++-- .../integration/full-stack-dsl.test.ts | 4 +- .../integration/happy-path-e2e.test.ts | 4 +- .../integration/input-validation.test.ts | 24 ++- .../integration/network-resilience.test.ts | 12 +- .../integration/real-flow-execution.test.ts | 28 +++- .../integration/reconnection.test.ts | 20 ++- .../regressions/step-failed-event-bug.test.ts | 4 +- .../wait-for-status-failure.test.ts | 16 +- pkgs/client/src/lib/PgflowClient.ts | 12 +- .../src/lib/SupabaseBroadcastAdapter.ts | 12 +- 13 files changed, 299 insertions(+), 52 deletions(-) create mode 100644 pkgs/client/__tests__/SupabaseBroadcastAdapter.stabilization-delay.test.ts diff --git a/pkgs/client/__tests__/SupabaseBroadcastAdapter.stabilization-delay.test.ts b/pkgs/client/__tests__/SupabaseBroadcastAdapter.stabilization-delay.test.ts new file mode 100644 index 000000000..a9ca9003f --- /dev/null +++ b/pkgs/client/__tests__/SupabaseBroadcastAdapter.stabilization-delay.test.ts @@ -0,0 +1,141 @@ +import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest'; +import { SupabaseBroadcastAdapter } from '../src/lib/SupabaseBroadcastAdapter'; +import { + createMockClient, +} from './helpers/test-utils'; +import { RUN_ID } from './fixtures'; +import { mockChannelSubscription } from './mocks'; + +/** + * Tests for configurable stabilization delay + * Uses fake timers to verify the delay behavior without actual waiting + */ +describe('SupabaseBroadcastAdapter - Configurable Stabilization Delay', () => { + beforeEach(() => { + vi.useFakeTimers(); + // Silence console logs/errors in tests + vi.spyOn(console, 'error').mockImplementation(() => { /* intentionally empty */ }); + vi.spyOn(console, 'log').mockImplementation(() => { /* intentionally empty */ }); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + test('should wait for custom delay before subscription completes', async () => { + const customDelay = 500; + const { client, mocks } = createMockClient(); + + const adapter = new SupabaseBroadcastAdapter(client, { + stabilizationDelayMs: customDelay + }); + + // Setup channel subscription that emits SUBSCRIBED immediately + mockChannelSubscription(mocks); + + // Start subscription (returns promise) + const subscribePromise = adapter.subscribeToRun(RUN_ID); + + // Track whether promise has resolved + let isResolved = false; + subscribePromise.then(() => { isResolved = true; }); + + // Flush only microtasks (not timers) to process the SUBSCRIBED event + await Promise.resolve(); + + // At this point, SUBSCRIBED has been received but we should still be waiting + // for the stabilization delay + expect(isResolved).toBe(false); + + // Advance time by less than custom delay + await vi.advanceTimersByTimeAsync(customDelay - 100); + expect(isResolved).toBe(false); // Still waiting + + // Advance past the custom delay + await vi.advanceTimersByTimeAsync(100); + expect(isResolved).toBe(true); // Now it's ready! + }); + + test('should use default 300ms delay when not configured', async () => { + const { client, mocks } = createMockClient(); + + const adapter = new SupabaseBroadcastAdapter(client); + + mockChannelSubscription(mocks); + + const subscribePromise = adapter.subscribeToRun(RUN_ID); + + let isResolved = false; + subscribePromise.then(() => { isResolved = true; }); + + // Flush only microtasks + await Promise.resolve(); + + // Should NOT be ready before 300ms + await vi.advanceTimersByTimeAsync(299); + expect(isResolved).toBe(false); + + // Should be ready after 300ms + await vi.advanceTimersByTimeAsync(1); + expect(isResolved).toBe(true); + }); + + test('should be immediately ready when delay is 0', async () => { + const { client, mocks } = createMockClient(); + + const adapter = new SupabaseBroadcastAdapter(client, { + stabilizationDelayMs: 0 + }); + + mockChannelSubscription(mocks); + + const subscribePromise = adapter.subscribeToRun(RUN_ID); + + let isResolved = false; + subscribePromise.then(() => { isResolved = true; }); + + // Flush microtasks and timers + await vi.runAllTimersAsync(); + + // Should be ready immediately + expect(isResolved).toBe(true); + }); + + test('should allow different delays for different adapter instances', async () => { + const { client: client1, mocks: mocks1 } = createMockClient(); + const { client: client2, mocks: mocks2 } = createMockClient(); + + const adapter1 = new SupabaseBroadcastAdapter(client1, { + stabilizationDelayMs: 200 + }); + + const adapter2 = new SupabaseBroadcastAdapter(client2, { + stabilizationDelayMs: 400 + }); + + mockChannelSubscription(mocks1); + mockChannelSubscription(mocks2); + + const promise1 = adapter1.subscribeToRun('run-1'); + const promise2 = adapter2.subscribeToRun('run-2'); + + let resolved1 = false; + let resolved2 = false; + promise1.then(() => { resolved1 = true; }); + promise2.then(() => { resolved2 = true; }); + + // Flush microtasks only + await Promise.resolve(); + + // After 200ms, adapter1 should be ready but adapter2 should not + await vi.advanceTimersByTimeAsync(200); + expect(resolved1).toBe(true); + expect(resolved2).toBe(false); + + // After 400ms total, both should be ready + await vi.advanceTimersByTimeAsync(200); + expect(resolved1).toBe(true); + expect(resolved2).toBe(true); + }); +}); diff --git a/pkgs/client/__tests__/integration/concurrent-operations.test.ts b/pkgs/client/__tests__/integration/concurrent-operations.test.ts index 589fecc54..3b613618d 100644 --- a/pkgs/client/__tests__/integration/concurrent-operations.test.ts +++ b/pkgs/client/__tests__/integration/concurrent-operations.test.ts @@ -31,7 +31,9 @@ describe('Concurrent Operations Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start flows sequentially to avoid overwhelming the system console.log('=== Starting flows ==='); @@ -100,10 +102,16 @@ describe('Concurrent Operations Tests', () => { const supabaseClient1 = createTestSupabaseClient(); const supabaseClient2 = createTestSupabaseClient(); const supabaseClient3 = createTestSupabaseClient(); - - const pgflowClient1 = new PgflowClient(supabaseClient1); - const pgflowClient2 = new PgflowClient(supabaseClient2); - const pgflowClient3 = new PgflowClient(supabaseClient3); + + const pgflowClient1 = new PgflowClient(supabaseClient1, { + realtimeStabilizationDelayMs: 1000, + }); + const pgflowClient2 = new PgflowClient(supabaseClient2, { + realtimeStabilizationDelayMs: 1000, + }); + const pgflowClient3 = new PgflowClient(supabaseClient3, { + realtimeStabilizationDelayMs: 1000, + }); // Client 1 starts the flow const input = { data: 'multi-client-test' }; @@ -186,7 +194,9 @@ describe('Concurrent Operations Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start fewer runs to reduce system load const runs = await Promise.all([ @@ -268,7 +278,9 @@ describe('Concurrent Operations Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start flows sequentially for reliability const runA = await pgflowClient.startFlow(flowA.slug, { type: 'flow-a' }); diff --git a/pkgs/client/__tests__/integration/flow-lifecycle.test.ts b/pkgs/client/__tests__/integration/flow-lifecycle.test.ts index 4d0304ba1..f8413aea6 100644 --- a/pkgs/client/__tests__/integration/flow-lifecycle.test.ts +++ b/pkgs/client/__tests__/integration/flow-lifecycle.test.ts @@ -23,7 +23,9 @@ describe('Flow Lifecycle Integration', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'first_step')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const input = { url: 'https://example.com' }; const run = await pgflowClient.startFlow(testFlow.slug, input); @@ -51,7 +53,9 @@ describe('Flow Lifecycle Integration', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'step_two')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { data: 'test', @@ -83,7 +87,9 @@ describe('Flow Lifecycle Integration', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'step_two')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { data: 'consistency-test', @@ -119,7 +125,9 @@ describe('Flow Lifecycle Integration', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const input = { data: 'lifecycle-test' }; const run = await pgflowClient.startFlow(testFlow.slug, input); @@ -154,7 +162,9 @@ describe('Flow Lifecycle Integration', () => { await grantMinimalPgflowPermissions(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); await expect( pgflowClient.startFlow('nonexistent-flow', { data: 'test' }) @@ -178,7 +188,9 @@ describe('Flow Lifecycle Integration', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { data: 'will-fail', @@ -212,7 +224,9 @@ describe('Flow Lifecycle Integration', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const originalRun = await pgflowClient.startFlow(testFlow.slug, { data: 'retrieve-test', @@ -255,7 +269,9 @@ describe('Flow Lifecycle Integration', () => { await grantMinimalPgflowPermissions(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.getRun( '00000000-0000-0000-0000-000000000000' @@ -279,7 +295,9 @@ describe('Flow Lifecycle Integration', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'cached_step')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run1 = await pgflowClient.startFlow(testFlow.slug, { data: 'cache-test', @@ -312,7 +330,9 @@ describe('Flow Lifecycle Integration', () => { await sql`SELECT pgflow.add_step(${testFlow2.slug}, 'step2')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run1 = await pgflowClient.startFlow(testFlow1.slug, { data: 'flow1', @@ -354,7 +374,9 @@ describe('Flow Lifecycle Integration', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'custom_step')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const customRunId = `12345678-1234-1234-1234-${Date.now() .toString() @@ -384,7 +406,9 @@ describe('Flow Lifecycle Integration', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'complex_step')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const complexInput = { user: { diff --git a/pkgs/client/__tests__/integration/full-stack-dsl.test.ts b/pkgs/client/__tests__/integration/full-stack-dsl.test.ts index a985d0858..b65988803 100644 --- a/pkgs/client/__tests__/integration/full-stack-dsl.test.ts +++ b/pkgs/client/__tests__/integration/full-stack-dsl.test.ts @@ -78,7 +78,9 @@ describe('Full Stack DSL Integration', () => { // 5. Start flow via client const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const input = { url: 'https://api.example.com/test' }; const run = await pgflowClient.startFlow(SimpleFlow.slug, input); diff --git a/pkgs/client/__tests__/integration/happy-path-e2e.test.ts b/pkgs/client/__tests__/integration/happy-path-e2e.test.ts index 05d86ac78..512f06bd1 100644 --- a/pkgs/client/__tests__/integration/happy-path-e2e.test.ts +++ b/pkgs/client/__tests__/integration/happy-path-e2e.test.ts @@ -27,7 +27,9 @@ describe('Happy Path E2E Integration', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Track all events received const receivedRunEvents: any[] = []; diff --git a/pkgs/client/__tests__/integration/input-validation.test.ts b/pkgs/client/__tests__/integration/input-validation.test.ts index 719e9cc46..3e2bc19b1 100644 --- a/pkgs/client/__tests__/integration/input-validation.test.ts +++ b/pkgs/client/__tests__/integration/input-validation.test.ts @@ -32,7 +32,9 @@ describe('Input Validation', () => { )`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Act & Assert: Should throw validation error await expect( @@ -77,7 +79,9 @@ describe('Input Validation', () => { )`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Act: Should succeed with empty array const run = await pgflowClient.startFlow(testFlow.slug, []); // ✓ Valid array @@ -118,7 +122,9 @@ describe('Input Validation', () => { )`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Act: Should succeed with array of items const input = [{ id: 1 }, { id: 2 }, { id: 3 }]; @@ -153,7 +159,9 @@ describe('Input Validation', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'regular_step')`; // Single step const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Act: Should succeed with object input (no root map validation) const input = { items: [], foo: 'bar' }; @@ -205,7 +213,9 @@ describe('Input Validation', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { data: 'test' }); @@ -287,7 +297,9 @@ describe('Input Validation', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { data: 'test' }); diff --git a/pkgs/client/__tests__/integration/network-resilience.test.ts b/pkgs/client/__tests__/integration/network-resilience.test.ts index 29ebf5f15..9166f639a 100644 --- a/pkgs/client/__tests__/integration/network-resilience.test.ts +++ b/pkgs/client/__tests__/integration/network-resilience.test.ts @@ -26,7 +26,9 @@ describe('Network Resilience Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const input = { data: 'resilience-test' }; const run = await pgflowClient.startFlow(testFlow.slug, input); @@ -131,7 +133,9 @@ describe('Network Resilience Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Track subscription status changes const connectionEvents: string[] = []; @@ -182,7 +186,9 @@ describe('Network Resilience Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const input = { data: 'poor-network-test' }; const run = await pgflowClient.startFlow(testFlow.slug, input); diff --git a/pkgs/client/__tests__/integration/real-flow-execution.test.ts b/pkgs/client/__tests__/integration/real-flow-execution.test.ts index 50e286475..915ce22f3 100644 --- a/pkgs/client/__tests__/integration/real-flow-execution.test.ts +++ b/pkgs/client/__tests__/integration/real-flow-execution.test.ts @@ -27,7 +27,9 @@ describe('Real Flow Execution', () => { // Create PgflowClient and start flow const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const input = { data: 'test-input' }; const run = await pgflowClient.startFlow(testFlow.slug, input); @@ -95,7 +97,9 @@ describe('Real Flow Execution', () => { // Create PgflowClient and start flow const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const input = { foo: 'bar' }; const run = await pgflowClient.startFlow(testFlow.slug, input); @@ -156,7 +160,9 @@ describe('Real Flow Execution', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start flow - root step starts in this transaction const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); @@ -196,7 +202,9 @@ describe('Real Flow Execution', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start flow - only root_step starts const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); @@ -285,7 +293,9 @@ describe('Real Flow Execution', () => { )`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start flow with empty array (root map steps expect array input) const run = await pgflowClient.startFlow(testFlow.slug, []); @@ -330,7 +340,9 @@ describe('Real Flow Execution', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); const rootStep = run.step('root_step'); @@ -383,7 +395,9 @@ describe('Real Flow Execution', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); const step = run.step('test_step'); diff --git a/pkgs/client/__tests__/integration/reconnection.test.ts b/pkgs/client/__tests__/integration/reconnection.test.ts index 31e9d6e84..98914c19a 100644 --- a/pkgs/client/__tests__/integration/reconnection.test.ts +++ b/pkgs/client/__tests__/integration/reconnection.test.ts @@ -21,7 +21,9 @@ describe('Reconnection Integration Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start a real flow to have something to subscribe to const input = { url: 'https://example.com', data: 'reconnection-test' }; @@ -46,7 +48,9 @@ describe('Reconnection Integration Tests', () => { // Simulate network interruption by creating a new client // This forces the underlying channel to be recreated const newSupabaseClient = createTestSupabaseClient(); - const newPgflowClient = new PgflowClient(newSupabaseClient); + const newPgflowClient = new PgflowClient(newSupabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Subscribe with the new client to the same run const newRun = await newPgflowClient.getRun(run.run_id); @@ -97,7 +101,9 @@ describe('Reconnection Integration Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start a flow const input = { data: 'state-recovery-test' }; @@ -115,7 +121,9 @@ describe('Reconnection Integration Tests', () => { // Create new client instance (simulates reconnection) const newSupabaseClient = createTestSupabaseClient(); - const newPgflowClient = new PgflowClient(newSupabaseClient); + const newPgflowClient = new PgflowClient(newSupabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Retrieve the same run const reconnectedRun = await newPgflowClient.getRun(run.run_id); @@ -162,7 +170,9 @@ describe('Reconnection Integration Tests', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); // Start a flow const input = { data: 'rapid-test' }; diff --git a/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts index d18b0454f..cc8f01606 100644 --- a/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts +++ b/pkgs/client/__tests__/integration/regressions/step-failed-event-bug.test.ts @@ -25,7 +25,9 @@ describe('Step Failed Event Broadcasting', () => { // Create clients const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const sqlClient = new PgflowSqlClient(sql); // Start the flow diff --git a/pkgs/client/__tests__/integration/wait-for-status-failure.test.ts b/pkgs/client/__tests__/integration/wait-for-status-failure.test.ts index 59ec8fd78..0fcaa1795 100644 --- a/pkgs/client/__tests__/integration/wait-for-status-failure.test.ts +++ b/pkgs/client/__tests__/integration/wait-for-status-failure.test.ts @@ -25,7 +25,9 @@ describe('waitForStatus - Failure Scenarios', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { test: 'will-fail' }); const step = run.step('failing_step'); @@ -79,7 +81,9 @@ describe('waitForStatus - Failure Scenarios', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { test: 'run-will-fail' }); @@ -128,7 +132,9 @@ describe('waitForStatus - Failure Scenarios', () => { await sql`SELECT pgflow.add_step(${testFlow.slug}, 'normal_step')`; const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { test: 'timeout' }); const step = run.step('normal_step'); @@ -166,7 +172,9 @@ describe('waitForStatus - Failure Scenarios', () => { const sqlClient = new PgflowSqlClient(sql); const supabaseClient = createTestSupabaseClient(); - const pgflowClient = new PgflowClient(supabaseClient); + const pgflowClient = new PgflowClient(supabaseClient, { + realtimeStabilizationDelayMs: 1000, + }); const run = await pgflowClient.startFlow(testFlow.slug, { test: 'multi-fail' }); const step = run.step('step_one'); diff --git a/pkgs/client/src/lib/PgflowClient.ts b/pkgs/client/src/lib/PgflowClient.ts index 7843fd45d..ce50ec455 100644 --- a/pkgs/client/src/lib/PgflowClient.ts +++ b/pkgs/client/src/lib/PgflowClient.ts @@ -28,10 +28,18 @@ export class PgflowClient implements IFlowClien * Creates a new PgflowClient instance * * @param supabaseClient - Supabase client instance + * @param opts - Optional configuration */ - constructor(supabaseClient: SupabaseClient) { + constructor( + supabaseClient: SupabaseClient, + opts: { + realtimeStabilizationDelayMs?: number; + } = {} + ) { this.#supabase = supabaseClient; - this.#realtimeAdapter = new SupabaseBroadcastAdapter(supabaseClient); + this.#realtimeAdapter = new SupabaseBroadcastAdapter(supabaseClient, { + stabilizationDelayMs: opts.realtimeStabilizationDelayMs, + }); // Set up global event listeners - properly typed this.#realtimeAdapter.onRunEvent((event: BroadcastRunEvent) => { diff --git a/pkgs/client/src/lib/SupabaseBroadcastAdapter.ts b/pkgs/client/src/lib/SupabaseBroadcastAdapter.ts index 802771909..711a586f4 100644 --- a/pkgs/client/src/lib/SupabaseBroadcastAdapter.ts +++ b/pkgs/client/src/lib/SupabaseBroadcastAdapter.ts @@ -22,8 +22,9 @@ export class SupabaseBroadcastAdapter implements IFlowRealtime { #channels: Map = new Map(); #emitter = createNanoEvents(); #reconnectionDelay: number; + #stabilizationDelay: number; #schedule: typeof setTimeout; - + /** * Creates a new instance of SupabaseBroadcastAdapter @@ -32,10 +33,15 @@ export class SupabaseBroadcastAdapter implements IFlowRealtime { */ constructor( supabase: SupabaseClient, - opts: { reconnectDelayMs?: number; schedule?: typeof setTimeout } = {} + opts: { + reconnectDelayMs?: number; + stabilizationDelayMs?: number; + schedule?: typeof setTimeout; + } = {} ) { this.#supabase = supabase; this.#reconnectionDelay = opts.reconnectDelayMs ?? 2000; + this.#stabilizationDelay = opts.stabilizationDelayMs ?? 300; this.#schedule = opts.schedule ?? setTimeout; } @@ -351,7 +357,7 @@ export class SupabaseBroadcastAdapter implements IFlowRealtime { // The SUBSCRIBED event is emitted before backend routing is fully established. // This delay ensures the backend can receive messages sent immediately after subscription. // See: https://github.com/supabase/supabase-js/issues/1599 - await new Promise(resolve => setTimeout(resolve, 300)); + await new Promise(resolve => this.#schedule(resolve, this.#stabilizationDelay)); this.#channels.set(run_id, channel);