Skip to content

Commit b3ba62a

Browse files
committed
Fix Orphaned Messages on Run Failure (#220)
This PR fixes a critical issue where messages for pending tasks remain in the queue indefinitely when a run fails, causing performance degradation and resource waste. ### Problem - Failed runs left queued messages orphaned, causing workers to poll them forever - Map steps with N tasks would leave N-1 messages orphaned when one task failed - Type constraint violations would retry unnecessarily despite being deterministic failures ### Solution - Archive all queued messages when a run fails - Handle type violations gracefully (fail immediately, no retries) - Prevent any retries when the run is already failed - Add index for efficient message archiving ### Testing - Added comprehensive tests for map task failures and type violations - All existing tests pass without regression
1 parent a4562bb commit b3ba62a

21 files changed

+1760
-272
lines changed

.changeset/hungry-cloths-hunt.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
'@pgflow/core': patch
3+
---
4+
5+
Improve failure handling and prevent orphaned messages in queue
6+
7+
- Archive all queued messages when a run fails to prevent resource waste
8+
- Handle type constraint violations gracefully without exceptions
9+
- Store output on failed tasks (including type violations) for debugging
10+
- Add performance index for efficient message archiving
11+
- Prevent retries on already-failed runs
12+
- Update table constraint to allow output storage on failed tasks

.claude/commands/fix-sql-tests.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Your job is to fix SQL tests, either by fixing the tests if those are invalid,
2+
or updating the SQL functions in pkgs/core/schemas/ and trying again.
3+
4+
If updating functions, load them with psql.
5+
6+
!`pnpm nx supabase:status core --output env | grep DB_URL`
7+
PWD: !`pwd`
8+
9+
To rerun the test(s), run this command from `pkgs/core` directory:
10+
11+
`scripts/run-test-with-colors supabase/tests/<testfile>`
12+
13+
Do not create any migratons or try to run tests with nx.
14+
15+
<test_failures>
16+
!`pnpm nx test:pgtap core`
17+
</test_failures>

.claude/commands/test-first-sql.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
Your job is to implement the feature below in a test-first manner.
2+
First, you must idenfity what things you want to test for.
3+
Then you must write one test at a time, from the simplest, more generic,
4+
to more precise (if applicable, sometimes you only need to write one test per
5+
thing, without multiple per thing).
6+
7+
To run the test(s), run this command from `pkgs/core` directory:
8+
9+
`scripts/run-test-with-colors supabase/tests/<testfile>`
10+
11+
The newly written test must fail for the correct reasons.
12+
13+
In order to make the test pass, you need to update function
14+
code in pkgs/core/schemas/.
15+
16+
After updating you should use `psql` to execute function file
17+
and update function in database.
18+
19+
!`pnpm nx supabase:status core --output env | grep DB_URL`
20+
PWD: !`pwd`
21+
22+
Repeat until all the added tests are passing.
23+
24+
When they do, run all the tests like this:
25+
26+
`scripts/run-test-with-colors supabase/tests/`
27+
28+
Do not create any migratons or try to run tests with nx.
29+
30+
Never use any INSERTs or UPDATEs to prepare or mutate state for the test.
31+
Instead, use regular pgflow.\* SQL functions or functions that are
32+
available in pkgs/core/supabase/tests/seed.sql:
33+
34+
!`grep 'function.*pgflow_tests' pkgs/core/supabase/seed.sql -A7`
35+
36+
Check how they are used in other tests.
37+
38+
<feature_to_implement>
39+
$ARGUMENTS
40+
</feature_to_implement>

PLAN.md

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
-**DONE**: Array element extraction - tasks receive individual array elements
1111
-**DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
1212
-**DONE**: DSL support for `.map()` for defining map steps with compile-time duplicate detection
13-
- **TODO**: Fix orphaned messages on run failure
13+
- **DONE**: Fix orphaned messages on run failure
1414
-**TODO**: Performance optimization with step_states.output column
1515

1616
### Chores
@@ -106,18 +106,28 @@
106106
- Updated DSL README with .map() documentation
107107
- Created detailed changeset
108108

109+
- [x] **PR #219: Fix Orphaned Messages on Run Failure** - `09-18-fix-orphaned-messages-on-fail` ✅ COMPLETED
110+
111+
- Archives all queued messages when run fails (prevents orphaned messages)
112+
- Handles type constraint violations gracefully without exceptions
113+
- Added guards to prevent any mutations on failed runs:
114+
- complete_task returns unchanged
115+
- start_ready_steps exits early
116+
- cascade_complete_taskless_steps returns 0
117+
- Added performance index for efficient message archiving
118+
- Tests unstashed and passing (archive_sibling_map_tasks, archive_messages_on_type_constraint_failure)
119+
- Updated core README with failure handling mentions
120+
- **Critical fix: prevents queue performance degradation in production**
121+
109122
#### ❌ Remaining Work (Priority Order)
110123

111-
- [ ] **Priority 1: Fix Orphaned Messages on Run Failure** 🚨 CRITICAL
124+
- [ ] **Integration Tests**
112125

113-
- Archive all pending messages when run fails
114-
- Handle map sibling tasks specially
115-
- Fix type constraint violations to fail immediately without retries
116-
- See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md)
117-
- **Critical for production: prevents queue performance degradation**
118-
- Tests already written (stashed) that document the problem
126+
- End-to-end workflows with real array data
127+
- Basic happy path coverage
128+
- This should be minimal and added to the Edge Worker integration test suite for now
119129

120-
- [ ] **Priority 2: Performance Optimization - step_states.output Column**
130+
- [ ] **Performance Optimization - step_states.output Column**
121131

122132
- Migrate from inline aggregation to storing outputs in step_states
123133
- See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md)
@@ -132,43 +142,33 @@
132142
- Update all aggregation tests (~17 files)
133143
- **Note**: This is an optimization that should be done after core functionality is stable
134144

135-
- [ ] **Priority 3: Integration Tests**
136-
137-
- End-to-end workflows with real array data
138-
- Basic happy path coverage
139-
- This should be minimal and added to the Edge Worker integration test suite for now
140-
141-
- [ ] **Priority 4: Update core README**
142-
143-
- `pkgs/core/README.md`
144-
145-
- Add new section describing the step types
146-
- Describe single step briefly, focus on describing map step type and how it differs
147-
- Make sure to mention that maps are constrained to have exactly one dependency
148-
- Show multiple cases of inputs -> task creation
149-
- Explain edge cases (empty array propagation, invalid array input)
150-
- Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input
151-
- Explain cascade completion of taskless steps and its limitations
145+
- [ ] **Update `pkgs/core/README.md`**
152146

153-
- [ ] **Priority 5: Add docs page**
147+
- Add new section describing the step types
148+
- Describe single step briefly, focus on describing map step type and how it differs
149+
- Make sure to mention that maps are constrained to have exactly one dependency
150+
- Show multiple cases of inputs -> task creation
151+
- Explain edge cases (empty array propagation, invalid array input)
152+
- Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input
153+
- Explain cascade completion of taskless steps and its limitations
154154

155-
- **Add basic docs page**
155+
- [ ] **Add docs page**
156156

157-
- put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx`
158-
- describe the DSL and how the map works and why we need it
159-
- show example usage of root map
160-
- show example usage of dependent map
161-
- focus mostly on how to use it, instead of how it works under the hood
162-
- link to the README's for more details
157+
- put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx`
158+
- describe the DSL and how the map works and why we need it
159+
- show example usage of root map
160+
- show example usage of dependent map
161+
- focus mostly on how to use it, instead of how it works under the hood
162+
- link to the README's for more details
163163

164-
- [ ] **Priority 6: Migration Consolidation** (Do this last before merge!)
164+
- [ ] **Migration Consolidation**
165165

166166
- Remove all temporary/incremental migrations from feature branches
167167
- Generate a single consolidated migration for the entire map infrastructure
168168
- Ensure clean migration path from current production schema
169169
- If NULL improvement is done, include it in the consolidated migration
170170

171-
- [ ] **Priority 7: Graphite Stack Merge**
171+
- [ ] **Graphite Stack Merge**
172172

173173
- Configure Graphite merge queue for the complete PR stack
174174
- Ensure all PRs in sequence can be merged together

PLAN_orphaned_messages.md

Lines changed: 0 additions & 184 deletions
This file was deleted.

0 commit comments

Comments
 (0)