Skip to content

Commit 771a319

Browse files
committed
feat: make stabilization delay configurable
1 parent 1da476a commit 771a319

13 files changed

+299
-52
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest';
2+
import { SupabaseBroadcastAdapter } from '../src/lib/SupabaseBroadcastAdapter';
3+
import {
4+
createMockClient,
5+
} from './helpers/test-utils';
6+
import { RUN_ID } from './fixtures';
7+
import { mockChannelSubscription } from './mocks';
8+
9+
/**
10+
* Tests for configurable stabilization delay
11+
* Uses fake timers to verify the delay behavior without actual waiting
12+
*/
13+
describe('SupabaseBroadcastAdapter - Configurable Stabilization Delay', () => {
14+
beforeEach(() => {
15+
vi.useFakeTimers();
16+
// Silence console logs/errors in tests
17+
vi.spyOn(console, 'error').mockImplementation(() => { /* intentionally empty */ });
18+
vi.spyOn(console, 'log').mockImplementation(() => { /* intentionally empty */ });
19+
});
20+
21+
afterEach(() => {
22+
vi.useRealTimers();
23+
vi.restoreAllMocks();
24+
});
25+
26+
test('should wait for custom delay before subscription completes', async () => {
27+
const customDelay = 500;
28+
const { client, mocks } = createMockClient();
29+
30+
const adapter = new SupabaseBroadcastAdapter(client, {
31+
stabilizationDelayMs: customDelay
32+
});
33+
34+
// Setup channel subscription that emits SUBSCRIBED immediately
35+
mockChannelSubscription(mocks);
36+
37+
// Start subscription (returns promise)
38+
const subscribePromise = adapter.subscribeToRun(RUN_ID);
39+
40+
// Track whether promise has resolved
41+
let isResolved = false;
42+
subscribePromise.then(() => { isResolved = true; });
43+
44+
// Flush only microtasks (not timers) to process the SUBSCRIBED event
45+
await Promise.resolve();
46+
47+
// At this point, SUBSCRIBED has been received but we should still be waiting
48+
// for the stabilization delay
49+
expect(isResolved).toBe(false);
50+
51+
// Advance time by less than custom delay
52+
await vi.advanceTimersByTimeAsync(customDelay - 100);
53+
expect(isResolved).toBe(false); // Still waiting
54+
55+
// Advance past the custom delay
56+
await vi.advanceTimersByTimeAsync(100);
57+
expect(isResolved).toBe(true); // Now it's ready!
58+
});
59+
60+
test('should use default 300ms delay when not configured', async () => {
61+
const { client, mocks } = createMockClient();
62+
63+
const adapter = new SupabaseBroadcastAdapter(client);
64+
65+
mockChannelSubscription(mocks);
66+
67+
const subscribePromise = adapter.subscribeToRun(RUN_ID);
68+
69+
let isResolved = false;
70+
subscribePromise.then(() => { isResolved = true; });
71+
72+
// Flush only microtasks
73+
await Promise.resolve();
74+
75+
// Should NOT be ready before 300ms
76+
await vi.advanceTimersByTimeAsync(299);
77+
expect(isResolved).toBe(false);
78+
79+
// Should be ready after 300ms
80+
await vi.advanceTimersByTimeAsync(1);
81+
expect(isResolved).toBe(true);
82+
});
83+
84+
test('should be immediately ready when delay is 0', async () => {
85+
const { client, mocks } = createMockClient();
86+
87+
const adapter = new SupabaseBroadcastAdapter(client, {
88+
stabilizationDelayMs: 0
89+
});
90+
91+
mockChannelSubscription(mocks);
92+
93+
const subscribePromise = adapter.subscribeToRun(RUN_ID);
94+
95+
let isResolved = false;
96+
subscribePromise.then(() => { isResolved = true; });
97+
98+
// Flush microtasks and timers
99+
await vi.runAllTimersAsync();
100+
101+
// Should be ready immediately
102+
expect(isResolved).toBe(true);
103+
});
104+
105+
test('should allow different delays for different adapter instances', async () => {
106+
const { client: client1, mocks: mocks1 } = createMockClient();
107+
const { client: client2, mocks: mocks2 } = createMockClient();
108+
109+
const adapter1 = new SupabaseBroadcastAdapter(client1, {
110+
stabilizationDelayMs: 200
111+
});
112+
113+
const adapter2 = new SupabaseBroadcastAdapter(client2, {
114+
stabilizationDelayMs: 400
115+
});
116+
117+
mockChannelSubscription(mocks1);
118+
mockChannelSubscription(mocks2);
119+
120+
const promise1 = adapter1.subscribeToRun('run-1');
121+
const promise2 = adapter2.subscribeToRun('run-2');
122+
123+
let resolved1 = false;
124+
let resolved2 = false;
125+
promise1.then(() => { resolved1 = true; });
126+
promise2.then(() => { resolved2 = true; });
127+
128+
// Flush microtasks only
129+
await Promise.resolve();
130+
131+
// After 200ms, adapter1 should be ready but adapter2 should not
132+
await vi.advanceTimersByTimeAsync(200);
133+
expect(resolved1).toBe(true);
134+
expect(resolved2).toBe(false);
135+
136+
// After 400ms total, both should be ready
137+
await vi.advanceTimersByTimeAsync(200);
138+
expect(resolved1).toBe(true);
139+
expect(resolved2).toBe(true);
140+
});
141+
});

pkgs/client/__tests__/integration/concurrent-operations.test.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ describe('Concurrent Operations Tests', () => {
3131

3232
const sqlClient = new PgflowSqlClient(sql);
3333
const supabaseClient = createTestSupabaseClient();
34-
const pgflowClient = new PgflowClient(supabaseClient);
34+
const pgflowClient = new PgflowClient(supabaseClient, {
35+
realtimeStabilizationDelayMs: 1000,
36+
});
3537

3638
// Start flows sequentially to avoid overwhelming the system
3739
console.log('=== Starting flows ===');
@@ -100,10 +102,16 @@ describe('Concurrent Operations Tests', () => {
100102
const supabaseClient1 = createTestSupabaseClient();
101103
const supabaseClient2 = createTestSupabaseClient();
102104
const supabaseClient3 = createTestSupabaseClient();
103-
104-
const pgflowClient1 = new PgflowClient(supabaseClient1);
105-
const pgflowClient2 = new PgflowClient(supabaseClient2);
106-
const pgflowClient3 = new PgflowClient(supabaseClient3);
105+
106+
const pgflowClient1 = new PgflowClient(supabaseClient1, {
107+
realtimeStabilizationDelayMs: 1000,
108+
});
109+
const pgflowClient2 = new PgflowClient(supabaseClient2, {
110+
realtimeStabilizationDelayMs: 1000,
111+
});
112+
const pgflowClient3 = new PgflowClient(supabaseClient3, {
113+
realtimeStabilizationDelayMs: 1000,
114+
});
107115

108116
// Client 1 starts the flow
109117
const input = { data: 'multi-client-test' };
@@ -186,7 +194,9 @@ describe('Concurrent Operations Tests', () => {
186194

187195
const sqlClient = new PgflowSqlClient(sql);
188196
const supabaseClient = createTestSupabaseClient();
189-
const pgflowClient = new PgflowClient(supabaseClient);
197+
const pgflowClient = new PgflowClient(supabaseClient, {
198+
realtimeStabilizationDelayMs: 1000,
199+
});
190200

191201
// Start fewer runs to reduce system load
192202
const runs = await Promise.all([
@@ -268,7 +278,9 @@ describe('Concurrent Operations Tests', () => {
268278

269279
const sqlClient = new PgflowSqlClient(sql);
270280
const supabaseClient = createTestSupabaseClient();
271-
const pgflowClient = new PgflowClient(supabaseClient);
281+
const pgflowClient = new PgflowClient(supabaseClient, {
282+
realtimeStabilizationDelayMs: 1000,
283+
});
272284

273285
// Start flows sequentially for reliability
274286
const runA = await pgflowClient.startFlow(flowA.slug, { type: 'flow-a' });

pkgs/client/__tests__/integration/flow-lifecycle.test.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ describe('Flow Lifecycle Integration', () => {
2323
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'first_step')`;
2424

2525
const supabaseClient = createTestSupabaseClient();
26-
const pgflowClient = new PgflowClient(supabaseClient);
26+
const pgflowClient = new PgflowClient(supabaseClient, {
27+
realtimeStabilizationDelayMs: 1000,
28+
});
2729

2830
const input = { url: 'https://example.com' };
2931
const run = await pgflowClient.startFlow(testFlow.slug, input);
@@ -51,7 +53,9 @@ describe('Flow Lifecycle Integration', () => {
5153
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'step_two')`;
5254

5355
const supabaseClient = createTestSupabaseClient();
54-
const pgflowClient = new PgflowClient(supabaseClient);
56+
const pgflowClient = new PgflowClient(supabaseClient, {
57+
realtimeStabilizationDelayMs: 1000,
58+
});
5559

5660
const run = await pgflowClient.startFlow(testFlow.slug, {
5761
data: 'test',
@@ -83,7 +87,9 @@ describe('Flow Lifecycle Integration', () => {
8387
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'step_two')`;
8488

8589
const supabaseClient = createTestSupabaseClient();
86-
const pgflowClient = new PgflowClient(supabaseClient);
90+
const pgflowClient = new PgflowClient(supabaseClient, {
91+
realtimeStabilizationDelayMs: 1000,
92+
});
8793

8894
const run = await pgflowClient.startFlow(testFlow.slug, {
8995
data: 'consistency-test',
@@ -119,7 +125,9 @@ describe('Flow Lifecycle Integration', () => {
119125

120126
const sqlClient = new PgflowSqlClient(sql);
121127
const supabaseClient = createTestSupabaseClient();
122-
const pgflowClient = new PgflowClient(supabaseClient);
128+
const pgflowClient = new PgflowClient(supabaseClient, {
129+
realtimeStabilizationDelayMs: 1000,
130+
});
123131

124132
const input = { data: 'lifecycle-test' };
125133
const run = await pgflowClient.startFlow(testFlow.slug, input);
@@ -154,7 +162,9 @@ describe('Flow Lifecycle Integration', () => {
154162
await grantMinimalPgflowPermissions(sql);
155163

156164
const supabaseClient = createTestSupabaseClient();
157-
const pgflowClient = new PgflowClient(supabaseClient);
165+
const pgflowClient = new PgflowClient(supabaseClient, {
166+
realtimeStabilizationDelayMs: 1000,
167+
});
158168

159169
await expect(
160170
pgflowClient.startFlow('nonexistent-flow', { data: 'test' })
@@ -178,7 +188,9 @@ describe('Flow Lifecycle Integration', () => {
178188

179189
const sqlClient = new PgflowSqlClient(sql);
180190
const supabaseClient = createTestSupabaseClient();
181-
const pgflowClient = new PgflowClient(supabaseClient);
191+
const pgflowClient = new PgflowClient(supabaseClient, {
192+
realtimeStabilizationDelayMs: 1000,
193+
});
182194

183195
const run = await pgflowClient.startFlow(testFlow.slug, {
184196
data: 'will-fail',
@@ -212,7 +224,9 @@ describe('Flow Lifecycle Integration', () => {
212224

213225
const sqlClient = new PgflowSqlClient(sql);
214226
const supabaseClient = createTestSupabaseClient();
215-
const pgflowClient = new PgflowClient(supabaseClient);
227+
const pgflowClient = new PgflowClient(supabaseClient, {
228+
realtimeStabilizationDelayMs: 1000,
229+
});
216230

217231
const originalRun = await pgflowClient.startFlow(testFlow.slug, {
218232
data: 'retrieve-test',
@@ -255,7 +269,9 @@ describe('Flow Lifecycle Integration', () => {
255269
await grantMinimalPgflowPermissions(sql);
256270

257271
const supabaseClient = createTestSupabaseClient();
258-
const pgflowClient = new PgflowClient(supabaseClient);
272+
const pgflowClient = new PgflowClient(supabaseClient, {
273+
realtimeStabilizationDelayMs: 1000,
274+
});
259275

260276
const run = await pgflowClient.getRun(
261277
'00000000-0000-0000-0000-000000000000'
@@ -279,7 +295,9 @@ describe('Flow Lifecycle Integration', () => {
279295
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'cached_step')`;
280296

281297
const supabaseClient = createTestSupabaseClient();
282-
const pgflowClient = new PgflowClient(supabaseClient);
298+
const pgflowClient = new PgflowClient(supabaseClient, {
299+
realtimeStabilizationDelayMs: 1000,
300+
});
283301

284302
const run1 = await pgflowClient.startFlow(testFlow.slug, {
285303
data: 'cache-test',
@@ -312,7 +330,9 @@ describe('Flow Lifecycle Integration', () => {
312330
await sql`SELECT pgflow.add_step(${testFlow2.slug}, 'step2')`;
313331

314332
const supabaseClient = createTestSupabaseClient();
315-
const pgflowClient = new PgflowClient(supabaseClient);
333+
const pgflowClient = new PgflowClient(supabaseClient, {
334+
realtimeStabilizationDelayMs: 1000,
335+
});
316336

317337
const run1 = await pgflowClient.startFlow(testFlow1.slug, {
318338
data: 'flow1',
@@ -354,7 +374,9 @@ describe('Flow Lifecycle Integration', () => {
354374
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'custom_step')`;
355375

356376
const supabaseClient = createTestSupabaseClient();
357-
const pgflowClient = new PgflowClient(supabaseClient);
377+
const pgflowClient = new PgflowClient(supabaseClient, {
378+
realtimeStabilizationDelayMs: 1000,
379+
});
358380

359381
const customRunId = `12345678-1234-1234-1234-${Date.now()
360382
.toString()
@@ -384,7 +406,9 @@ describe('Flow Lifecycle Integration', () => {
384406
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'complex_step')`;
385407

386408
const supabaseClient = createTestSupabaseClient();
387-
const pgflowClient = new PgflowClient(supabaseClient);
409+
const pgflowClient = new PgflowClient(supabaseClient, {
410+
realtimeStabilizationDelayMs: 1000,
411+
});
388412

389413
const complexInput = {
390414
user: {

pkgs/client/__tests__/integration/full-stack-dsl.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ describe('Full Stack DSL Integration', () => {
7878
// 5. Start flow via client
7979
const sqlClient = new PgflowSqlClient(sql);
8080
const supabaseClient = createTestSupabaseClient();
81-
const pgflowClient = new PgflowClient(supabaseClient);
81+
const pgflowClient = new PgflowClient(supabaseClient, {
82+
realtimeStabilizationDelayMs: 1000,
83+
});
8284

8385
const input = { url: 'https://api.example.com/test' };
8486
const run = await pgflowClient.startFlow(SimpleFlow.slug, input);

pkgs/client/__tests__/integration/happy-path-e2e.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ describe('Happy Path E2E Integration', () => {
2727

2828
const sqlClient = new PgflowSqlClient(sql);
2929
const supabaseClient = createTestSupabaseClient();
30-
const pgflowClient = new PgflowClient(supabaseClient);
30+
const pgflowClient = new PgflowClient(supabaseClient, {
31+
realtimeStabilizationDelayMs: 1000,
32+
});
3133

3234
// Track all events received
3335
const receivedRunEvents: any[] = [];

0 commit comments

Comments
 (0)