Commit b531d64

fix: remove deprecated create_realtime_partition function and update test references
- Deleted the create_realtime_partition SQL function and related comments
- Updated test scripts to no longer call create_realtime_partition
- Replaced pgflow.read_with_poll with pgmq.read_with_poll in client code
- Ensured tests focus on existing functionality without partition creation
- Minor adjustments to test setup for consistency and clarity
Parent: 9f66fe6

34 files changed: +470 −415 lines changed

.changeset/pgmq-version-bump.md

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+---
+'@pgflow/core': minor
+'@pgflow/edge-worker': minor
+---
+
+BREAKING CHANGE: This version requires pgmq 1.5.1 or higher and will NOT work with pgmq 1.4.4.
+
+The code now depends on schema changes introduced in pgmq 1.5.0+ (specifically the headers column in message_record type). The compatibility layer that allowed pgflow to work with pgmq 1.4.4 has been removed.
+
+If you are using Supabase, pgmq 1.5.1 is included by default in recent versions. If you are self-hosting, you must upgrade pgmq to 1.5.1 or higher before upgrading pgflow.
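
Before upgrading pgflow, the installed pgmq version can be checked from SQL. This is a sketch assuming pgmq is installed as a Postgres extension (as it is on Supabase); the queries use only standard Postgres catalog and `ALTER EXTENSION` syntax:

```sql
-- Check which pgmq version is currently installed
SELECT extversion FROM pg_extension WHERE extname = 'pgmq';

-- If it reports a version below 1.5.1, upgrade the extension first, e.g.:
-- ALTER EXTENSION pgmq UPDATE TO '1.5.1';
```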

.claude/skills/migration-management/SKILL.md

Lines changed: 36 additions & 0 deletions
@@ -65,11 +65,20 @@ cd pkgs/core
 ./scripts/atlas-migrate-diff your_feature_name # NO pgflow_ prefix (auto-added)
 # Creates: supabase/migrations/TIMESTAMP_pgflow_your_feature_name.sql
 
+# ⚠️ CHECK: If changing function signatures, review the migration for CREATE OR REPLACE
+# See Troubleshooting section if you get "cannot change return type" errors
+
 pnpm nx verify-migrations core # Validates migration + checks schemas synced
 pnpm nx gen-types core # Regenerate TypeScript types
 pnpm nx test:pgtap core # Verify everything works
 ```
 
+**If you manually edit a migration** (e.g., to add DROP FUNCTION):
+```bash
+./scripts/atlas-migrate-hash --yes # Regenerate checksums after manual edits
+pnpm nx verify-migrations core # Then verify it works
+```
+
 ### Naming Conventions
 
 - snake_case (e.g., `add_root_map_support`)

@@ -180,6 +189,33 @@ git commit -m "feat: consolidate step validation and error handling"
 
 ## Troubleshooting
 
+### ⚠️ CRITICAL: Atlas Limitation with Function Signature Changes
+
+**Atlas DOES NOT detect function return type or parameter type changes!**
+
+If Atlas generates `CREATE OR REPLACE FUNCTION` and the function signature changed, the migration will fail with:
+```
+ERROR: cannot change return type of existing function
+```
+
+**Solution:** Manually edit the generated migration to add DROP first:
+```sql
+-- WRONG (Atlas generates this):
+CREATE OR REPLACE FUNCTION pgflow.my_func() RETURNS NEW_TYPE ...
+
+-- CORRECT (manually fix to):
+DROP FUNCTION IF EXISTS pgflow.my_func(param_types_here);
+CREATE FUNCTION pgflow.my_func() RETURNS NEW_TYPE ...
+```
+
+**When this happens:**
+- Changing `RETURNS SETOF type` to `RETURNS TABLE(...)`
+- Changing `RETURNS type1` to `RETURNS type2`
+- Changing parameter types
+- Adding/removing parameters
+
+**Always test migrations with `pnpm nx supabase:reset core` to catch this!**
+
 ### Migration name exists
 
 ```bash
Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+-- Modify "set_vt_batch" function
+CREATE OR REPLACE FUNCTION "pgflow"."set_vt_batch" ("queue_name" text, "msg_ids" bigint[], "vt_offsets" integer[]) RETURNS SETOF pgmq.message_record LANGUAGE plpgsql AS $$
+DECLARE
+  qtable TEXT := pgmq.format_table_name(queue_name, 'q');
+  sql TEXT;
+BEGIN
+  /* ---------- safety checks ---------------------------------------------------- */
+  IF msg_ids IS NULL OR vt_offsets IS NULL OR array_length(msg_ids, 1) = 0 THEN
+    RETURN; -- nothing to do, return empty set
+  END IF;
+
+  IF array_length(msg_ids, 1) IS DISTINCT FROM array_length(vt_offsets, 1) THEN
+    RAISE EXCEPTION
+      'msg_ids length (%) must equal vt_offsets length (%)',
+      array_length(msg_ids, 1), array_length(vt_offsets, 1);
+  END IF;
+
+  /* ---------- dynamic statement ------------------------------------------------ */
+  /* One UPDATE joins with the unnested arrays */
+  sql := format(
+    $FMT$
+    WITH input (msg_id, vt_offset) AS (
+      SELECT unnest($1)::bigint
+           , unnest($2)::int
+    )
+    UPDATE pgmq.%I q
+    SET vt = clock_timestamp() + make_interval(secs => input.vt_offset),
+        read_ct = read_ct -- no change, but keeps RETURNING list aligned
+    FROM input
+    WHERE q.msg_id = input.msg_id
+    RETURNING q.msg_id,
+              q.read_ct,
+              q.enqueued_at,
+              q.vt,
+              q.message,
+              q.headers
+    $FMT$,
+    qtable
+  );
+
+  RETURN QUERY EXECUTE sql USING msg_ids, vt_offsets;
+END;
+$$;
+-- Drop "read_with_poll" function
+DROP FUNCTION "pgflow"."read_with_poll";
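
A hypothetical usage sketch of the function above (the queue name and message ids are made up for illustration; the signature matches the migration):

```sql
-- Extend the visibility timeout of messages 101 and 102 on the
-- 'analyze_website' queue by 30 and 60 seconds respectively.
SELECT * FROM pgflow.set_vt_batch(
  queue_name => 'analyze_website',
  msg_ids    => ARRAY[101, 102]::bigint[],
  vt_offsets => ARRAY[30, 60]::integer[]
);
```

Both arrays must have the same length; the function raises an exception otherwise and returns an empty set when the arrays are NULL or empty.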

pkgs/core/README.md

Lines changed: 14 additions & 3 deletions
@@ -140,6 +140,7 @@ SELECT pgflow.add_step(
 #### Root Map vs Dependent Map
 
 **Root Map Steps** process the flow's input array directly:
+
 ```sql
 -- Root map: no dependencies, processes flow input
 SELECT pgflow.add_step(

@@ -156,6 +157,7 @@ SELECT pgflow.start_flow(
 ```
 
 **Dependent Map Steps** process another step's array output:
+
 ```sql
 -- Dependent map: processes the array from 'fetch_items'
 SELECT pgflow.add_step(

@@ -169,6 +171,7 @@ SELECT pgflow.add_step(
 #### Edge Cases and Special Behaviors
 
 1. **Empty Array Cascade**: When a map step receives an empty array (`[]`):
+
    - The SQL core completes it immediately without creating tasks
    - The completed map step outputs an empty array
    - Any dependent map steps also receive empty arrays and complete immediately

@@ -184,12 +187,14 @@ SELECT pgflow.add_step(
 #### Implementation Details
 
 Map steps utilize several database fields for state management:
+
 - `initial_tasks`: Number of tasks to create (NULL until array size is known)
 - `remaining_tasks`: Tracks incomplete tasks for the step
 - `task_index`: Identifies which array element each task processes
 - `step_type`: Column value 'map' triggers map behavior
 
 The aggregation process ensures:
+
 - **Order Preservation**: Task outputs maintain array element ordering
 - **NULL Handling**: NULL outputs are included in the aggregated array
 - **Atomicity**: Aggregation occurs within the same transaction as task completion

@@ -262,15 +267,17 @@ When a workflow starts:
 The Edge Worker uses a two-phase approach to retrieve and start tasks:
 
 **Phase 1 - Reserve Messages:**
+
 ```sql
-SELECT * FROM pgflow.read_with_poll(
+SELECT * FROM pgmq.read_with_poll(
   queue_name => 'analyze_website',
   vt => 60, -- visibility timeout in seconds
   qty => 5 -- maximum number of messages to fetch
 );
 ```
 
 **Phase 2 - Start Tasks:**
+
 ```sql
 SELECT * FROM pgflow.start_tasks(
   flow_slug => 'analyze_website',

@@ -379,6 +386,7 @@ Timeouts are enforced by setting the message visibility timeout to the step's ti
 The SQL Core is the DAG orchestration engine that handles dependency resolution, step state management, and task spawning. However, workflows are defined using the TypeScript Flow DSL, which compiles user intent into the SQL primitives that populate the definition tables (`flows`, `steps`, `deps`).
 
 See the [@pgflow/dsl package](../dsl/README.md) for complete documentation on:
+
 - Expressing workflows with type-safe method chaining
 - Step types (`.step()`, `.array()`, `.map()`)
 - Compilation to SQL migrations

@@ -441,6 +449,7 @@ Map step tasks receive a fundamentally different input structure than single ste
 ```
 
 This means:
+
 - Map handlers process individual elements in isolation
 - Map handlers cannot access the original flow input (`run`)
 - Map handlers cannot access other dependencies

@@ -456,8 +465,10 @@ When a step depends on a map step, it receives the aggregated array output:
 
 // A step depending on 'process_users' receives:
 {
-  "run": { /* original flow input */ },
-  "process_users": [{"name": "Alice"}, {"name": "Bob"}] // Full array
+  "run": {
+    /* original flow input */
+  },
+  "process_users": [{ "name": "Alice" }, { "name": "Bob" }] // Full array
 }
 ```

pkgs/core/schemas/0080_function_read_with_poll.sql

Lines changed: 0 additions & 72 deletions
This file was deleted.

pkgs/core/schemas/0110_function_set_vt_batch.sql

Lines changed: 10 additions & 2 deletions
@@ -15,7 +15,14 @@ create or replace function pgflow.set_vt_batch(
   msg_ids BIGINT [],
   vt_offsets INTEGER []
 )
-returns setof PGMQ.MESSAGE_RECORD
+returns table(
+  msg_id bigint,
+  read_ct integer,
+  enqueued_at timestamp with time zone,
+  vt timestamp with time zone,
+  message jsonb,
+  headers jsonb
+)
 language plpgsql as
 $$
 DECLARE

@@ -50,7 +57,8 @@ BEGIN
       q.read_ct,
       q.enqueued_at,
      q.vt,
-      q.message
+      q.message,
+      q.headers
     $FMT$,
     qtable
   );

pkgs/core/src/PgflowSqlClient.ts

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ export class PgflowSqlClient<TFlow extends AnyFlow>
   ): Promise<MessageRecord[]> {
     return await this.sql<MessageRecord[]>`
       SELECT *
-      FROM pgflow.read_with_poll(
+      FROM pgmq.read_with_poll(
        queue_name => ${queueName},
        vt => ${visibilityTimeout},
        qty => ${batchSize},
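
The rows returned by these calls now carry the `headers` column. The sketch below is an illustrative TypeScript shape, not pgflow's actual `MessageRecord` type: the field names mirror the `returns table(...)` definition in this commit, while the timestamp/JSON representations are assumptions about how a typical Postgres driver serializes them.

```typescript
// Illustrative shape of a message record after this change.
// Field names come from the RETURNS TABLE definition; everything
// else (interface name, serialized types) is an assumption.
interface MessageRecord {
  msg_id: number;
  read_ct: number;
  enqueued_at: string; // timestamptz, commonly serialized as ISO-8601
  vt: string;          // timestamptz
  message: unknown;    // jsonb payload
  headers: Record<string, unknown> | null; // jsonb; new in pgmq 1.5+ message_record
}

const example: MessageRecord = {
  msg_id: 1,
  read_ct: 0,
  enqueued_at: "2024-01-01T00:00:00Z",
  vt: "2024-01-01T00:01:00Z",
  message: { step: "demo" },
  headers: null,
};

console.log(Object.keys(example).length); // 6 columns, including headers
```

Code written against the pre-1.5 five-column `message_record` (without `headers`) will need updating alongside this upgrade.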
