Skip to content

Commit c22a1e5

Browse files
committed
Fix missing realtime broadcasts for step:started and step:completed events (#305)
## Summary Fixes critical bug where `step:started` and `step:completed` realtime events were not being broadcast to clients due to PostgreSQL query planner optimizing away unused CTEs. ## Bugs Fixed ### 1. Missing step:started broadcasts (Critical) **Problem:** Clients never received `step:started` events when steps transitioned from Created to Started status. **Root Cause:** The `broadcast_events` CTE in `start_ready_steps()` was not referenced by any subsequent CTEs or the final RETURN statement. PostgreSQL's query optimizer eliminated it as "dead code", preventing `realtime.send()` from executing. **Solution:** Moved `realtime.send()` call directly into the RETURNING clause of the UPDATE statement that marks steps as Started. This ensures the broadcast executes atomically with the state change and cannot be optimized away. ### 2. Missing step:completed broadcasts for empty map steps (Critical) **Problem:** Clients never received `step:completed` events for map steps that received empty arrays and completed immediately. **Root Cause:** The `broadcast_empty_completed` CTE in `start_ready_steps()` suffered the same optimization issue - not referenced by subsequent operations. **Solution:** Moved `realtime.send()` call into the RETURNING clause of the UPDATE that completes empty map steps. ### 3. Missing step:completed broadcasts in cascade completion (Critical) **Problem:** Clients didn't receive events when taskless steps were cascade-completed in `cascade_complete_taskless_steps()`. **Root Cause:** Similar CTE optimization issue in the cascade completion logic. **Solution:** Added `realtime.send()` directly in the RETURNING clause of the cascade completion UPDATE. ## Technical Changes ### SQL Schema Updates **`start_ready_steps.sql`:** - Removed `broadcast_events` CTE (step:started) - Removed `broadcast_empty_completed` CTE (step:completed for empty maps) - Added `realtime.send()` in RETURNING clause of `started_step_states` CTE - Added `realtime.send()` in RETURNING clause of `completed_empty_steps` CTE **`cascade_complete_taskless_steps.sql`:** - Added `realtime.send()` in RETURNING clause of cascade completion UPDATE - Broadcasts step:completed events atomically with state transition **`complete_task.sql`:** - Added PERFORM statements for run:failed and step:failed event broadcasts - Ensures error events are reliably broadcast ### Client-Side Improvements **New `applySnapshot()` methods:** - Added to `FlowRun` and `FlowStep` classes - Applies database state directly without emitting events - Used for initial state hydration from `start_flow_with_states()` and `get_run_with_states()` **Why this matters:** Previously, we used `updateState()` for initial state, which internally tried to emit events. This was wrong - the initial state from the database is a snapshot, not a series of events. The new `applySnapshot()` method makes this distinction clear. ### Test Updates Completely rewrote broadcast tests to understand the actual behavior: **Root steps vs Dependent steps:** - Root steps start in the same transaction as `start_flow()` - already Started when the function returns - Can't observe their step:started broadcasts (would require listening before the transaction commits) - Tests now verify state directly instead of trying to catch impossible broadcasts **Dependent steps:** - Start AFTER their dependencies complete (separate transaction) - CAN observe step:started broadcasts (the actual test case for the bug fix) - New test specifically verifies these broadcasts work correctly **Empty map steps:** - Root empty maps complete in `start_flow()` transaction - verify state directly - Dependent empty maps complete after dependencies - verify broadcasts ## Testing All tests passing: - ✅ Root steps verified via state (already Started when startFlow() returns) - ✅ Dependent steps receive step:started broadcasts - ✅ Dependent empty maps receive step:completed broadcasts (skip step:started) - ✅ Cascade completion broadcasts step:completed events - ✅ Event sequence validation (started → completed order) ## Migration Notes No breaking changes. This is a pure bug fix that makes the system work as originally intended. Users who implemented workarounds for missing events can now remove them.
1 parent 92aae76 commit c22a1e5

16 files changed

+1098
-185
lines changed

.changeset/clear-signal-beam.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
---
2+
'@pgflow/client': patch
3+
'@pgflow/core': patch
4+
---
5+
6+
Fix missing realtime broadcasts for step:started and step:completed events
7+
8+
**Critical bug fix:** Clients were not receiving `step:started` events when steps transitioned to Started status, and `step:completed` events for empty map steps and cascade completions were also missing.
9+
10+
**Root cause:** PostgreSQL query optimizer was eliminating CTEs containing `realtime.send()` calls because they were not referenced by subsequent operations or the final RETURN statement.
11+
12+
**Solution:** Moved `realtime.send()` calls directly into RETURNING clauses of UPDATE statements, ensuring they execute atomically with state changes and cannot be optimized away.
13+
14+
**Changes:**
15+
- `start_ready_steps()`: Broadcasts step:started and step:completed events in RETURNING clauses
16+
- `cascade_complete_taskless_steps()`: Broadcasts step:completed events atomically with cascade completion
17+
- `complete_task()`: Added PERFORM statements for run:failed and step:failed broadcasts
18+
- Client: Added `applySnapshot()` methods to FlowRun and FlowStep for proper initial state hydration without event emission

pkgs/client/__tests__/integration/real-flow-execution.test.ts

Lines changed: 163 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ describe('Real Flow Execution', () => {
3636
expect(run.flow_slug).toBe(testFlow.slug);
3737

3838
// Give realtime subscription time to establish properly
39-
await new Promise(resolve => setTimeout(resolve, 2000));
39+
await new Promise((resolve) => setTimeout(resolve, 2000));
4040

4141
// Poll for task
4242
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
43-
43+
4444
expect(tasks).toHaveLength(1);
4545
expect(tasks[0].input.run).toEqual(input);
4646

@@ -112,7 +112,7 @@ describe('Real Flow Execution', () => {
112112
step.on('*', stepTracker.callback);
113113

114114
// Give realtime subscription time to establish
115-
await new Promise(resolve => setTimeout(resolve, 100));
115+
await new Promise((resolve) => setTimeout(resolve, 100));
116116

117117
// Poll and complete task
118118
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
@@ -147,54 +147,120 @@ describe('Real Flow Execution', () => {
147147
);
148148

149149
it(
150-
'CRITICAL: broadcasts step:started events (CTE optimization bug check)',
150+
'root steps: started immediately (verify via waitForStatus, not broadcasts)',
151151
withPgNoTransaction(async (sql) => {
152-
// This test specifically verifies that step:started events ARE broadcast
153-
// It SHOULD FAIL until the CTE optimization bug is fixed in start_ready_steps()
152+
// Root steps are started in the same transaction as start_flow()
153+
// By the time startFlow() returns, they're already Started
154+
// We can't observe these broadcasts - they happen before we can listen
155+
// Instead, verify the state directly
154156

155-
const testFlow = createTestFlow('started_event_flow');
157+
const testFlow = createTestFlow('root_started_flow');
156158
await cleanupFlow(sql, testFlow.slug);
157159
await grantMinimalPgflowPermissions(sql);
158160
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
159-
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'test_step')`;
161+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`;
160162

161163
const sqlClient = new PgflowSqlClient(sql);
162164
const supabaseClient = createTestSupabaseClient();
163165
const pgflowClient = new PgflowClient(supabaseClient);
164166

167+
// Start flow - root step starts in this transaction
165168
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
166-
const step = run.step('test_step');
169+
const step = run.step('root_step');
167170

168-
// Track ALL step events with event matchers
169-
const tracker = createEventTracker();
170-
step.on('*', tracker.callback);
171+
// VERIFY: Step is already Started when startFlow() returns
172+
expect(step.status).toBe(FlowStepStatus.Started);
173+
expect(step.started_at).toBeDefined();
171174

172-
// Give realtime subscription time to establish
173-
await new Promise(resolve => setTimeout(resolve, 100));
175+
// waitForStatus should return immediately (already Started)
176+
await step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 1000 });
174177

175-
// Execute the step - this calls start_ready_steps() which should broadcast step:started
178+
// Complete for cleanup
176179
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
177-
178-
// Wait a moment for broadcast to propagate
179-
await new Promise(resolve => setTimeout(resolve, 500));
180-
181-
// Complete the task
182180
await sqlClient.completeTask(tasks[0], { result: 'done' });
183181
await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 });
184182

185-
// CRITICAL ASSERTIONS: Verify step:started WAS broadcast
186-
// These will FAIL with the current CTE optimization bug!
183+
await supabaseClient.removeAllChannels();
184+
}),
185+
15000
186+
);
187+
188+
it(
189+
'dependent steps: broadcasts step:started when they become ready',
190+
withPgNoTransaction(async (sql) => {
191+
// Dependent steps start AFTER their dependencies complete
192+
// This happens AFTER startFlow() returns, so we CAN observe broadcasts
193+
// This is the real test for step:started broadcasts!
194+
195+
const testFlow = createTestFlow('dependent_started_flow');
196+
await cleanupFlow(sql, testFlow.slug);
197+
await grantMinimalPgflowPermissions(sql);
198+
199+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
200+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`;
201+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'dependent_step', ARRAY['root_step'])`;
202+
203+
const sqlClient = new PgflowSqlClient(sql);
204+
const supabaseClient = createTestSupabaseClient();
205+
const pgflowClient = new PgflowClient(supabaseClient);
206+
207+
// Start flow - only root_step starts
208+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
209+
const rootStep = run.step('root_step');
210+
const dependentStep = run.step('dependent_step');
211+
212+
// Root is started, dependent is still created (waiting for deps)
213+
expect(rootStep.status).toBe(FlowStepStatus.Started);
214+
expect(dependentStep.status).toBe(FlowStepStatus.Created);
215+
216+
// NOW set up event tracker (before completing root)
217+
const tracker = createEventTracker();
218+
dependentStep.on('*', tracker.callback);
219+
220+
// Give realtime subscription time to establish
221+
await new Promise((resolve) => setTimeout(resolve, 100));
222+
223+
// Complete root step - this will trigger dependent_step to start
224+
const rootTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
225+
expect(rootTasks[0].step_slug).toBe('root_step');
226+
await sqlClient.completeTask(rootTasks[0], { result: 'root done' });
227+
228+
// Wait for dependent to start
229+
await dependentStep.waitForStatus(FlowStepStatus.Started, {
230+
timeoutMs: 5000,
231+
});
232+
233+
// VERIFY: We received step:started broadcast for dependent step
187234
expect(tracker).toHaveReceivedEvent('step:started', {
188235
run_id: run.run_id,
189-
step_slug: 'test_step',
236+
step_slug: 'dependent_step',
190237
status: FlowStepStatus.Started,
191238
});
192239

193-
// Verify proper event sequence
194-
expect(tracker).toHaveReceivedEventSequence(['step:started', 'step:completed']);
195-
expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed');
240+
// Complete dependent step
241+
const dependentTasks = await readAndStart(
242+
sql,
243+
sqlClient,
244+
testFlow.slug,
245+
1,
246+
5
247+
);
248+
expect(dependentTasks[0].step_slug).toBe('dependent_step');
249+
await sqlClient.completeTask(dependentTasks[0], {
250+
result: 'dependent done',
251+
});
252+
253+
// Wait for completion
254+
await dependentStep.waitForStatus(FlowStepStatus.Completed, {
255+
timeoutMs: 5000,
256+
});
196257

197-
// Verify both events were received
258+
// VERIFY: Proper event sequence
259+
expect(tracker).toHaveReceivedEventSequence([
260+
'step:started',
261+
'step:completed',
262+
]);
263+
expect(tracker).toHaveReceivedInOrder('step:started', 'step:completed');
198264
expect(tracker).toHaveReceivedEventCount('step:started', 1);
199265
expect(tracker).toHaveReceivedEventCount('step:completed', 1);
200266

@@ -204,12 +270,13 @@ describe('Real Flow Execution', () => {
204270
);
205271

206272
it(
207-
'empty map steps: skip step:started and go straight to step:completed',
273+
'empty map steps (root): completed immediately (verify via state)',
208274
withPgNoTransaction(async (sql) => {
209-
// This test verifies the EXPECTED behavior for empty map steps
210-
// They should NOT send step:started, only step:completed
275+
// Empty map steps with no tasks complete immediately
276+
// Root empty maps complete in the start_flow transaction
277+
// Can't observe broadcasts - verify state instead
211278

212-
const testFlow = createTestFlow('empty_map_flow');
279+
const testFlow = createTestFlow('root_empty_map_flow');
213280
await cleanupFlow(sql, testFlow.slug);
214281
await grantMinimalPgflowPermissions(sql);
215282

@@ -229,31 +296,81 @@ describe('Real Flow Execution', () => {
229296
const supabaseClient = createTestSupabaseClient();
230297
const pgflowClient = new PgflowClient(supabaseClient);
231298

232-
// Start flow with empty array directly (root map steps expect array input)
299+
// Start flow with empty array (root map steps expect array input)
233300
const run = await pgflowClient.startFlow(testFlow.slug, []);
234301
const step = run.step('empty_map_step');
235302

236-
// Track events
303+
// VERIFY: Step is already Completed when startFlow() returns
304+
expect(step.status).toBe(FlowStepStatus.Completed);
305+
expect(step.completed_at).toBeDefined();
306+
307+
// Empty maps DO get started_at set (they transition through started briefly)
308+
expect(step.started_at).toBeDefined();
309+
310+
await supabaseClient.removeAllChannels();
311+
}),
312+
15000
313+
);
314+
315+
it(
316+
'empty map steps (dependent): broadcasts step:completed when triggered',
317+
withPgNoTransaction(async (sql) => {
318+
// Dependent empty map steps complete AFTER their dependencies
319+
// This happens AFTER startFlow() returns, so we CAN observe broadcasts
320+
// They skip step:started and go directly to step:completed
321+
322+
const testFlow = createTestFlow('dependent_empty_map_flow');
323+
await cleanupFlow(sql, testFlow.slug);
324+
await grantMinimalPgflowPermissions(sql);
325+
326+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
327+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'root_step')`;
328+
// Dependent map step that will receive empty array from root
329+
await sql`SELECT pgflow.add_step(
330+
${testFlow.slug},
331+
'dependent_empty_map',
332+
ARRAY['root_step'],
333+
NULL,
334+
NULL,
335+
NULL,
336+
NULL,
337+
'map'
338+
)`;
339+
340+
const sqlClient = new PgflowSqlClient(sql);
341+
const supabaseClient = createTestSupabaseClient();
342+
const pgflowClient = new PgflowClient(supabaseClient);
343+
344+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
345+
const rootStep = run.step('root_step');
346+
const emptyMapStep = run.step('dependent_empty_map');
347+
348+
// Set up tracker before completing root
237349
const tracker = createEventTracker();
238-
step.on('*', tracker.callback);
350+
emptyMapStep.on('*', tracker.callback);
239351

240-
// Give realtime time to establish
241-
await new Promise(resolve => setTimeout(resolve, 100));
352+
await new Promise((resolve) => setTimeout(resolve, 100));
242353

243-
// Wait for step to complete (should happen immediately)
244-
await step.waitForStatus(FlowStepStatus.Completed, { timeoutMs: 5000 });
354+
// Complete root with empty array (single steps feeding map steps output arrays directly)
355+
const rootTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
356+
await sqlClient.completeTask(rootTasks[0], []);
357+
358+
// Wait for dependent to complete (should happen immediately)
359+
await emptyMapStep.waitForStatus(FlowStepStatus.Completed, {
360+
timeoutMs: 5000,
361+
});
245362

246-
// Verify NO step:started event (expected for empty maps)
363+
// VERIFY: NO step:started (empty maps skip this)
247364
expect(tracker).toNotHaveReceivedEvent('step:started');
248365

249-
// Verify step:completed was sent
366+
// VERIFY: Received step:completed directly
250367
expect(tracker).toHaveReceivedEvent('step:completed', {
251368
run_id: run.run_id,
252-
step_slug: 'empty_map_step',
369+
step_slug: 'dependent_empty_map',
253370
status: FlowStepStatus.Completed,
254371
});
255372

256-
// Verify only 1 event total
373+
// VERIFY: Only 1 event total (completed, no started)
257374
expect(tracker).toHaveReceivedTotalEvents(1);
258375

259376
await supabaseClient.removeAllChannels();
@@ -285,10 +402,12 @@ describe('Real Flow Execution', () => {
285402
expect(step.started_at).toBeDefined();
286403

287404
// Give realtime subscription time to establish
288-
await new Promise(resolve => setTimeout(resolve, 100));
405+
await new Promise((resolve) => setTimeout(resolve, 100));
289406

290407
// waitForStatus should resolve immediately since step is already Started
291-
const waitPromise = step.waitForStatus(FlowStepStatus.Started, { timeoutMs: 5000 });
408+
const waitPromise = step.waitForStatus(FlowStepStatus.Started, {
409+
timeoutMs: 5000,
410+
});
292411
const result = await waitPromise;
293412
expect(result).toBe(step);
294413
expect(step.status).toBe(FlowStepStatus.Started);

pkgs/client/src/lib/FlowRun.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,13 +264,33 @@ export class FlowRun<TFlow extends AnyFlow>
264264
});
265265
}
266266

267+
/**
268+
* Apply state from database snapshot (no events emitted)
269+
* Used when initializing state from start_flow_with_states() or get_run_with_states()
270+
*
271+
* @internal This method is only intended for use by PgflowClient.
272+
* Applications should not call this directly.
273+
*/
274+
applySnapshot(row: import('@pgflow/core').RunRow): void {
275+
// Direct state assignment from database row (no event conversion)
276+
this.#state.status = row.status as FlowRunStatus;
277+
this.#state.input = row.input as ExtractFlowInput<TFlow>;
278+
this.#state.output = row.output as ExtractFlowOutput<TFlow> | null;
279+
this.#state.started_at = row.started_at ? new Date(row.started_at) : null;
280+
this.#state.completed_at = row.completed_at ? new Date(row.completed_at) : null;
281+
this.#state.failed_at = row.failed_at ? new Date(row.failed_at) : null;
282+
this.#state.remaining_steps = row.remaining_steps;
283+
this.#state.error_message = null; // Database doesn't have error_message for runs
284+
this.#state.error = null;
285+
}
286+
267287
/**
268288
* Updates the run state based on an event
269-
*
289+
*
270290
* @internal This method is only intended for use by PgflowClient and tests.
271291
* Applications should not call this directly - state updates should come from
272292
* database events through the PgflowClient.
273-
*
293+
*
274294
* TODO: After v1.0, make this method private and refactor tests to use PgflowClient
275295
* with event emission instead of direct state manipulation.
276296
*/

pkgs/client/src/lib/FlowStep.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,31 @@ export class FlowStep<
176176
});
177177
}
178178

179+
/**
180+
* Apply state from database snapshot (no events emitted)
181+
* Used when initializing state from start_flow_with_states() or get_run_with_states()
182+
*
183+
* @internal This method is only intended for use by PgflowClient.
184+
* Applications should not call this directly.
185+
*/
186+
applySnapshot(row: import('@pgflow/core').StepStateRow): void {
187+
// Direct state assignment from database row (no event conversion)
188+
this.#state.status = row.status as FlowStepStatus;
189+
this.#state.started_at = row.started_at ? new Date(row.started_at) : null;
190+
this.#state.completed_at = row.completed_at ? new Date(row.completed_at) : null;
191+
this.#state.failed_at = row.failed_at ? new Date(row.failed_at) : null;
192+
this.#state.error_message = row.error_message;
193+
this.#state.error = row.error_message ? new Error(row.error_message) : null;
194+
// Note: output is not stored in step_states table, remains null
195+
}
196+
179197
/**
180198
* Updates the step state based on an event
181-
*
199+
*
182200
* @internal This method is only intended for use by FlowRun and tests.
183201
* Applications should not call this directly - state updates should come from
184202
* database events through the PgflowClient.
185-
*
203+
*
186204
* TODO: After v1.0, make this method private and refactor tests to use PgflowClient
187205
* with event emission instead of direct state manipulation.
188206
*/

0 commit comments

Comments
 (0)