From 20fcd48030f1fc4037229dd32365d89858f73c02 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sun, 2 Nov 2025 21:09:52 +0100 Subject: [PATCH] fix: remove deprecated create_realtime_partition function and update test references - Deleted the create_realtime_partition SQL function and related comments - Updated test scripts to no longer call create_realtime_partition - Replaced pgflow.read_with_poll with pgmq.read_with_poll in client code - Ensured tests focus on existing functionality without partition creation - Minor adjustments to test setup for consistency and clarity --- .changeset/pgmq-version-bump.md | 10 + PLAN.md | 568 ++---------------- ...251102201302_pgflow_upgrade_pgmq_1_5_1.sql | 93 +++ pkgs/client/__tests__/helpers/permissions.ts | 3 - .../integration/realtime-send.test.ts | 5 +- pkgs/client/package.json | 2 +- pkgs/client/project.json | 4 +- pkgs/client/scripts/performance-benchmark.mjs | 3 - pkgs/client/vitest.global-setup.ts | 4 - pkgs/core/README.md | 17 +- pkgs/core/package.json | 2 +- .../schemas/0080_function_read_with_poll.sql | 72 --- .../schemas/0110_function_set_vt_batch.sql | 12 +- pkgs/core/src/PgflowSqlClient.ts | 2 +- pkgs/core/src/database-types.ts | 394 +++++++----- .../20250429164909_pgflow_initial.sql | 2 +- ...251104080523_pgflow_upgrade_pgmq_1_5_1.sql | 93 +++ pkgs/core/supabase/migrations/atlas.sum | 25 +- pkgs/core/supabase/seed.sql | 75 +-- .../broadcast_order.test.sql | 3 - .../cascade_event_payload.test.sql | 3 - .../deep_cascade_events.test.sql | 3 - .../dependent_map_cascade_events.test.sql | 3 - .../empty_root_map_cascade_events.test.sql | 3 - .../mixed_cascade_events.test.sql | 3 - .../single_step_cascade_events.test.sql | 3 - .../broadcast_event_fixed.test.sql | 5 +- .../create_realtime_partition.test.sql | 83 --- .../realtime/complete_task_events.test.sql | 3 - .../empty_map_completion_events.test.sql | 3 - .../tests/realtime/fail_task_events.test.sql | 3 - .../tests/realtime/full_flow_events.test.sql | 3 - .../maybe_complete_run_events.test.sql | 3 - .../tests/realtime/start_flow_events.test.sql | 3 - .../start_ready_steps_events.test.sql | 3 - .../realtime/type_violation_events.test.sql | 3 - .../step_failed_event_bug.test.sql | 3 - .../set_vt_batch/headers_handling.test.sql | 57 ++ .../tests/set_vt_batch/return_format.test.sql | 12 +- .../start_tasks/basic_start_tasks.test.sql | 4 +- ...ds_proper_input_from_deps_outputs.test.sql | 4 +- .../multiple_task_processing.test.sql | 2 +- .../start_tasks/returns_task_index.test.sql | 10 +- .../started_at_timestamps.test.sql | 2 +- .../start_tasks/status_transitions.test.sql | 2 +- .../task_index_returned_correctly.test.sql | 2 +- .../start_tasks/worker_tracking.test.sql | 4 +- pkgs/edge-worker/package.json | 2 +- pkgs/edge-worker/src/queue/Queue.ts | 2 +- pkgs/edge-worker/src/queue/types.ts | 1 + .../messageExecutorContext.test.ts | 4 + ...messageExecutorContextWorkerConfig.test.ts | 5 +- .../tests/unit/contextUtils.test.ts | 2 + .../tests/unit/workerConfigContext.test.ts | 5 +- pnpm-lock.yaml | 89 +-- 55 files changed, 663 insertions(+), 1068 deletions(-) create mode 100644 .changeset/pgmq-version-bump.md create mode 100644 examples/playground/supabase/migrations/20251102211603_20251102201302_pgflow_upgrade_pgmq_1_5_1.sql delete mode 100644 pkgs/core/schemas/0080_function_read_with_poll.sql create mode 100644 pkgs/core/supabase/migrations/20251104080523_pgflow_upgrade_pgmq_1_5_1.sql delete mode 100644 pkgs/core/supabase/tests/pgflow_tests/create_realtime_partition.test.sql create mode 100644 
pkgs/core/supabase/tests/set_vt_batch/headers_handling.test.sql diff --git a/.changeset/pgmq-version-bump.md b/.changeset/pgmq-version-bump.md new file mode 100644 index 000000000..8639eaef1 --- /dev/null +++ b/.changeset/pgmq-version-bump.md @@ -0,0 +1,10 @@ +--- +'@pgflow/core': minor +'@pgflow/edge-worker': minor +--- + +BREAKING CHANGE: This version requires pgmq 1.5.0 or higher and will NOT work with pgmq 1.4.x. + +The code now depends on schema changes introduced in pgmq 1.5.0 (specifically the headers column in message_record type). The compatibility layer that allowed pgflow to work with pgmq 1.4.x has been removed. + +If you are using Supabase, pgmq 1.5.0+ is included by default in recent versions. If you are self-hosting, you must upgrade pgmq to version 1.5.0 or higher before upgrading pgflow. diff --git a/PLAN.md b/PLAN.md index 359a0ea66..33f4eebaf 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,533 +1,81 @@ -# PGMQ 1.5.1 Upgrade Plan +# Plan: Complete pgmq 1.5.0+ Upgrade Documentation and Communication -This document tracks the work needed to fully upgrade to PGMQ 1.5.1 and remove compatibility workarounds that are no longer needed. +## Completed Tasks -## Context +✅ Core migration changes with compatibility check +✅ Updated `set_vt_batch` to use RETURNS TABLE (future-proof) +✅ Added optional `headers` field to TypeScript `PgmqMessageRecord` +✅ Updated all test mock messages +✅ Created changeset with breaking change warning +✅ Manual testing verified migration fails gracefully on pgmq 1.4.4 -### Current State -- **PGMQ Version**: 1.5.1 (confirmed via psql) -- **Supabase CLI**: v2.54.11 -- **Postgres**: 17 -- **Version Pin Removed**: Commit a71b371 removed PGMQ version constraint +## Remaining Tasks -### Why This Upgrade? +### 1. Create News Article -**PGMQ 1.5.0 Changes:** -- Added `headers` column to `message_record` type (now 6 columns: msg_id, read_ct, enqueued_at, vt, message, headers) -- Improved `read_with_poll` function with bug fixes +**File:** `pkgs/website/src/content/docs/news/pgmq-1-5-0-upgrade.mdx` (or similar) -**Supabase Realtime v2.35.4+ Changes:** -- Fixed issue #1369: Janitor now creates partitions immediately on startup -- Auto-creates 7 partitions (3 days back, today, 3 days forward) -- **Verified**: After `db reset`, partitions are created automatically +Create a news article announcing: +- pgflow 0.8.0 requires pgmq 1.5.0+ +- Breaking change details +- Migration instructions +- Benefits of the upgrade (future-proofing against pgmq changes) -### Breaking Changes -- Functions returning `SETOF pgmq.message_record` must return 6 columns (was 5) -- The `pgflow.read_with_poll` backport is obsolete -- The `pgflow_tests.create_realtime_partition` helper is obsolete +### 2. Update Supabase CLI Version Requirements in Docs ---- +**Files to review and update:** +- `pkgs/website/src/content/docs/get-started/installation.mdx` +- Other getting started guides +- Any tutorial pages mentioning Supabase CLI version -## [ ] Task 1: Fix set_vt_batch Function +**Action:** Update minimum Supabase CLI version requirement to the version that includes pgmq 1.5.0+ -**Priority**: HIGH (blocking 3 test files) +### 3. Update READMEs -### Why -The function returns `SETOF pgmq.message_record` but only returns 5 columns. PGMQ 1.5.0+ expects 6 columns (added `headers`). - -### Current Error -``` -ERROR: structure of query does not match function result type -DETAIL: Number of returned columns (5) does not match expected column count (6). 
-``` - -### What to Change - -**File**: `pkgs/core/schemas/0110_function_set_vt_batch.sql` - -**Line 49-53**, change: -```sql -RETURNING q.msg_id, - q.read_ct, - q.enqueued_at, - q.vt, - q.message -``` - -To: -```sql -RETURNING q.msg_id, - q.read_ct, - q.enqueued_at, - q.vt, - q.message, - q.headers -``` - -### Affected Tests -- `supabase/tests/set_vt_batch/basic_batch_update.test.sql` -- `supabase/tests/set_vt_batch/queue_security.test.sql` -- `supabase/tests/set_vt_batch/return_format.test.sql` - -### Verification -```bash -./scripts/run-test-with-colors supabase/tests/set_vt_batch/*.test.sql -``` - ---- - -## [ ] Task 2: Remove read_with_poll Backport - -**Priority**: MEDIUM (cleanup, not blocking) - -### Why -The `pgflow.read_with_poll` function was a backport from PGMQ 1.5.0 with headers removed for 1.4.4 compatibility. The comment in the file says: - -> "This is a backport of the pgmq.read_with_poll function from version 1.5.0. It is required because it fixes a bug with high CPU usage and Supabase is still using version 1.4.4. It is slightly modified (removed headers which are not available in 1.4.4). **This will be removed once Supabase upgrades to 1.5.0 or higher.**" - -We're now on 1.5.1, so this should be removed. - -### What to Delete - -**File**: `pkgs/core/schemas/0080_function_read_with_poll.sql` (entire file) - -### Migration Impact -**IMPORTANT**: Do NOT edit migration files manually. Existing migrations that reference this function will remain unchanged (they're historical). - -The deletion of `schemas/0080_function_read_with_poll.sql` will cause Atlas to generate a `DROP FUNCTION pgflow.read_with_poll` statement in the new migration (Task 5). - -Historical migrations that created this function: -- `supabase/migrations/20250429164909_pgflow_initial.sql` (CREATE FUNCTION statement) -- `supabase/migrations/20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql` (DROP + CREATE) -- `supabase/migrations/20250627090700_pgflow_fix_function_search_paths.sql` (DROP + CREATE) - -These remain untouched - migrations are append-only. - ---- - -## [ ] Task 3: Update Code to Use pgmq.read_with_poll - -**Priority**: MEDIUM (required after Task 2) - -### Why -After removing `pgflow.read_with_poll`, code needs to call PGMQ's native `pgmq.read_with_poll()` instead. 
- -### Affected Packages -- `pkgs/core` - SQL client + tests -- `pkgs/edge-worker` - Queue implementation - -### What to Change - -#### 3.1 Core Package - TypeScript Client - -**File**: `pkgs/core/src/PgflowSqlClient.ts` (line 29) - -Change: -```typescript -FROM pgflow.read_with_poll( -``` - -To: -```typescript -FROM pgmq.read_with_poll( -``` - -#### 3.2 Core Package - Test Files - -The following test files call `pgflow.read_with_poll` and need updating: - -**Location**: `pkgs/core/supabase/tests/start_tasks/` -``` -builds_proper_input_from_deps_outputs.test.sql -multiple_task_processing.test.sql -returns_task_index.test.sql -started_at_timestamps.test.sql -status_transitions.test.sql -task_index_returned_correctly.test.sql -worker_tracking.test.sql -basic_start_tasks.test.sql -``` - -Bulk update command: -```bash -cd pkgs/core -rg -l "pgflow\.read_with_poll" supabase/tests/ | xargs sed -i 's/pgflow\.read_with_poll/pgmq.read_with_poll/g' -``` - -#### 3.3 Edge Worker Package - Queue Class - -**File**: `pkgs/edge-worker/src/queue/Queue.ts` (line 82) - -Change: -```typescript -return await this.sql[]>` - SELECT * - FROM pgflow.read_with_poll( - queue_name => ${this.queueName}, - vt => ${visibilityTimeout}, - qty => ${batchSize}, - max_poll_seconds => ${maxPollSeconds}, - poll_interval_ms => ${pollIntervalMs} - ); -`; -``` - -To: -```typescript -return await this.sql[]>` - SELECT * - FROM pgmq.read_with_poll( - queue_name => ${this.queueName}, - vt => ${visibilityTimeout}, - qty => ${batchSize}, - max_poll_seconds => ${maxPollSeconds}, - poll_interval_ms => ${pollIntervalMs} - ); -`; -``` - -### Verification - -#### Verify Core Package -```bash -cd pkgs/core -./scripts/run-test-with-colors supabase/tests/start_tasks/*.test.sql -``` - -#### Verify Edge Worker Package -```bash -cd pkgs/edge-worker -pnpm nx test:unit edge-worker -pnpm nx test:integration edge-worker -``` - ---- - -## [ ] Task 4: Remove Realtime Partition Helpers - -**Priority**: LOW (cleanup, not blocking) - -### Why -Supabase Realtime v2.35.4+ fixed issue #1369. The janitor now creates partitions immediately on startup. **Verified**: After `pnpm supabase db reset`, 7 partitions are created automatically (3 days back, today, 3 days forward). - -The helper function was a workaround that is no longer needed. - -### What to Delete - -#### 4.1 Helper Function - -**File**: `supabase/seed.sql` (lines ~540-580) - -Delete the entire `pgflow_tests.create_realtime_partition()` function definition. - -#### 4.2 Helper Test File - -**File**: `supabase/tests/pgflow_tests/create_realtime_partition.test.sql` (entire file) - -This test fails with: -``` -ERROR: must be owner of table messages_2025_11_02 -``` - -The test tries to DROP partitions owned by `supabase_admin`, which causes permission errors. Since the helper is obsolete, delete the test. 
- -#### 4.3 Remove Helper Calls from Tests - -The following test files call `pgflow_tests.create_realtime_partition()` at the start: - -``` -supabase/tests/regressions/step_failed_event_bug.test.sql (line 5) -supabase/tests/realtime/start_ready_steps_events.test.sql (line 7) -supabase/tests/realtime/complete_task_events.test.sql (line 5) -supabase/tests/realtime/start_flow_events.test.sql (line 5) -supabase/tests/realtime/maybe_complete_run_events.test.sql (line 5) -supabase/tests/realtime/full_flow_events.test.sql (line 5) -supabase/tests/realtime/fail_task_events.test.sql (line 5) -supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql (line 11) -``` - -Remove the `select pgflow_tests.create_realtime_partition();` lines from all these files. - -### Verification - -#### Verify partitions auto-create: -```bash -pnpm supabase db reset -PGPASSWORD=postgres psql -h 127.0.0.1 -p 54322 -U postgres -d postgres -c \ - "SELECT COUNT(*) as partition_count FROM pg_tables WHERE schemaname = 'realtime' AND tablename LIKE 'messages_%'" -``` - -Expected: ~7 partitions - -#### Verify tests still pass: -```bash -./scripts/run-test-with-colors supabase/tests/realtime/*.test.sql -./scripts/run-test-with-colors supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql -./scripts/run-test-with-colors supabase/tests/regressions/step_failed_event_bug.test.sql -``` - ---- - -## [ ] Task 5: Generate Migration from Schema Changes - -**Priority**: HIGH (required after schema changes) - -### Why -Tasks 1 and 2 modify schema files in `pkgs/core/schemas/`. We need to generate a new migration from these schema changes. **NEVER manually edit migrations** - always generate them from schema files. - -### Schema Files Modified -- `schemas/0110_function_set_vt_batch.sql` - Added `headers` column to RETURNING -- `schemas/0080_function_read_with_poll.sql` - Deleted (entire file) - -### Migration Generation Process - -Follow the workflow from `.claude/reference/schema_development.md`: - -```bash -cd pkgs/core - -# Generate migration from schema changes -./scripts/atlas-migrate-diff upgrade_pgmq_1_5_1 # NO pgflow_ prefix (auto-added) - -# This creates: supabase/migrations/TIMESTAMP_pgflow_upgrade_pgmq_1_5_1.sql - -# Verify migration is valid -pnpm nx verify-migrations core - -# Test that migration applies cleanly -pnpm nx supabase:reset core -pnpm nx test:pgtap core -``` - -### Expected Migration Contents -The generated migration should contain: -- `DROP FUNCTION pgflow.read_with_poll(...)` - Removing backport -- Updated `CREATE FUNCTION pgflow.set_vt_batch(...)` - With `headers` in RETURNING clause -- Updated `atlas.sum` checksums - -### Verification -```bash -# Clean slate test -pnpm nx supabase:reset core - -# All tests should pass -pnpm nx test:pgtap core - -# Specifically verify the previously failing tests -./scripts/run-test-with-colors supabase/tests/set_vt_batch/*.test.sql -``` - -All tests should pass after migrations are applied. 
- -### Migration Sync to Other Packages - -After generating the migration in core, it needs to be synced to other packages: - -#### Auto-sync (via Nx tasks) -These packages automatically copy migrations from core when their Supabase tasks run: - -**pkgs/client** -- Syncs via: `nx run client:supabase:prepare` -- Copies: `../core/supabase/migrations/*.sql` → `supabase/migrations/` -- Runs before: Supabase database operations - -**pkgs/edge-worker** -- Syncs via: `nx run edge-worker:supabase:reset` -- Copies: `../core/supabase/migrations/*.sql` → `supabase/migrations/` -- Runs during: `pnpm nx supabase:reset edge-worker` - -**No action needed** - these will pick up the new migration automatically on next run. - -#### Manual sync required - -**examples/playground** -- Has diverged migrations (not identical to core) -- No auto-sync mechanism -- Must manually copy the new migration file - -```bash -# After generating migration in core, copy to playground -cp pkgs/core/supabase/migrations/*_pgflow_upgrade_pgmq_1_5_1.sql \ - examples/playground/supabase/migrations/ -``` - -### Important Notes -- Migration name: `upgrade_pgmq_1_5_1` (Atlas auto-adds `pgflow_` prefix) -- Do NOT use `TEMP_` prefix (this is final migration for merging to main) -- Commit both schema changes AND generated migration together -- Remember to copy migration to playground manually - ---- - -## [ ] Task 6: Update Documentation - -**Priority**: LOW (good practice) - -### What to Update - -#### 6.1 CHANGELOG.md -Add entry under `[Unreleased]`: -```markdown -### Changed -- Upgraded to PGMQ 1.5.1, adding support for message headers -- Removed `pgflow.read_with_poll` backport (use `pgmq.read_with_poll` instead) -- Removed `pgflow_tests.create_realtime_partition` helper (partitions auto-created by Realtime v2.35.4+) - -### Breaking Changes -- Requires PGMQ 1.5.0 or higher (Supabase ships 1.5.1 by default) -- Code calling `pgflow.read_with_poll` must update to `pgmq.read_with_poll` -``` - -#### 6.2 Documentation Files - -Update references from `pgflow.read_with_poll` to `pgmq.read_with_poll` in: - -**Website docs**: -- `pkgs/website/src/content/docs/reference/queue-worker/configuration.mdx` - -**Architecture diagrams**: -- `pkgs/website/src/assets/architecture-diagrams/task-execution.mermaid` -- `pkgs/core/assets/flow-lifecycle.mermaid` -- `pkgs/core/assets/flow-lifecycle.svg` - -**Guides**: -- `ARCHITECTURE_GUIDE.md` - -**Package READMEs**: +**Files to review and update:** +- Root `README.md` - `pkgs/core/README.md` -- `pkgs/core/CHANGELOG.md` -- `pkgs/edge-worker/CHANGELOG.md` -- `pkgs/website/CHANGELOG.md` - -Use global search to find all references: -```bash -rg "pgflow\.read_with_poll" --type md --type mermaid -``` - -#### 6.3 Migration Guide (if needed) -If this affects external users, add a migration guide to the website docs explaining: -- How to update from PGMQ 1.4.4 to 1.5.1 -- What code changes are needed -- Why the breaking changes were made - ---- - -## [ ] Task 7: Final Verification - -**Priority**: CRITICAL (before merging) - -### Full Test Suite -```bash -cd pkgs/core -pnpm nx test core -``` - -This runs: -- All pgTAP tests -- All Vitest tests -- Type checks - -### Specific Regression Tests - -#### 7.1 set_vt_batch -```bash -./scripts/run-test-with-colors supabase/tests/set_vt_batch/*.test.sql -``` -Expected: All pass (previously failing) - -#### 7.2 start_tasks (uses read_with_poll) -```bash -./scripts/run-test-with-colors supabase/tests/start_tasks/*.test.sql -``` -Expected: All pass - -#### 7.3 Realtime (no 
longer needs partition helper) -```bash -./scripts/run-test-with-colors supabase/tests/realtime/*.test.sql -``` -Expected: All pass - -### Edge Cases - -#### Fresh Install Test -```bash -pnpm supabase db reset -# Immediately test realtime.send without waiting -PGPASSWORD=postgres psql -h 127.0.0.1 -p 54322 -U postgres -d postgres -c \ - "SELECT realtime.send(jsonb_build_object('test', 'data'), 'test', 'test', false)" -# Check message was stored -PGPASSWORD=postgres psql -h 127.0.0.1 -p 54322 -U postgres -d postgres -c \ - "SELECT COUNT(*) FROM realtime.messages WHERE payload->>'test' = 'data'" -``` -Expected: Message successfully stored (partition exists) - ---- - -## Summary Checklist - -Before creating PR: -- [ ] All schema changes completed (Tasks 1-2) -- [ ] All code updates completed (Task 3) -- [ ] All cleanup completed (Task 4) -- [ ] Migrations regenerated (Task 5) -- [ ] Documentation updated (Task 6) -- [ ] All tests passing (Task 7) -- [ ] No references to `pgflow.read_with_poll` remain -- [ ] No references to `create_realtime_partition` remain -- [ ] Fresh `db reset` creates partitions automatically - -## Files Changed Summary - -### pkgs/core - -**Modified**: -- `schemas/0110_function_set_vt_batch.sql` - Add headers column to RETURNING -- `src/PgflowSqlClient.ts` - Change pgflow.read_with_poll → pgmq.read_with_poll -- `supabase/tests/start_tasks/*.test.sql` (8 files) - Change pgflow.read_with_poll → pgmq.read_with_poll -- `supabase/tests/realtime/*.test.sql` (5 files) - Remove partition helper calls -- `supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql` - Remove partition helper call -- `supabase/tests/regressions/step_failed_event_bug.test.sql` - Remove partition helper call -- `CHANGELOG.md` - Document changes -- `README.md` - Update references -- `assets/flow-lifecycle.mermaid` - Update diagram -- `assets/flow-lifecycle.svg` - Update diagram - -**Deleted**: -- `schemas/0080_function_read_with_poll.sql` - Backport no longer needed -- `supabase/tests/pgflow_tests/create_realtime_partition.test.sql` - Helper test obsolete -- `supabase/seed.sql` - Remove `create_realtime_partition` function (~40 lines) - -**Generated**: -- New migration file via Atlas: `supabase/migrations/*_pgflow_upgrade_pgmq_1_5_1.sql` -- Updated `supabase/migrations/atlas.sum` +- `pkgs/edge-worker/README.md` +- `pkgs/cli/README.md` +- Any other package READMEs mentioning Supabase versions -### pkgs/edge-worker +**Action:** Ensure all READMEs mention the pgmq 1.5.0+ requirement -**Modified**: -- `src/queue/Queue.ts` - Change pgflow.read_with_poll → pgmq.read_with_poll (line 82) -- `CHANGELOG.md` - Document changes +### 4. Improve Update pgflow Docs Page -**Auto-synced** (via Nx tasks): -- `supabase/migrations/*` - Auto-copied from core on `nx supabase:reset` +**File:** Look for existing update/upgrade documentation page -### pkgs/client +**Actions:** +- Add section about breaking changes in 0.8.0 +- Document migration path from 0.7.x to 0.8.0 +- Include pgmq version check instructions +- Add troubleshooting section for migration failures -**Auto-synced** (via Nx tasks): -- `supabase/migrations/*` - Auto-copied from core on `nx supabase:prepare` +### 5. 
Review All Docs Pages for Version References -### pkgs/website +**Action:** Comprehensive audit of all documentation for: +- Outdated Supabase CLI version numbers +- Missing pgmq version requirements +- Installation/setup instructions that need updating +- Migration guides that need breaking change warnings -**Modified**: -- `src/content/docs/reference/queue-worker/configuration.mdx` - Update references -- `src/assets/architecture-diagrams/task-execution.mermaid` - Update diagram -- `CHANGELOG.md` - Document changes +**Files to check:** +- All files in `pkgs/website/src/content/docs/` +- All READMEs across packages +- Any deployment guides +- Troubleshooting pages -### examples/playground +## Testing Checklist -**Manual sync required**: -- `supabase/migrations/*_pgflow_upgrade_pgmq_1_5_1.sql` - Copy from core manually +After documentation updates: +- [ ] Build website locally and verify all pages render correctly +- [ ] Check all internal links still work +- [ ] Verify code examples are still accurate +- [ ] Review for consistency in version numbering -### Root +## Notes -**Modified**: -- `ARCHITECTURE_GUIDE.md` - Update references +- Keep documentation aligned with MVP philosophy (concise, clear, actionable) +- Follow Diataxis framework for documentation organization +- Use clear warnings for breaking changes +- Provide migration instructions, not just "upgrade" diff --git a/examples/playground/supabase/migrations/20251102211603_20251102201302_pgflow_upgrade_pgmq_1_5_1.sql b/examples/playground/supabase/migrations/20251102211603_20251102201302_pgflow_upgrade_pgmq_1_5_1.sql new file mode 100644 index 000000000..ed67df611 --- /dev/null +++ b/examples/playground/supabase/migrations/20251102211603_20251102201302_pgflow_upgrade_pgmq_1_5_1.sql @@ -0,0 +1,93 @@ +-- Migration tested 2025-11-02: +-- Successfully verified that this migration fails on pgmq 1.4.4 (Supabase CLI 2.0.2) +-- with clear error message guiding users to upgrade pgmq to 1.5.0+ +-- +-- Compatibility check: Ensure pgmq.message_record has headers column (pgmq 1.5.0+) +DO $$ +DECLARE + has_headers BOOLEAN; +BEGIN + SELECT EXISTS ( + SELECT 1 + FROM pg_type t + JOIN pg_namespace n ON t.typnamespace = n.oid + JOIN pg_attribute a ON a.attrelid = t.typrelid + WHERE n.nspname = 'pgmq' + AND t.typname = 'message_record' + AND a.attname = 'headers' + AND a.attnum > 0 + AND NOT a.attisdropped + ) INTO has_headers; + + IF NOT has_headers THEN + RAISE EXCEPTION E'INCOMPATIBLE PGMQ VERSION DETECTED\n\n' + 'This migration is part of pgflow 0.8.0+, which requires pgmq 1.5.0 or higher.\n' + 'The pgmq.message_record type is missing the "headers" column, which indicates you are running pgmq < 1.5.0.\n\n' + 'pgflow 0.8.0+ is NOT compatible with pgmq versions below 1.5.0.\n\n' + 'Action required:\n' + ' - If using Supabase: Ensure you are running a recent version that includes pgmq 1.5.0+\n' + ' - If self-hosting: Upgrade pgmq to version 1.5.0 or higher before running this migration\n\n' + 'Migration aborted to prevent runtime failures.'; + END IF; +END $$; + +-- Modify "set_vt_batch" function +-- Must drop first because we're changing the return type from SETOF to TABLE +DROP FUNCTION IF EXISTS "pgflow"."set_vt_batch"(text, bigint[], integer[]); +CREATE FUNCTION "pgflow"."set_vt_batch" ( + "queue_name" text, + "msg_ids" bigint[], + "vt_offsets" integer[] +) +RETURNS TABLE( + msg_id bigint, + read_ct integer, + enqueued_at timestamp with time zone, + vt timestamp with time zone, + message jsonb, + headers jsonb +) +LANGUAGE plpgsql AS $$ +DECLARE 
+ qtable TEXT := pgmq.format_table_name(queue_name, 'q'); + sql TEXT; +BEGIN + /* ---------- safety checks ---------------------------------------------------- */ + IF msg_ids IS NULL OR vt_offsets IS NULL OR array_length(msg_ids, 1) = 0 THEN + RETURN; -- nothing to do, return empty set + END IF; + + IF array_length(msg_ids, 1) IS DISTINCT FROM array_length(vt_offsets, 1) THEN + RAISE EXCEPTION + 'msg_ids length (%) must equal vt_offsets length (%)', + array_length(msg_ids, 1), array_length(vt_offsets, 1); + END IF; + + /* ---------- dynamic statement ------------------------------------------------ */ + /* One UPDATE joins with the unnested arrays */ + sql := format( + $FMT$ + WITH input (msg_id, vt_offset) AS ( + SELECT unnest($1)::bigint + , unnest($2)::int + ) + UPDATE pgmq.%I q + SET vt = clock_timestamp() + make_interval(secs => input.vt_offset), + read_ct = read_ct -- no change, but keeps RETURNING list aligned + FROM input + WHERE q.msg_id = input.msg_id + RETURNING q.msg_id, + q.read_ct, + q.enqueued_at, + q.vt, + q.message, + q.headers + $FMT$, + qtable + ); + + RETURN QUERY EXECUTE sql USING msg_ids, vt_offsets; +END; +$$; +-- Drop "read_with_poll" function +DROP FUNCTION "pgflow"."read_with_poll"; diff --git a/pkgs/client/__tests__/helpers/permissions.ts b/pkgs/client/__tests__/helpers/permissions.ts index d3d56d6e2..30c369f6e 100644 --- a/pkgs/client/__tests__/helpers/permissions.ts +++ b/pkgs/client/__tests__/helpers/permissions.ts @@ -9,9 +9,6 @@ import type { Sql } from 'postgres'; export async function grantMinimalPgflowPermissions(sql: Sql) { // Clean up any leftover data from previous tests // await sql`SELECT pgflow_tests.reset_db()`; // MOVED TO ONE-TIME SETUP - - // Ensure realtime partition exists (required after db reset) - safe to call multiple times - try { await sql`SELECT pgflow_tests.create_realtime_partition()`; } catch { /* ignore errors */ } // Grant minimal permissions to service_role (used by tests) - ignore if already granted try { await sql`GRANT USAGE ON SCHEMA pgflow TO service_role`; } catch { /* ignore errors */ } diff --git a/pkgs/client/__tests__/integration/realtime-send.test.ts b/pkgs/client/__tests__/integration/realtime-send.test.ts index 077a8cdfe..f1c5e6caf 100644 --- a/pkgs/client/__tests__/integration/realtime-send.test.ts +++ b/pkgs/client/__tests__/integration/realtime-send.test.ts @@ -6,10 +6,7 @@ describe('Realtime Send Integration', () => { it( 'receives events sent via realtime.send() SQL function', withPgNoTransaction(async (sql) => { - // Create realtime partition using clean core function - await sql`SELECT pgflow_tests.create_realtime_partition()`; - - // 2. Create Supabase client with real credentials + // 1. 
Create Supabase client with real credentials const supabaseClient = createTestSupabaseClient(); const testChannel = 'test-realtime-sql-send'; diff --git a/pkgs/client/package.json b/pkgs/client/package.json index 313d9a0ee..3f7a0fe76 100644 --- a/pkgs/client/package.json +++ b/pkgs/client/package.json @@ -44,7 +44,7 @@ "@pgflow/dsl": "workspace:*", "@types/uuid": "^10.0.0", "postgres": "^3.4.5", - "supabase": "2.21.1", + "supabase": "^2.34.3", "terser": "^5.43.0", "vite-plugin-dts": "~3.8.1", "vitest": "1.3.1" diff --git a/pkgs/client/project.json b/pkgs/client/project.json index 6254edd44..d8f969c3c 100644 --- a/pkgs/client/project.json +++ b/pkgs/client/project.json @@ -62,6 +62,7 @@ "executor": "nx:run-commands", "local": true, "cache": false, + "dependsOn": ["supabase:prepare"], "options": { "cwd": "{projectRoot}", "commands": [ @@ -147,7 +148,6 @@ "commands": [ "../../scripts/supabase-start-locked.sh .", "psql 'postgresql://postgres:postgres@localhost:50522/postgres' -c 'SELECT pgflow_tests.reset_db()'", - "psql 'postgresql://postgres:postgres@localhost:50522/postgres' -c 'SELECT pgflow_tests.create_realtime_partition()'", "vitest run __tests__/integration/" ], "parallel": false @@ -173,7 +173,6 @@ "commands": [ "../../scripts/supabase-start-locked.sh .", "psql 'postgresql://postgres:postgres@localhost:50522/postgres' -c 'SELECT pgflow_tests.reset_db()'", - "psql 'postgresql://postgres:postgres@localhost:50522/postgres' -c 'SELECT pgflow_tests.create_realtime_partition()'", "vitest run __tests__/" ], "parallel": false @@ -194,7 +193,6 @@ "commands": [ "../../scripts/supabase-start-locked.sh .", "psql 'postgresql://postgres:postgres@localhost:50522/postgres' -c 'SELECT pgflow_tests.reset_db()'", - "psql 'postgresql://postgres:postgres@localhost:50522/postgres' -c 'SELECT pgflow_tests.create_realtime_partition()'", "node scripts/performance-benchmark.mjs" ], "parallel": false diff --git a/pkgs/client/scripts/performance-benchmark.mjs b/pkgs/client/scripts/performance-benchmark.mjs index 6a00c6142..fbd0b7db0 100644 --- a/pkgs/client/scripts/performance-benchmark.mjs +++ b/pkgs/client/scripts/performance-benchmark.mjs @@ -49,9 +49,6 @@ function getMemoryUsage() { } async function grantMinimalPgflowPermissions(sql) { - // Ensure realtime partition exists - try { await sql`SELECT pgflow_tests.create_realtime_partition()`; } catch { /* ignore errors */ } - // Grant minimal permissions to service_role try { await sql`GRANT USAGE ON SCHEMA pgflow TO service_role`; } catch { /* ignore errors */ } try { await sql`GRANT EXECUTE ON FUNCTION pgflow.start_flow_with_states(text, jsonb, uuid) TO service_role`; } catch { /* ignore errors */ } diff --git a/pkgs/client/vitest.global-setup.ts b/pkgs/client/vitest.global-setup.ts index 20ee7160b..dffdbad7c 100644 --- a/pkgs/client/vitest.global-setup.ts +++ b/pkgs/client/vitest.global-setup.ts @@ -46,10 +46,6 @@ export async function setup() { throw pgError; } - console.log('[GLOBAL SETUP] Creating realtime partition...'); - await sql`SELECT pgflow_tests.create_realtime_partition()`; - console.log('[GLOBAL SETUP] ✓ Realtime partition created'); - console.log('[GLOBAL SETUP] Setting up broadcast listener...'); channel.on('broadcast', { event: '*' }, (p) => { console.log('[GLOBAL SETUP] Received broadcast event:', p); diff --git a/pkgs/core/README.md b/pkgs/core/README.md index 1434126c3..d046d4dac 100644 --- a/pkgs/core/README.md +++ b/pkgs/core/README.md @@ -140,6 +140,7 @@ SELECT pgflow.add_step( #### Root Map vs Dependent Map **Root Map Steps** process 
the flow's input array directly: + ```sql -- Root map: no dependencies, processes flow input SELECT pgflow.add_step( @@ -156,6 +157,7 @@ SELECT pgflow.start_flow( ``` **Dependent Map Steps** process another step's array output: + ```sql -- Dependent map: processes the array from 'fetch_items' SELECT pgflow.add_step( @@ -169,6 +171,7 @@ SELECT pgflow.add_step( #### Edge Cases and Special Behaviors 1. **Empty Array Cascade**: When a map step receives an empty array (`[]`): + - The SQL core completes it immediately without creating tasks - The completed map step outputs an empty array - Any dependent map steps also receive empty arrays and complete immediately @@ -184,12 +187,14 @@ SELECT pgflow.add_step( #### Implementation Details Map steps utilize several database fields for state management: + - `initial_tasks`: Number of tasks to create (NULL until array size is known) - `remaining_tasks`: Tracks incomplete tasks for the step - `task_index`: Identifies which array element each task processes - `step_type`: Column value 'map' triggers map behavior The aggregation process ensures: + - **Order Preservation**: Task outputs maintain array element ordering - **NULL Handling**: NULL outputs are included in the aggregated array - **Atomicity**: Aggregation occurs within the same transaction as task completion @@ -262,8 +267,9 @@ When a workflow starts: The Edge Worker uses a two-phase approach to retrieve and start tasks: **Phase 1 - Reserve Messages:** + ```sql -SELECT * FROM pgflow.read_with_poll( +SELECT * FROM pgmq.read_with_poll( queue_name => 'analyze_website', vt => 60, -- visibility timeout in seconds qty => 5 -- maximum number of messages to fetch @@ -271,6 +277,7 @@ SELECT * FROM pgflow.read_with_poll( ``` **Phase 2 - Start Tasks:** + ```sql SELECT * FROM pgflow.start_tasks( flow_slug => 'analyze_website', @@ -379,6 +386,7 @@ Timeouts are enforced by setting the message visibility timeout to the step's ti The SQL Core is the DAG orchestration engine that handles dependency resolution, step state management, and task spawning. However, workflows are defined using the TypeScript Flow DSL, which compiles user intent into the SQL primitives that populate the definition tables (`flows`, `steps`, `deps`). 
See the [@pgflow/dsl package](../dsl/README.md) for complete documentation on: + - Expressing workflows with type-safe method chaining - Step types (`.step()`, `.array()`, `.map()`) - Compilation to SQL migrations @@ -441,6 +449,7 @@ Map step tasks receive a fundamentally different input structure than single ste ``` This means: + - Map handlers process individual elements in isolation - Map handlers cannot access the original flow input (`run`) - Map handlers cannot access other dependencies @@ -456,8 +465,10 @@ When a step depends on a map step, it receives the aggregated array output: // A step depending on 'process_users' receives: { - "run": { /* original flow input */ }, - "process_users": [{"name": "Alice"}, {"name": "Bob"}] // Full array + "run": { + /* original flow input */ + }, + "process_users": [{ "name": "Alice" }, { "name": "Bob" }] // Full array } ``` diff --git a/pkgs/core/package.json b/pkgs/core/package.json index a62eb8432..8364b6986 100644 --- a/pkgs/core/package.json +++ b/pkgs/core/package.json @@ -20,7 +20,7 @@ }, "devDependencies": { "@types/node": "^22.14.1", - "supabase": "2.54.11" + "supabase": "^2.34.3" }, "dependencies": { "@pgflow/dsl": "workspace:*", diff --git a/pkgs/core/schemas/0080_function_read_with_poll.sql b/pkgs/core/schemas/0080_function_read_with_poll.sql deleted file mode 100644 index 35eef6e57..000000000 --- a/pkgs/core/schemas/0080_function_read_with_poll.sql +++ /dev/null @@ -1,72 +0,0 @@ --------------------------------------------------------------------------------- --- Read With Poll -------------------------------------------------------------- --- -- --- This is a backport of the pgmq.read_with_poll function from version 1.5.0 -- --- It is required because it fixes a bug with high CPU usage and Supabase -- --- is still using version 1.4.4. -- --- -- --- It is slightly modified (removed headers which are not available in 1.4.4) -- --- -- --- This will be removed once Supabase upgrades to 1.5.0 or higher. 
-- --------------------------------------------------------------------------------- -create function pgflow.read_with_poll( - queue_name TEXT, - vt INTEGER, - qty INTEGER, - max_poll_seconds INTEGER default 5, - poll_interval_ms INTEGER default 100, - conditional JSONB default '{}' -) -returns setof PGMQ.MESSAGE_RECORD -set search_path = '' -as $$ -DECLARE - r pgmq.message_record; - stop_at TIMESTAMP; - sql TEXT; - qtable TEXT := pgmq.format_table_name(queue_name, 'q'); -BEGIN - stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds); - LOOP - IF (SELECT clock_timestamp() >= stop_at) THEN - RETURN; - END IF; - - sql := FORMAT( - $QUERY$ - WITH cte AS - ( - SELECT msg_id - FROM pgmq.%I - WHERE vt <= clock_timestamp() AND CASE - WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer - ELSE 1 - END = 1 - ORDER BY msg_id ASC - LIMIT $1 - FOR UPDATE SKIP LOCKED - ) - UPDATE pgmq.%I m - SET - vt = clock_timestamp() + %L, - read_ct = read_ct + 1 - FROM cte - WHERE m.msg_id = cte.msg_id - RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; - $QUERY$, - qtable, conditional, qtable, make_interval(secs => vt) - ); - - FOR r IN - EXECUTE sql USING qty - LOOP - RETURN NEXT r; - END LOOP; - IF FOUND THEN - RETURN; - ELSE - PERFORM pg_sleep(poll_interval_ms::numeric / 1000); - END IF; - END LOOP; -END; -$$ language plpgsql; diff --git a/pkgs/core/schemas/0110_function_set_vt_batch.sql b/pkgs/core/schemas/0110_function_set_vt_batch.sql index a1fdcab2e..1e11c08ed 100644 --- a/pkgs/core/schemas/0110_function_set_vt_batch.sql +++ b/pkgs/core/schemas/0110_function_set_vt_batch.sql @@ -15,7 +15,14 @@ create or replace function pgflow.set_vt_batch( msg_ids BIGINT [], vt_offsets INTEGER [] ) -returns setof PGMQ.MESSAGE_RECORD +returns table ( + msg_id BIGINT, + read_ct INTEGER, + enqueued_at TIMESTAMP WITH TIME ZONE, + vt TIMESTAMP WITH TIME ZONE, + message JSONB, + headers JSONB +) language plpgsql as $$ DECLARE @@ -50,7 +57,8 @@ BEGIN q.read_ct, q.enqueued_at, q.vt, - q.message + q.message, + q.headers $FMT$, qtable ); diff --git a/pkgs/core/src/PgflowSqlClient.ts b/pkgs/core/src/PgflowSqlClient.ts index 47a80bb06..e32e701b1 100644 --- a/pkgs/core/src/PgflowSqlClient.ts +++ b/pkgs/core/src/PgflowSqlClient.ts @@ -26,7 +26,7 @@ export class PgflowSqlClient ): Promise { return await this.sql` SELECT * - FROM pgflow.read_with_poll( + FROM pgmq.read_with_poll( queue_name => ${queueName}, vt => ${visibilityTimeout}, qty => ${batchSize}, diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 179139ddb..f2e7d3326 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -348,14 +348,14 @@ export type Database = { Functions: { add_step: { Args: { - step_type?: string - flow_slug: string - step_slug: string + base_delay?: number deps_slugs?: string[] + flow_slug: string max_attempts?: number - base_delay?: number - timeout?: number start_delay?: number + step_slug: string + step_type?: string + timeout?: number } Returns: { created_at: string @@ -369,6 +369,12 @@ export type Database = { step_slug: string step_type: string } + SetofOptions: { + from: "*" + to: "steps" + isOneToOne: true + isSetofReturn: false + } } calculate_retry_delay: { Args: { attempts_count: number; base_delay: number } @@ -380,10 +386,10 @@ export type Database = { } complete_task: { Args: { + output: Json run_id: string step_slug: string task_index: number - output: Json } Returns: { attempts_count: number @@ -401,13 +407,19 @@ export type Database = { step_slug: 
string task_index: number }[] + SetofOptions: { + from: "*" + to: "step_tasks" + isOneToOne: false + isSetofReturn: true + } } create_flow: { Args: { - timeout?: number + base_delay?: number flow_slug: string max_attempts?: number - base_delay?: number + timeout?: number } Returns: { created_at: string @@ -416,13 +428,19 @@ export type Database = { opt_max_attempts: number opt_timeout: number } + SetofOptions: { + from: "*" + to: "flows" + isOneToOne: true + isSetofReturn: false + } } fail_task: { Args: { + error_message: string run_id: string step_slug: string task_index: number - error_message: string } Returns: { attempts_count: number @@ -440,46 +458,45 @@ export type Database = { step_slug: string task_index: number }[] + SetofOptions: { + from: "*" + to: "step_tasks" + isOneToOne: false + isSetofReturn: true + } } - get_run_with_states: { - Args: { run_id: string } - Returns: Json - } - is_valid_slug: { - Args: { slug: string } - Returns: boolean - } - maybe_complete_run: { - Args: { run_id: string } - Returns: undefined - } + get_run_with_states: { Args: { run_id: string }; Returns: Json } + is_valid_slug: { Args: { slug: string }; Returns: boolean } + maybe_complete_run: { Args: { run_id: string }; Returns: undefined } poll_for_tasks: { Args: { - vt: number - poll_interval_ms?: number max_poll_seconds?: number + poll_interval_ms?: number qty: number queue_name: string + vt: number } Returns: Database["pgflow"]["CompositeTypes"]["step_task_record"][] - } - read_with_poll: { - Args: { - qty: number - queue_name: string - vt: number - conditional?: Json - max_poll_seconds?: number - poll_interval_ms?: number + SetofOptions: { + from: "*" + to: "step_task_record" + isOneToOne: false + isSetofReturn: true } - Returns: Database["pgmq"]["CompositeTypes"]["message_record"][] } set_vt_batch: { - Args: { vt_offsets: number[]; queue_name: string; msg_ids: number[] } - Returns: Database["pgmq"]["CompositeTypes"]["message_record"][] + Args: { msg_ids: number[]; queue_name: string; vt_offsets: number[] } + Returns: { + enqueued_at: string + headers: Json + message: Json + msg_id: number + read_ct: number + vt: string + }[] } start_flow: { - Args: { flow_slug: string; run_id?: string; input: Json } + Args: { flow_slug: string; input: Json; run_id?: string } Returns: { completed_at: string | null failed_at: string | null @@ -491,18 +508,27 @@ export type Database = { started_at: string status: string }[] + SetofOptions: { + from: "*" + to: "runs" + isOneToOne: false + isSetofReturn: true + } } start_flow_with_states: { - Args: { flow_slug: string; run_id?: string; input: Json } + Args: { flow_slug: string; input: Json; run_id?: string } Returns: Json } - start_ready_steps: { - Args: { run_id: string } - Returns: undefined - } + start_ready_steps: { Args: { run_id: string }; Returns: undefined } start_tasks: { - Args: { worker_id: string; flow_slug: string; msg_ids: number[] } + Args: { flow_slug: string; msg_ids: number[]; worker_id: string } Returns: Database["pgflow"]["CompositeTypes"]["step_task_record"][] + SetofOptions: { + from: "*" + to: "step_task_record" + isOneToOne: false + isSetofReturn: true + } } } Enums: { @@ -547,129 +573,203 @@ export type Database = { [_ in never]: never } Functions: { - _belongs_to_pgmq: { - Args: { table_name: string } - Returns: boolean - } - _ensure_pg_partman_installed: { - Args: Record - Returns: undefined - } + _belongs_to_pgmq: { Args: { table_name: string }; Returns: boolean } + _ensure_pg_partman_installed: { Args: never; Returns: undefined } + 
_extension_exists: { Args: { extension_name: string }; Returns: boolean } _get_partition_col: { Args: { partition_interval: string } Returns: string } - _get_pg_partman_major_version: { - Args: Record - Returns: number - } - _get_pg_partman_schema: { - Args: Record - Returns: string - } - archive: { - Args: - | { msg_id: number; queue_name: string } - | { msg_ids: number[]; queue_name: string } - Returns: boolean - } + _get_pg_partman_major_version: { Args: never; Returns: number } + _get_pg_partman_schema: { Args: never; Returns: string } + archive: + | { Args: { msg_id: number; queue_name: string }; Returns: boolean } + | { Args: { msg_ids: number[]; queue_name: string }; Returns: number[] } convert_archive_partitioned: { Args: { - table_name: string - retention_interval?: string leading_partition?: number partition_interval?: string + retention_interval?: string + table_name: string } Returns: undefined } - create: { - Args: { queue_name: string } - Returns: undefined - } + create: { Args: { queue_name: string }; Returns: undefined } create_non_partitioned: { Args: { queue_name: string } Returns: undefined } create_partitioned: { Args: { - retention_interval?: string - queue_name: string partition_interval?: string + queue_name: string + retention_interval?: string } Returns: undefined } - create_unlogged: { - Args: { queue_name: string } - Returns: undefined - } - delete: { - Args: - | { msg_id: number; queue_name: string } - | { queue_name: string; msg_ids: number[] } - Returns: boolean - } - detach_archive: { - Args: { queue_name: string } - Returns: undefined - } - drop_queue: { - Args: { queue_name: string } - Returns: boolean - } + create_unlogged: { Args: { queue_name: string }; Returns: undefined } + delete: + | { Args: { msg_id: number; queue_name: string }; Returns: boolean } + | { Args: { msg_ids: number[]; queue_name: string }; Returns: number[] } + detach_archive: { Args: { queue_name: string }; Returns: undefined } + drop_queue: + | { Args: { queue_name: string }; Returns: boolean } + | { + Args: { partitioned: boolean; queue_name: string } + Returns: boolean + } format_table_name: { Args: { prefix: string; queue_name: string } Returns: string } list_queues: { - Args: Record + Args: never Returns: Database["pgmq"]["CompositeTypes"]["queue_record"][] + SetofOptions: { + from: "*" + to: "queue_record" + isOneToOne: false + isSetofReturn: true + } } metrics: { Args: { queue_name: string } Returns: Database["pgmq"]["CompositeTypes"]["metrics_result"] + SetofOptions: { + from: "*" + to: "metrics_result" + isOneToOne: true + isSetofReturn: false + } } metrics_all: { - Args: Record + Args: never Returns: Database["pgmq"]["CompositeTypes"]["metrics_result"][] + SetofOptions: { + from: "*" + to: "metrics_result" + isOneToOne: false + isSetofReturn: true + } } pop: { Args: { queue_name: string } Returns: Database["pgmq"]["CompositeTypes"]["message_record"][] + SetofOptions: { + from: "*" + to: "message_record" + isOneToOne: false + isSetofReturn: true + } } - purge_queue: { - Args: { queue_name: string } - Returns: number - } + purge_queue: { Args: { queue_name: string }; Returns: number } read: { - Args: { queue_name: string; qty: number; vt: number } + Args: { + conditional?: Json + qty: number + queue_name: string + vt: number + } Returns: Database["pgmq"]["CompositeTypes"]["message_record"][] + SetofOptions: { + from: "*" + to: "message_record" + isOneToOne: false + isSetofReturn: true + } } read_with_poll: { Args: { - qty: number + conditional?: Json max_poll_seconds?: number 
poll_interval_ms?: number + qty: number queue_name: string vt: number } Returns: Database["pgmq"]["CompositeTypes"]["message_record"][] - } - send: { - Args: { delay?: number; msg: Json; queue_name: string } - Returns: number[] - } - send_batch: { - Args: { delay?: number; queue_name: string; msgs: Json[] } - Returns: number[] - } + SetofOptions: { + from: "*" + to: "message_record" + isOneToOne: false + isSetofReturn: true + } + } + send: + | { + Args: { + delay: string + headers: Json + msg: Json + queue_name: string + } + Returns: number[] + } + | { + Args: { delay: string; msg: Json; queue_name: string } + Returns: number[] + } + | { Args: { msg: Json; queue_name: string }; Returns: number[] } + | { + Args: { delay: number; msg: Json; queue_name: string } + Returns: number[] + } + | { + Args: { headers: Json; msg: Json; queue_name: string } + Returns: number[] + } + | { + Args: { + delay: number + headers: Json + msg: Json + queue_name: string + } + Returns: number[] + } + send_batch: + | { + Args: { + delay: string + headers: Json[] + msgs: Json[] + queue_name: string + } + Returns: number[] + } + | { + Args: { delay: string; msgs: Json[]; queue_name: string } + Returns: number[] + } + | { Args: { msgs: Json[]; queue_name: string }; Returns: number[] } + | { + Args: { delay: number; msgs: Json[]; queue_name: string } + Returns: number[] + } + | { + Args: { headers: Json[]; msgs: Json[]; queue_name: string } + Returns: number[] + } + | { + Args: { + delay: number + headers: Json[] + msgs: Json[] + queue_name: string + } + Returns: number[] + } set_vt: { - Args: { queue_name: string; vt: number; msg_id: number } + Args: { msg_id: number; queue_name: string; vt: number } Returns: Database["pgmq"]["CompositeTypes"]["message_record"][] + SetofOptions: { + from: "*" + to: "message_record" + isOneToOne: false + isSetofReturn: true + } } - validate_queue_name: { - Args: { queue_name: string } - Returns: undefined - } + validate_queue_name: { Args: { queue_name: string }; Returns: undefined } } Enums: { [_ in never]: never @@ -681,6 +781,7 @@ export type Database = { enqueued_at: string | null vt: string | null message: Json | null + headers: Json | null } metrics_result: { queue_name: string | null @@ -689,6 +790,7 @@ export type Database = { oldest_msg_age_sec: number | null total_messages: number | null scrape_time: string | null + queue_visible_length: number | null } queue_record: { queue_name: string | null @@ -700,21 +802,25 @@ export type Database = { } } -type DefaultSchema = Database[Extract] +type DatabaseWithoutInternals = Omit + +type DefaultSchema = DatabaseWithoutInternals[Extract] export type Tables< DefaultSchemaTableNameOrOptions extends | keyof (DefaultSchema["Tables"] & DefaultSchema["Views"]) - | { schema: keyof Database }, + | { schema: keyof DatabaseWithoutInternals }, TableName extends DefaultSchemaTableNameOrOptions extends { - schema: keyof Database + schema: keyof DatabaseWithoutInternals } - ? keyof (Database[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] & - Database[DefaultSchemaTableNameOrOptions["schema"]]["Views"]) + ? keyof (DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] & + DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Views"]) : never = never, -> = DefaultSchemaTableNameOrOptions extends { schema: keyof Database } - ? 
(Database[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] & - Database[DefaultSchemaTableNameOrOptions["schema"]]["Views"])[TableName] extends { +> = DefaultSchemaTableNameOrOptions extends { + schema: keyof DatabaseWithoutInternals +} + ? (DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] & + DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Views"])[TableName] extends { Row: infer R } ? R @@ -732,14 +838,16 @@ export type Tables< export type TablesInsert< DefaultSchemaTableNameOrOptions extends | keyof DefaultSchema["Tables"] - | { schema: keyof Database }, + | { schema: keyof DatabaseWithoutInternals }, TableName extends DefaultSchemaTableNameOrOptions extends { - schema: keyof Database + schema: keyof DatabaseWithoutInternals } - ? keyof Database[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] + ? keyof DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] : never = never, -> = DefaultSchemaTableNameOrOptions extends { schema: keyof Database } - ? Database[DefaultSchemaTableNameOrOptions["schema"]]["Tables"][TableName] extends { +> = DefaultSchemaTableNameOrOptions extends { + schema: keyof DatabaseWithoutInternals +} + ? DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"][TableName] extends { Insert: infer I } ? I @@ -755,14 +863,16 @@ export type TablesInsert< export type TablesUpdate< DefaultSchemaTableNameOrOptions extends | keyof DefaultSchema["Tables"] - | { schema: keyof Database }, + | { schema: keyof DatabaseWithoutInternals }, TableName extends DefaultSchemaTableNameOrOptions extends { - schema: keyof Database + schema: keyof DatabaseWithoutInternals } - ? keyof Database[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] + ? keyof DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"] : never = never, -> = DefaultSchemaTableNameOrOptions extends { schema: keyof Database } - ? Database[DefaultSchemaTableNameOrOptions["schema"]]["Tables"][TableName] extends { +> = DefaultSchemaTableNameOrOptions extends { + schema: keyof DatabaseWithoutInternals +} + ? DatabaseWithoutInternals[DefaultSchemaTableNameOrOptions["schema"]]["Tables"][TableName] extends { Update: infer U } ? U @@ -778,14 +888,16 @@ export type TablesUpdate< export type Enums< DefaultSchemaEnumNameOrOptions extends | keyof DefaultSchema["Enums"] - | { schema: keyof Database }, + | { schema: keyof DatabaseWithoutInternals }, EnumName extends DefaultSchemaEnumNameOrOptions extends { - schema: keyof Database + schema: keyof DatabaseWithoutInternals } - ? keyof Database[DefaultSchemaEnumNameOrOptions["schema"]]["Enums"] + ? keyof DatabaseWithoutInternals[DefaultSchemaEnumNameOrOptions["schema"]]["Enums"] : never = never, -> = DefaultSchemaEnumNameOrOptions extends { schema: keyof Database } - ? Database[DefaultSchemaEnumNameOrOptions["schema"]]["Enums"][EnumName] +> = DefaultSchemaEnumNameOrOptions extends { + schema: keyof DatabaseWithoutInternals +} + ? DatabaseWithoutInternals[DefaultSchemaEnumNameOrOptions["schema"]]["Enums"][EnumName] : DefaultSchemaEnumNameOrOptions extends keyof DefaultSchema["Enums"] ? 
DefaultSchema["Enums"][DefaultSchemaEnumNameOrOptions] : never @@ -793,14 +905,16 @@ export type Enums< export type CompositeTypes< PublicCompositeTypeNameOrOptions extends | keyof DefaultSchema["CompositeTypes"] - | { schema: keyof Database }, + | { schema: keyof DatabaseWithoutInternals }, CompositeTypeName extends PublicCompositeTypeNameOrOptions extends { - schema: keyof Database + schema: keyof DatabaseWithoutInternals } - ? keyof Database[PublicCompositeTypeNameOrOptions["schema"]]["CompositeTypes"] + ? keyof DatabaseWithoutInternals[PublicCompositeTypeNameOrOptions["schema"]]["CompositeTypes"] : never = never, -> = PublicCompositeTypeNameOrOptions extends { schema: keyof Database } - ? Database[PublicCompositeTypeNameOrOptions["schema"]]["CompositeTypes"][CompositeTypeName] +> = PublicCompositeTypeNameOrOptions extends { + schema: keyof DatabaseWithoutInternals +} + ? DatabaseWithoutInternals[PublicCompositeTypeNameOrOptions["schema"]]["CompositeTypes"][CompositeTypeName] : PublicCompositeTypeNameOrOptions extends keyof DefaultSchema["CompositeTypes"] ? DefaultSchema["CompositeTypes"][PublicCompositeTypeNameOrOptions] : never diff --git a/pkgs/core/supabase/migrations/20250429164909_pgflow_initial.sql b/pkgs/core/supabase/migrations/20250429164909_pgflow_initial.sql index 69150cd89..804b91943 100644 --- a/pkgs/core/supabase/migrations/20250429164909_pgflow_initial.sql +++ b/pkgs/core/supabase/migrations/20250429164909_pgflow_initial.sql @@ -1,5 +1,5 @@ -- Add new schema named "pgflow" -CREATE SCHEMA "pgflow"; +CREATE SCHEMA IF NOT EXISTS "pgflow"; -- Add new schema named "pgmq" CREATE SCHEMA IF NOT EXISTS "pgmq"; -- Create extension "pgmq" diff --git a/pkgs/core/supabase/migrations/20251104080523_pgflow_upgrade_pgmq_1_5_1.sql b/pkgs/core/supabase/migrations/20251104080523_pgflow_upgrade_pgmq_1_5_1.sql new file mode 100644 index 000000000..ed67df611 --- /dev/null +++ b/pkgs/core/supabase/migrations/20251104080523_pgflow_upgrade_pgmq_1_5_1.sql @@ -0,0 +1,93 @@ +-- Migration tested 2025-11-02: +-- Successfully verified that this migration fails on pgmq 1.4.4 (Supabase CLI 2.0.2) +-- with clear error message guiding users to upgrade pgmq to 1.5.0+ +-- +-- Compatibility check: Ensure pgmq.message_record has headers column (pgmq 1.5.0+) +DO $$ +DECLARE + has_headers BOOLEAN; +BEGIN + SELECT EXISTS ( + SELECT 1 + FROM pg_type t + JOIN pg_namespace n ON t.typnamespace = n.oid + JOIN pg_attribute a ON a.attrelid = t.typrelid + WHERE n.nspname = 'pgmq' + AND t.typname = 'message_record' + AND a.attname = 'headers' + AND a.attnum > 0 + AND NOT a.attisdropped + ) INTO has_headers; + + IF NOT has_headers THEN + RAISE EXCEPTION E'INCOMPATIBLE PGMQ VERSION DETECTED\n\n' + 'This migration is part of pgflow 0.8.0+, which requires pgmq 1.5.0 or higher.\n' + 'The pgmq.message_record type is missing the "headers" column, which indicates you are running pgmq < 1.5.0.\n\n' + 'pgflow 0.8.0+ is NOT compatible with pgmq versions below 1.5.0.\n\n' + 'Action required:\n' + ' - If using Supabase: Ensure you are running a recent version that includes pgmq 1.5.0+\n' + ' - If self-hosting: Upgrade pgmq to version 1.5.0 or higher before running this migration\n\n' + 'Migration aborted to prevent runtime failures.'; + END IF; +END $$; + +-- Modify "set_vt_batch" function +-- Must drop first because we're changing the return type from SETOF to TABLE +DROP FUNCTION IF EXISTS "pgflow"."set_vt_batch"(text, bigint[], integer[]); +CREATE FUNCTION "pgflow"."set_vt_batch" ( + "queue_name" text, + "msg_ids" bigint[], + 
"vt_offsets" integer[] +) +RETURNS TABLE( + msg_id bigint, + read_ct integer, + enqueued_at timestamp with time zone, + vt timestamp with time zone, + message jsonb, + headers jsonb +) +LANGUAGE plpgsql AS $$ +DECLARE + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); + sql TEXT; +BEGIN + /* ---------- safety checks ---------------------------------------------------- */ + IF msg_ids IS NULL OR vt_offsets IS NULL OR array_length(msg_ids, 1) = 0 THEN + RETURN; -- nothing to do, return empty set + END IF; + + IF array_length(msg_ids, 1) IS DISTINCT FROM array_length(vt_offsets, 1) THEN + RAISE EXCEPTION + 'msg_ids length (%) must equal vt_offsets length (%)', + array_length(msg_ids, 1), array_length(vt_offsets, 1); + END IF; + + /* ---------- dynamic statement ------------------------------------------------ */ + /* One UPDATE joins with the unnested arrays */ + sql := format( + $FMT$ + WITH input (msg_id, vt_offset) AS ( + SELECT unnest($1)::bigint + , unnest($2)::int + ) + UPDATE pgmq.%I q + SET vt = clock_timestamp() + make_interval(secs => input.vt_offset), + read_ct = read_ct -- no change, but keeps RETURNING list aligned + FROM input + WHERE q.msg_id = input.msg_id + RETURNING q.msg_id, + q.read_ct, + q.enqueued_at, + q.vt, + q.message, + q.headers + $FMT$, + qtable + ); + + RETURN QUERY EXECUTE sql USING msg_ids, vt_offsets; +END; +$$; +-- Drop "read_with_poll" function +DROP FUNCTION "pgflow"."read_with_poll"; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index d1dbb5669..5ac2fd2d8 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,12 +1,13 @@ -h1:FClxPy988QMOIEYi66F38g1A8YpcTqps8VPEfuMG2Z4= -20250429164909_pgflow_initial.sql h1:pPSR5ej8e7VUxSam0jUyUWQguMenrmsd9gS2dpi9QHM= -20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:560T1AOw2XW+Hem7ZTt67Otl6PEdNwq4xFQVOnGY2Dk= -20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:7Ki2Jn9LhPAAGLs0ypwPfMn+9cPZ4ED9y23HWIa/O40= -20250610180554_pgflow_add_set_vt_batch_and_use_it_in_start_tasks.sql h1:j63WQ4272fMlFe48rJVIpbLHuzu0cCfhd4Glbtm/eKw= -20250614124241_pgflow_add_realtime.sql h1:yh6poH6mapmY/RZXkBzZFSv9ylFBtVdHcDd7kbR/Zdk= -20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql h1:DkCnr8aKycwBRB95ZGdQsP0yhfW+tTZcUROFgtDgE2I= -20250627090700_pgflow_fix_function_search_paths.sql h1:0UQlJ7LF9PZ7Hje/r83XmWw66gDbfn4lg7ho9WT3wYM= -20250707210212_pgflow_add_opt_start_delay.sql h1:hRR3baDRd0V41sVUq/l6z3fkAaNCSNLe8m0BvRIYpEU= -20250719205006_pgflow_worker_deprecation.sql h1:VIJnWcNC2tbbc5UV9kAnQP5SNlr7N9LLEUSB3/KWoU4= -20251006073122_pgflow_add_map_step_type.sql h1:glsi7ti7BWa7UwWZze8zDjlNZBNOv4+nPRKLYNRUOro= -20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:mP0YyOTmCmPIhvwC1d80RUs4syigRzYsmeeWi3i6piU= +h1:SeEHqz8uDIqu6Px6f1IajB3NfXOYmetcSxFbp9PiYXY= +20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= +20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= +20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= +20250610180554_pgflow_add_set_vt_batch_and_use_it_in_start_tasks.sql h1:Mpf0PImrTsaZaIEPTQpONi5Ad0/RNfnatfc1WglG360= +20250614124241_pgflow_add_realtime.sql h1:jhMIt4OkrPf9o4hqcBZWBxxaQk94l252XRwjpEr51aA= +20250619195327_pgflow_fix_fail_task_missing_realtime_event.sql 
h1:n9kZFdfkYWaLCKvOHOSbJl6PFUhOiocdPVBcqyA4Ew4= +20250627090700_pgflow_fix_function_search_paths.sql h1:AEqyLIzjuInSmGrUwKhw5jv6S1iY3yaiKNKaLWjGbys= +20250707210212_pgflow_add_opt_start_delay.sql h1:MGzPX/E9NCr0w4LZR/Iw1vRxo/Gp+WpPO01r7giDulY= +20250719205006_pgflow_worker_deprecation.sql h1:pL5cR1/Oag993yWN6sUpE/U3LmVSzAlnoRXJRBtErU8= +20251006073122_pgflow_add_map_step_type.sql h1:D/skgKpaVg5TM8bPovo9FUutQfg35/AzkxEcasYwytY= +20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4= +20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:xnbHNJkMGWoaIL5RH617CCjM9nX1Yde8TimHpHhk7fM= diff --git a/pkgs/core/supabase/seed.sql b/pkgs/core/supabase/seed.sql index 9bd94a6f3..90e53e9ae 100644 --- a/pkgs/core/supabase/seed.sql +++ b/pkgs/core/supabase/seed.sql @@ -1,3 +1,4 @@ +create schema if not exists pgflow; create schema if not exists pgflow_tests; -------------------------------------------------------------------------------- @@ -72,7 +73,7 @@ create or replace function pgflow_tests.ensure_worker( ) returns uuid as $$ INSERT INTO pgflow.workers (worker_id, queue_name, function_name, last_heartbeat_at) VALUES (worker_uuid, queue_name, function_name, now()) - ON CONFLICT (worker_id) DO UPDATE SET + ON CONFLICT (worker_id) DO UPDATE SET last_heartbeat_at = now(), queue_name = EXCLUDED.queue_name, function_name = EXCLUDED.function_name @@ -83,11 +84,11 @@ $$ language sql; ------- read_and_start - reads messages and starts tasks in one call ----------- -------------------------------------------------------------------------------- create or replace function pgflow_tests.read_and_start( - flow_slug text, - vt integer default 1, - qty integer default 1, - worker_uuid uuid default '11111111-1111-1111-1111-111111111111'::uuid, - function_name text default 'test_worker' + flow_slug text, + vt integer default 1, + qty integer default 1, + worker_uuid uuid default '11111111-1111-1111-1111-111111111111'::uuid, + function_name text default 'test_worker' ) returns setof pgflow.step_task_record language sql as $$ @@ -102,7 +103,7 @@ as $$ -- 2. read messages from the queue msgs AS ( SELECT * - FROM pgflow.read_with_poll(flow_slug, vt, qty, 1, 50) + FROM pgmq.read_with_poll(flow_slug, vt, qty, 1, 50) LIMIT qty ), -- 3. collect their msg_ids @@ -526,66 +527,6 @@ BEGIN END; $$ language plpgsql; --------------------------------------------------------------------------------- -------- create_realtime_partition - creates partition for realtime.messages ---- --------------------------------------------------------------------------------- -/** - * Creates a partition for the realtime.messages table for the current date. - * - * This function ensures the partition exists for the realtime.messages table, - * which is required for realtime.send() to work properly. Without the appropriate - * partition, realtime.send() will silently fail (it catches exceptions and sends - * notifications instead of raising errors). - * - * The partition follows the naming convention: messages_YYYY_MM_DD - * and covers the range from midnight of the target date to midnight of the next day. - * - * @param target_date The date to create the partition for. Defaults to current_date. 
- * @return boolean TRUE if a partition was created, FALSE if it already existed - */ -create or replace function pgflow_tests.create_realtime_partition( - target_date date default CURRENT_DATE -) returns boolean as $$ -DECLARE - next_date date := target_date + interval '1 day'; - partition_name text := 'messages_' || to_char(target_date, 'YYYY_MM_DD'); - partition_exists boolean; - was_created boolean := false; -BEGIN - -- Check if partition already exists - SELECT EXISTS ( - SELECT 1 FROM pg_class c - JOIN pg_namespace n ON c.relnamespace = n.oid - WHERE n.nspname = 'realtime' - AND c.relname = partition_name - ) INTO partition_exists; - - -- Create partition if it doesn't exist - IF NOT partition_exists THEN - BEGIN - -- Create the partition for the target date - EXECUTE format( - 'CREATE TABLE realtime.%I PARTITION OF realtime.messages - FOR VALUES FROM (%L) TO (%L)', - partition_name, - target_date, - next_date - ); - was_created := true; - RAISE NOTICE 'Created partition % for date range % to %', - partition_name, target_date, next_date; - EXCEPTION WHEN OTHERS THEN - RAISE NOTICE 'Error creating partition %: %', partition_name, SQLERRM; - -- Re-throw the exception - RAISE; - END; - ELSE - RAISE NOTICE 'Partition % already exists', partition_name; - END IF; - - RETURN was_created; -END; -$$ language plpgsql; -------------------------------------------------------------------------------- ------- find_realtime_event - finds a specific realtime event for testing ------ diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql index 16ec89236..54bfc876e 100644 --- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/broadcast_order.test.sql @@ -1,9 +1,6 @@ begin; select plan(5); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow: -- parent_step (single) -> taskless_child1 (map) -> taskless_child2 (map) -- Parent outputs empty array, triggering cascade completion of map children diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade_event_payload.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade_event_payload.test.sql index 935bec970..948073954 100644 --- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade_event_payload.test.sql +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/cascade_event_payload.test.sql @@ -1,9 +1,6 @@ begin; select plan(8); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow with single empty map select pgflow_tests.reset_db(); select pgflow.create_flow('payload_check'); diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql index 04c5f3c64..b0a62836f 100644 --- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/deep_cascade_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(3); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow: root_map -> m1 -> m2 -> m3 -> m4 (all maps) select 
pgflow_tests.reset_db(); select pgflow.create_flow('deep_cascade'); diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql index 8d5dc7540..4b3b8c238 100644 --- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/dependent_map_cascade_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(5); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow: root_map -> dependent_map select pgflow_tests.reset_db(); select pgflow.create_flow('cascade_map_chain'); diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/empty_root_map_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/empty_root_map_cascade_events.test.sql index f011c4e67..4ab638a4a 100644 --- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/empty_root_map_cascade_events.test.sql +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/empty_root_map_cascade_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(4); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow with single root map step select pgflow_tests.reset_db(); select pgflow.create_flow('empty_root_map'); diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql index a189c7e78..c0aa79704 100644 --- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/mixed_cascade_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(6); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow: producer -> map1 -> map2 select pgflow_tests.reset_db(); select pgflow.create_flow('mixed_cascade'); diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql index 38e571436..0c327d65e 100644 --- a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/single_step_cascade_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(3); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow: A -> B -> C (all single steps) select pgflow_tests.reset_db(); select pgflow.create_flow('single_cascade'); diff --git a/pkgs/core/supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql index 6efa6f11c..3a1cbce32 100644 --- a/pkgs/core/supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql +++ b/pkgs/core/supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql @@ -1,15 +1,12 @@ begin; select plan(4); + -- Test: Map step completion broadcast contains aggregated array -- This test verifies the broadcast fix is working correctly -- Setup select pgflow_tests.reset_db(); - --- Ensure partition exists for 
realtime.messages -select pgflow_tests.create_realtime_partition(); - select pgflow.create_flow('test_broadcast_fix', 10, 60, 3); select pgflow.add_step('test_broadcast_fix', 'map_step', '{}', null, null, null, null, 'map'); select pgflow.add_step('test_broadcast_fix', 'consumer', array['map_step'], null, null, null, null, 'single'); diff --git a/pkgs/core/supabase/tests/pgflow_tests/create_realtime_partition.test.sql b/pkgs/core/supabase/tests/pgflow_tests/create_realtime_partition.test.sql deleted file mode 100644 index e417a2b0d..000000000 --- a/pkgs/core/supabase/tests/pgflow_tests/create_realtime_partition.test.sql +++ /dev/null @@ -1,83 +0,0 @@ --- Test for pgflow_tests.create_realtime_partition() function -BEGIN; - -SELECT plan(6); - --- Clear previous data -SELECT pgflow_tests.reset_db(); - --- 1. Test function exists with correct signature -SELECT has_function( - 'pgflow_tests', 'create_realtime_partition', ARRAY['date'], - 'Function create_realtime_partition(date) should exist' -); - --- 2. First drop the partition if it exists to ensure a clean test state -DO $$ -DECLARE - partition_name text := 'messages_' || to_char(current_date, 'YYYY_MM_DD'); -BEGIN - EXECUTE format('DROP TABLE IF EXISTS realtime.%I', partition_name); -END; -$$; - --- 3. Verify that the partition doesn't exist after dropping -SELECT is( - (SELECT EXISTS ( - SELECT 1 FROM pg_class c - JOIN pg_namespace n ON c.relnamespace = n.oid - WHERE n.nspname = 'realtime' - AND c.relname = 'messages_' || to_char(current_date, 'YYYY_MM_DD') - )), - false, - 'Partition should not exist before test' -); - --- 4. Test that the function creates a partition that doesn't exist -SELECT is( - pgflow_tests.create_realtime_partition(), - true, - 'Should return true when creating a new partition' -); - --- 5. Check that the partition actually exists in the system catalog -SELECT is( - (SELECT EXISTS ( - SELECT 1 FROM pg_class c - JOIN pg_namespace n ON c.relnamespace = n.oid - WHERE n.nspname = 'realtime' - AND c.relname = 'messages_' || to_char(current_date, 'YYYY_MM_DD') - )), - true, - 'Partition should exist in system catalog after creation' -); - --- 6. Test that calling the function again returns false (idempotent) -SELECT is( - pgflow_tests.create_realtime_partition(), - false, - 'Should return false when partition already exists' -); - --- 7. 
Test that realtime.send() works after creating the partition --- First, clear any existing messages -DELETE FROM realtime.messages WHERE payload->>'event_type' = 'partition_test'; - --- Now send a test message -SELECT realtime.send( - jsonb_build_object('event_type', 'partition_test', 'message', 'Testing partition creation'), - 'test:event', - 'test:topic', - false -); - --- Use is() to exactly compare the count as integers -SELECT is( - (SELECT COUNT(*)::int FROM realtime.messages WHERE payload->>'event_type' = 'partition_test'), - 1, - 'realtime.send() should insert exactly 1 message' -); - --- Finish the test -SELECT * FROM finish(); -ROLLBACK; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/realtime/complete_task_events.test.sql b/pkgs/core/supabase/tests/realtime/complete_task_events.test.sql index 82fa2fecd..2e28b4980 100644 --- a/pkgs/core/supabase/tests/realtime/complete_task_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/complete_task_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(9); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and setup a sequential flow select pgflow_tests.reset_db(); select pgflow_tests.setup_flow('sequential'); diff --git a/pkgs/core/supabase/tests/realtime/empty_map_completion_events.test.sql b/pkgs/core/supabase/tests/realtime/empty_map_completion_events.test.sql index 7c5b47d2c..9118be1f0 100644 --- a/pkgs/core/supabase/tests/realtime/empty_map_completion_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/empty_map_completion_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(8); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create flow with root map step select pgflow_tests.reset_db(); select pgflow.create_flow('empty_map_test'); diff --git a/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql b/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql index 273d83f48..adf21ea47 100644 --- a/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/fail_task_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(10); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and setup a sequential flow with retry settings select pgflow_tests.reset_db(); select pgflow.create_flow('sequential', max_attempts => 1); -- Set max_attempts to 1 so it fails permanently diff --git a/pkgs/core/supabase/tests/realtime/full_flow_events.test.sql b/pkgs/core/supabase/tests/realtime/full_flow_events.test.sql index ce2685931..494db51f7 100644 --- a/pkgs/core/supabase/tests/realtime/full_flow_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/full_flow_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(8); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and setup a flow with multiple steps and dependencies select pgflow_tests.reset_db(); select pgflow_tests.setup_flow('two_roots_left_right'); diff --git a/pkgs/core/supabase/tests/realtime/maybe_complete_run_events.test.sql b/pkgs/core/supabase/tests/realtime/maybe_complete_run_events.test.sql index ffdf8816f..f3005a14f 100644 --- a/pkgs/core/supabase/tests/realtime/maybe_complete_run_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/maybe_complete_run_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(6); --- Ensure partition exists for 
realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and setup a simple flow with just one step select pgflow_tests.reset_db(); select pgflow.create_flow('simple'); diff --git a/pkgs/core/supabase/tests/realtime/start_flow_events.test.sql b/pkgs/core/supabase/tests/realtime/start_flow_events.test.sql index db86265de..40d5f361a 100644 --- a/pkgs/core/supabase/tests/realtime/start_flow_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/start_flow_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(7); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and create test flow select pgflow_tests.reset_db(); select pgflow.create_flow('sequential'); diff --git a/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql b/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql index 6c9447bce..019643d3f 100644 --- a/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/start_ready_steps_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(7); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and setup a sequential flow with dependencies select pgflow_tests.reset_db(); select pgflow_tests.setup_flow('sequential'); -- This creates first -> second -> last steps diff --git a/pkgs/core/supabase/tests/realtime/type_violation_events.test.sql b/pkgs/core/supabase/tests/realtime/type_violation_events.test.sql index 3767753cf..ca41366a5 100644 --- a/pkgs/core/supabase/tests/realtime/type_violation_events.test.sql +++ b/pkgs/core/supabase/tests/realtime/type_violation_events.test.sql @@ -1,9 +1,6 @@ begin; select plan(8); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database select pgflow_tests.reset_db(); diff --git a/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql b/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql index 9e0eb0294..7401b3c08 100644 --- a/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql +++ b/pkgs/core/supabase/tests/regressions/step_failed_event_bug.test.sql @@ -1,9 +1,6 @@ begin; select plan(3); --- Ensure partition exists for realtime.messages -select pgflow_tests.create_realtime_partition(); - -- Reset database and setup a flow with max_attempts = 1 to fail immediately select pgflow_tests.reset_db(); select pgflow.create_flow('test_flow', max_attempts => 1); diff --git a/pkgs/core/supabase/tests/set_vt_batch/headers_handling.test.sql b/pkgs/core/supabase/tests/set_vt_batch/headers_handling.test.sql new file mode 100644 index 000000000..2a89339bd --- /dev/null +++ b/pkgs/core/supabase/tests/set_vt_batch/headers_handling.test.sql @@ -0,0 +1,57 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Create a test queue +select pgmq.create('headers_queue'); + +-- Send messages without headers +select pgmq.send('headers_queue', '{"msg": "no_headers"}'::jsonb); + +-- Send message with headers +select pgmq.send('headers_queue', '{"msg": "with_headers"}'::jsonb, '{"source": "test"}'::jsonb); + +-- Get message IDs +with msgs as ( + select array_agg(msg_id order by msg_id) as ids from pgmq.q_headers_queue +) +-- TEST 1: set_vt_batch returns headers column for message without headers (NULL) +select is( + (select headers from pgflow.set_vt_batch( + 'headers_queue', + ARRAY[(select ids[1] from msgs)], + ARRAY[30] + )), + 
NULL, + 'set_vt_batch should return NULL in headers column for message sent without headers' +); + +-- TEST 2: set_vt_batch returns headers column for message with headers (non-NULL) +with msgs as ( + select array_agg(msg_id order by msg_id) as ids from pgmq.q_headers_queue +) +select is( + (select headers->>'source' from pgflow.set_vt_batch( + 'headers_queue', + ARRAY[(select ids[2] from msgs)], + ARRAY[30] + )), + 'test', + 'set_vt_batch should return headers column with correct values for message sent with headers' +); + +-- TEST 3: set_vt_batch returns headers column for all messages in batch +with msgs as ( + select array_agg(msg_id order by msg_id) as ids from pgmq.q_headers_queue +) +select ok( + (select count(*) = 2 from pgflow.set_vt_batch( + 'headers_queue', + (select ids from msgs), + ARRAY[30, 40] + )), + 'set_vt_batch should return headers column for all messages in batch' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/set_vt_batch/return_format.test.sql b/pkgs/core/supabase/tests/set_vt_batch/return_format.test.sql index 47fb43d2f..5dd7ab534 100644 --- a/pkgs/core/supabase/tests/set_vt_batch/return_format.test.sql +++ b/pkgs/core/supabase/tests/set_vt_batch/return_format.test.sql @@ -1,5 +1,5 @@ begin; -select plan(6); +select plan(7); select pgflow_tests.reset_db(); -- Create a test queue @@ -70,5 +70,15 @@ select is( 'Message content should be preserved in returned records' ); +-- TEST: Returned records should have headers field as NULL (messages sent without headers) +with msgs as ( + select array_agg(msg_id order by msg_id) as ids from pgmq.q_return_queue +) +select is( + (select count(*)::int from pgflow.set_vt_batch('return_queue', (select ids from msgs), ARRAY[30, 60]) where headers is null), + 2, + 'Messages sent without headers should return NULL in headers field' +); + select finish(); rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql b/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql index dfa02d5df..c097e18f2 100644 --- a/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql @@ -11,7 +11,7 @@ select pgflow_tests.ensure_worker('simple'); -- Read messages from queue and start task with msgs as ( - select * from pgflow.read_with_poll('simple', 10, 5, 1, 50) limit 1 + select * from pgmq.read_with_poll('simple', 10, 5, 1, 50) limit 1 ), msg_ids as ( select array_agg(msg_id) as ids from msgs @@ -65,7 +65,7 @@ select ok( -- TEST: Empty queue returns no tasks (after task is already started) with msgs as ( - select * from pgflow.read_with_poll('simple', 10, 5, 1, 50) limit 5 + select * from pgmq.read_with_poll('simple', 10, 5, 1, 50) limit 5 ), msg_ids as ( select array_agg(msg_id) as ids from msgs diff --git a/pkgs/core/supabase/tests/start_tasks/builds_proper_input_from_deps_outputs.test.sql b/pkgs/core/supabase/tests/start_tasks/builds_proper_input_from_deps_outputs.test.sql index db275d807..675288bec 100644 --- a/pkgs/core/supabase/tests/start_tasks/builds_proper_input_from_deps_outputs.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/builds_proper_input_from_deps_outputs.test.sql @@ -14,7 +14,7 @@ select pgflow_tests.ensure_worker('dep_flow'); -- SETUP: Complete the first step to make second step available with first_msg_ids as ( select array_agg(msg_id) as ids - from pgflow.read_with_poll('dep_flow', 10, 5, 1, 100) + from pgmq.read_with_poll('dep_flow', 10, 5, 1, 100) ) select 
pgflow.start_tasks( 'dep_flow', @@ -32,7 +32,7 @@ select pgflow.complete_task( -- TEST: start_tasks returns a task for the dependent step with second_msg_ids as ( select array_agg(msg_id) as ids - from pgflow.read_with_poll('dep_flow', 10, 5, 1, 100) + from pgmq.read_with_poll('dep_flow', 10, 5, 1, 100) ) select is( (select count(*)::int from pgflow.start_tasks( diff --git a/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql b/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql index cd2ff6fde..a75b23e4a 100644 --- a/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql @@ -15,7 +15,7 @@ select pgflow_tests.ensure_worker('multi_flow'); -- Read and start multiple tasks with msgs as ( - select * from pgflow.read_with_poll('multi_flow', 10, 10, 1, 50) limit 10 + select * from pgmq.read_with_poll('multi_flow', 10, 10, 1, 50) limit 10 ), msg_ids as ( select array_agg(msg_id) as ids from msgs diff --git a/pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql b/pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql index b06d0afe3..0bf2d8ae1 100644 --- a/pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/returns_task_index.test.sql @@ -11,7 +11,7 @@ select pgflow.start_flow('single_task', '{"data": "test"}'::jsonb); select pgflow_tests.ensure_worker('single_task'); with msgs as ( - select * from pgflow.read_with_poll('single_task', 10, 5, 1, 50) limit 1 + select * from pgmq.read_with_poll('single_task', 10, 5, 1, 50) limit 1 ), msg_ids as ( select array_agg(msg_id) as ids from msgs @@ -40,7 +40,7 @@ select pgflow_tests.ensure_worker('map_flow'); -- Read all 3 messages with msgs as ( - select * from pgflow.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id + select * from pgmq.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id ), msg_ids as ( select array_agg(msg_id order by msg_id) as ids from msgs @@ -69,7 +69,7 @@ select pgflow_tests.ensure_worker('map_five'); -- Read all 5 messages with msgs as ( - select * from pgflow.read_with_poll('map_five', 10, 5, 5, 50) order by msg_id + select * from pgmq.read_with_poll('map_five', 10, 5, 5, 50) order by msg_id ), msg_ids as ( select array_agg(msg_id order by msg_id) as ids from msgs @@ -120,7 +120,7 @@ select pgflow.complete_task( -- Now read and start second map tasks select pgflow_tests.ensure_worker('map_chain', '22222222-2222-2222-2222-222222222222'::uuid); with msgs as ( - select * from pgflow.read_with_poll('map_chain', 10, 5, 2, 50) order by msg_id + select * from pgmq.read_with_poll('map_chain', 10, 5, 2, 50) order by msg_id ), msg_ids as ( select array_agg(msg_id order by msg_id) as ids from msgs @@ -161,7 +161,7 @@ select pgflow.complete_task( -- Process step_b select pgflow_tests.ensure_worker('sequential', '33333333-3333-3333-3333-333333333333'::uuid); with msgs as ( - select * from pgflow.read_with_poll('sequential', 10, 5, 1, 50) limit 1 + select * from pgmq.read_with_poll('sequential', 10, 5, 1, 50) limit 1 ), msg_ids as ( select array_agg(msg_id) as ids from msgs diff --git a/pkgs/core/supabase/tests/start_tasks/started_at_timestamps.test.sql b/pkgs/core/supabase/tests/start_tasks/started_at_timestamps.test.sql index c0f9acac3..7adb9243e 100644 --- a/pkgs/core/supabase/tests/start_tasks/started_at_timestamps.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/started_at_timestamps.test.sql @@ -17,7 +17,7 @@ select is( 
select pgflow_tests.ensure_worker('timestamp_flow'); with msg_ids as ( select array_agg(msg_id) as ids - from pgflow.read_with_poll('timestamp_flow', 10, 5, 1, 100) + from pgmq.read_with_poll('timestamp_flow', 10, 5, 1, 100) ) select pgflow.start_tasks( 'timestamp_flow', diff --git a/pkgs/core/supabase/tests/start_tasks/status_transitions.test.sql b/pkgs/core/supabase/tests/start_tasks/status_transitions.test.sql index 105fc8250..3b583491c 100644 --- a/pkgs/core/supabase/tests/start_tasks/status_transitions.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/status_transitions.test.sql @@ -17,7 +17,7 @@ select is( select pgflow_tests.ensure_worker('status_flow'); with msg_ids as ( select array_agg(msg_id) as ids - from pgflow.read_with_poll('status_flow', 10, 5, 1, 100) + from pgmq.read_with_poll('status_flow', 10, 5, 1, 100) ) select pgflow.start_tasks( 'status_flow', diff --git a/pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql b/pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql index acdcfcccd..f935bebeb 100644 --- a/pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql @@ -52,7 +52,7 @@ select pgflow_tests.ensure_worker('test_task_index', '55555555-5555-5555-5555-55 -- Read messages from queue and start tasks with msgs as ( - select * from pgflow.read_with_poll('test_task_index', 10, 10, 1, 50) + select * from pgmq.read_with_poll('test_task_index', 10, 10, 1, 50) ), msg_ids as ( select array_agg(msg_id) as ids from msgs diff --git a/pkgs/core/supabase/tests/start_tasks/worker_tracking.test.sql b/pkgs/core/supabase/tests/start_tasks/worker_tracking.test.sql index 9f4c8afe4..e06d377a0 100644 --- a/pkgs/core/supabase/tests/start_tasks/worker_tracking.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/worker_tracking.test.sql @@ -13,7 +13,7 @@ select pgflow_tests.ensure_worker('simple', '00000000-0000-0000-0000-00000000000 -- SETUP: Get message IDs and start tasks with specific worker with msg_ids as ( select array_agg(msg_id) as ids - from pgflow.read_with_poll('simple', 10, 5, 1, 100) + from pgmq.read_with_poll('simple', 10, 5, 1, 100) ) select pgflow.start_tasks( 'simple', @@ -32,7 +32,7 @@ select is( select pgflow.start_flow('simple', '"world"'::jsonb); with msg_ids as ( select array_agg(msg_id) as ids - from pgflow.read_with_poll('simple', 10, 5, 1, 100) + from pgmq.read_with_poll('simple', 10, 5, 1, 100) ) select pgflow.start_tasks( 'simple', diff --git a/pkgs/edge-worker/package.json b/pkgs/edge-worker/package.json index 649c7a0d8..523ef03f8 100644 --- a/pkgs/edge-worker/package.json +++ b/pkgs/edge-worker/package.json @@ -30,7 +30,7 @@ "devDependencies": { "@types/deno": "^2.3.0", "@types/node": "~18.16.20", - "supabase": "2.21.1" + "supabase": "^2.34.3" }, "publishConfig": { "access": "public" diff --git a/pkgs/edge-worker/src/queue/Queue.ts b/pkgs/edge-worker/src/queue/Queue.ts index 5925744aa..081e857c4 100644 --- a/pkgs/edge-worker/src/queue/Queue.ts +++ b/pkgs/edge-worker/src/queue/Queue.ts @@ -79,7 +79,7 @@ export class Queue { ); return await this.sql[]>` SELECT * - FROM pgflow.read_with_poll( + FROM pgmq.read_with_poll( queue_name => ${this.queueName}, vt => ${visibilityTimeout}, qty => ${batchSize}, diff --git a/pkgs/edge-worker/src/queue/types.ts b/pkgs/edge-worker/src/queue/types.ts index f36e5f6e0..a6d3901b0 100644 --- a/pkgs/edge-worker/src/queue/types.ts +++ b/pkgs/edge-worker/src/queue/types.ts @@ -12,6 +12,7 @@ 
export interface PgmqMessageRecord enqueued_at: string; vt: string; message: TPayload; + headers?: Json | null; // Optional for backward compatibility with pgmq 1.5.1+ } /** diff --git a/pkgs/edge-worker/tests/integration/messageExecutorContext.test.ts b/pkgs/edge-worker/tests/integration/messageExecutorContext.test.ts index 914c98851..427f24d67 100644 --- a/pkgs/edge-worker/tests/integration/messageExecutorContext.test.ts +++ b/pkgs/edge-worker/tests/integration/messageExecutorContext.test.ts @@ -27,6 +27,7 @@ Deno.test( enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', message: { data: 'test data' }, + headers: null, }; const abortController = new AbortController(); @@ -83,6 +84,7 @@ Deno.test( enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', message: { data: 'legacy test' }, + headers: null, }; let receivedPayload: { data: string } | undefined; @@ -116,6 +118,7 @@ Deno.test( enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', message: { id: 42, name: 'test item' }, + headers: null, }; const abortController = new AbortController(); @@ -158,6 +161,7 @@ Deno.test( enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', message: { test: 'supabase test' }, + headers: null, }; const abortController = new AbortController(); diff --git a/pkgs/edge-worker/tests/integration/messageExecutorContextWorkerConfig.test.ts b/pkgs/edge-worker/tests/integration/messageExecutorContextWorkerConfig.test.ts index 955259025..516a9e3b0 100644 --- a/pkgs/edge-worker/tests/integration/messageExecutorContextWorkerConfig.test.ts +++ b/pkgs/edge-worker/tests/integration/messageExecutorContextWorkerConfig.test.ts @@ -31,9 +31,10 @@ Deno.test( const mockMessage: PgmqMessageRecord<{test: string}> = { msg_id: 123, read_ct: 2, - enqueued_at: '2024-01-01T00:00:00Z', + enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', - message: { test: 'config test' } + message: { test: 'config test' }, + headers: null, }; // Create context similar to what createQueueWorker does diff --git a/pkgs/edge-worker/tests/unit/contextUtils.test.ts b/pkgs/edge-worker/tests/unit/contextUtils.test.ts index 5accaa3e6..46d12a4d9 100644 --- a/pkgs/edge-worker/tests/unit/contextUtils.test.ts +++ b/pkgs/edge-worker/tests/unit/contextUtils.test.ts @@ -34,6 +34,7 @@ const mockMessage: PgmqMessageRecord<{ test: string }> = { enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', message: { test: 'data' }, + headers: null, }; // Mock pgmq message record with step input structure @@ -43,6 +44,7 @@ const mockStepMessage: PgmqMessageRecord<{ run: { test: string } }> = { enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', message: { run: { test: 'data' } }, + headers: null, }; // Mock step task (using generic typing) diff --git a/pkgs/edge-worker/tests/unit/workerConfigContext.test.ts b/pkgs/edge-worker/tests/unit/workerConfigContext.test.ts index 0b2c3d501..58b03d90b 100644 --- a/pkgs/edge-worker/tests/unit/workerConfigContext.test.ts +++ b/pkgs/edge-worker/tests/unit/workerConfigContext.test.ts @@ -32,10 +32,11 @@ Deno.test('createContextSafeConfig excludes sql field and freezes result', () => Deno.test('Queue worker context includes workerConfig for GitHub issue use case', async () => { const mockMessage: PgmqMessageRecord<{test: string}> = { msg_id: 123, - read_ct: 2, // Current retry attempt + read_ct: 2, // Current retry attempt enqueued_at: '2024-01-01T00:00:00Z', vt: '2024-01-01T00:01:00Z', - message: { test: 'data' } + message: { test: 'data' }, + headers: null, 
}; const mockConfig: QueueWorkerConfig = { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a9a11caef..206a954b9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -274,8 +274,8 @@ importers: specifier: ^3.4.5 version: 3.4.5 supabase: - specifier: 2.21.1 - version: 2.21.1 + specifier: ^2.34.3 + version: 2.54.11 terser: specifier: ^5.43.0 version: 5.43.1 @@ -299,7 +299,7 @@ importers: specifier: ^22.14.1 version: 22.19.0 supabase: - specifier: 2.54.11 + specifier: ^2.34.3 version: 2.54.11 pkgs/dsl: @@ -339,8 +339,8 @@ importers: specifier: ~18.16.20 version: 18.16.20 supabase: - specifier: 2.21.1 - version: 2.21.1 + specifier: ^2.34.3 + version: 2.54.11 pkgs/example-flows: dependencies: @@ -5685,10 +5685,6 @@ packages: big.js@5.2.2: resolution: {integrity: sha512-vyL2OymJxmarO8gxMr0mhChsO9QGwhynfuu4+MHTAW6czfq9humCB7rKpUjDd9YUiDPU4mzpyupFSvOClAwbmQ==} - bin-links@5.0.0: - resolution: {integrity: sha512-sdleLVfCjBtgO5cNjA2HVRvWBJAHs4zwenaCPMNJAJU0yNxpzj80IpjOIimkpkr+mhlA+how5poQtt53PygbHA==} - engines: {node: ^18.17.0 || >=20.5.0} - bin-links@6.0.0: resolution: {integrity: sha512-X4CiKlcV2GjnCMwnKAfbVWpHa++65th9TuzAEYtZoATiOE2DQKhSp4CJlyLoTqdhBKlXjpXjCTYPNNFS33Fi6w==} engines: {node: ^20.17.0 || >=22.9.0} @@ -5978,10 +5974,6 @@ packages: resolution: {integrity: sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==} engines: {node: '>=6'} - cmd-shim@7.0.0: - resolution: {integrity: sha512-rtpaCbr164TPPh+zFdkWpCyZuKkjpAzODfaZCf/SVJZzJN+4bHQb/LP3Jzq5/+84um3XXY8r548XiWKSborwVw==} - engines: {node: ^18.17.0 || >=20.5.0} - cmd-shim@8.0.0: resolution: {integrity: sha512-Jk/BK6NCapZ58BKUxlSI+ouKRbjH1NLZCgJkYoab+vEHUY3f6OzpNBN9u7HFSv9J6TRDGs4PLOHezoKGaFRSCA==} engines: {node: ^20.17.0 || >=22.9.0} @@ -9078,11 +9070,6 @@ packages: resolution: {integrity: sha512-KZxYo1BUkWD2TVFLr0MQoM8vUUigWD3LlD83a/75BqC+4qE0Hb1Vo5v1FgcfaNXvfXzr+5EhQ6ing/CaBijTlw==} engines: {node: '>= 18'} - mkdirp@3.0.1: - resolution: {integrity: sha512-+NsyUUAZDmo6YVHzL/stxSu3t9YS1iljliy3BSDrXJ/dkn1KYdmtZODGGjLcc9XLgVVpH4KshHB8XmZgMhaBXg==} - engines: {node: '>=10'} - hasBin: true - mlly@1.8.0: resolution: {integrity: sha512-l8D9ODSRWLe2KHJSifWGwBqpTZXIXTeo8mlKjY+E2HAakaTeNpqAyBZ8GSqLzHgw4XmHmC8whvpjJNMbFZN7/g==} @@ -9297,10 +9284,6 @@ packages: resolution: {integrity: sha512-X06Mfd/5aKsRHc0O0J5CUedwnPmnDtLF2+nq+KN9KSDlJHkPuh0JUviWjEWMe0SW/9TDdSLVPuk7L5gGTIA1/w==} engines: {node: '>=14.16'} - npm-normalize-package-bin@4.0.0: - resolution: {integrity: sha512-TZKxPvItzai9kN9H/TkmCtx/ZN/hvr3vUycjlfmH0ootY9yFBzNOpiXAdIn1Iteqsvk4lQn6B5PTrt+n6h8k/w==} - engines: {node: ^18.17.0 || >=20.5.0} - npm-normalize-package-bin@5.0.0: resolution: {integrity: sha512-CJi3OS4JLsNMmr2u07OJlhcrPxCeOeP/4xq67aWNai6TNWWbTrlNDgl8NcFKVlcBKp18GPj+EzbNIgrBfZhsag==} engines: {node: ^20.17.0 || >=22.9.0} @@ -10039,10 +10022,6 @@ packages: resolution: {integrity: sha512-++Vn7NS4Xf9NacaU9Xq3URUuqZETPsf8L4j5/ckhaRYsfPeRyzGw+iDjFhV/Jr3uNmTvvddEJFWh5R1gRgUH8A==} engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0} - proc-log@5.0.0: - resolution: {integrity: sha512-Azwzvl90HaF0aCz1JrDdXQykFakSSNPaPoiZ9fm5qJIMHioDZEi7OAdRwSm6rSoPtY3Qutnm3L7ogmg3dc+wbQ==} - engines: {node: ^18.17.0 || >=20.5.0} - proc-log@6.0.0: resolution: {integrity: sha512-KG/XsTDN901PNfPfAMmj6N/Ywg9tM+bHK8pAz+27fS4N4Pcr+4zoYBOcGSBu6ceXYNPxkLpa4ohtfxV1XcLAfA==} engines: {node: ^20.17.0 || >=22.9.0} @@ -10182,10 +10161,6 @@ packages: read-cache@1.0.0: resolution: {integrity: 
sha512-Owdv/Ft7IjOgm/i0xvNDZ1LrRANRfew4b2prF3OWMQLxLfu3bS8FVhCsrSCMK4lR56Y9ya+AThoTpDCTxCmpRA==} - read-cmd-shim@5.0.0: - resolution: {integrity: sha512-SEbJV7tohp3DAAILbEMPXavBjAnMN0tVnh4+9G8ihV4Pq3HYF9h8QNez9zkJ1ILkv9G2BjdzwctznGZXgu/HGw==} - engines: {node: ^18.17.0 || >=20.5.0} - read-cmd-shim@6.0.0: resolution: {integrity: sha512-1zM5HuOfagXCBWMN83fuFI/x+T/UhZ7k+KIzhrHXcQoeX5+7gmaDYjELQHmmzIodumBHeByBJT4QYS7ufAgs7A==} engines: {node: ^20.17.0 || >=22.9.0} @@ -11150,11 +11125,6 @@ packages: suf-log@2.5.3: resolution: {integrity: sha512-KvC8OPjzdNOe+xQ4XWJV2whQA0aM1kGVczMQ8+dStAO6KfEB140JEVQ9dE76ONZ0/Ylf67ni4tILPJB41U0eow==} - supabase@2.21.1: - resolution: {integrity: sha512-xef5mK2vrs/ApaHMOCeL/0rooq2M8xdV632/3VFY2gaoQqYiSGQlh1Yd/Yqa1TPUoPcsszuK6YsjkOdGs+e1iQ==} - engines: {npm: '>=8'} - hasBin: true - supabase@2.54.11: resolution: {integrity: sha512-KuDDVi1s2fhfun81LNPaCLpW4/LFsP3G2LKUQimzEoj64sP1DtZ/d97uw6GFYbNMPW9JC2Ruuei53qHInOwJLA==} engines: {npm: '>=8'} @@ -11256,10 +11226,6 @@ packages: tar-stream@3.1.7: resolution: {integrity: sha512-qJj60CXt7IU1Ffyc3NJMjh6EkuCFej46zUqJ4J7pqYlThyd9bO0XBTmcOIhSzZJVWfsLks0+nle/j538YAW9RQ==} - tar@7.4.3: - resolution: {integrity: sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw==} - engines: {node: '>=18'} - tar@7.5.1: resolution: {integrity: sha512-nlGpxf+hv0v7GkWBK2V9spgactGOp0qvfWRxUMjqHyzrt3SgwE48DIv/FhqPHJYLHpgW1opq3nERbz5Anq7n1g==} engines: {node: '>=18'} @@ -12413,10 +12379,6 @@ packages: resolution: {integrity: sha512-+QU2zd6OTD8XWIJCbffaiQeH9U73qIqafo1x6V1snCWYGJf6cVE0cDR4D8xRzcEnfI21IFrUPzPGtcPf8AC+Rw==} engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0} - write-file-atomic@6.0.0: - resolution: {integrity: sha512-GmqrO8WJ1NuzJ2DrziEI2o57jKAVIQNf8a18W3nCYU3H7PNWqCCVTeH6/NQE93CIllIgQS98rrmVkYgTX9fFJQ==} - engines: {node: ^18.17.0 || >=20.5.0} - write-file-atomic@7.0.0: resolution: {integrity: sha512-YnlPC6JqnZl6aO4uRc+dx5PHguiR9S6WeoLtpxNT9wIG+BDya7ZNE1q7KOjVgaA73hKhKLpVPgJ5QA9THQ5BRg==} engines: {node: ^20.17.0 || >=22.9.0} @@ -19014,14 +18976,6 @@ snapshots: big.js@5.2.2: {} - bin-links@5.0.0: - dependencies: - cmd-shim: 7.0.0 - npm-normalize-package-bin: 4.0.0 - proc-log: 5.0.0 - read-cmd-shim: 5.0.0 - write-file-atomic: 6.0.0 - bin-links@6.0.0: dependencies: cmd-shim: 8.0.0 @@ -19320,8 +19274,6 @@ snapshots: clsx@2.1.1: {} - cmd-shim@7.0.0: {} - cmd-shim@8.0.0: {} co@4.6.0: {} @@ -23372,8 +23324,6 @@ snapshots: dependencies: minipass: 7.1.2 - mkdirp@3.0.1: {} - mlly@1.8.0: dependencies: acorn: 8.15.0 @@ -23719,8 +23669,6 @@ snapshots: normalize-url@8.1.0: {} - npm-normalize-package-bin@4.0.0: {} - npm-normalize-package-bin@5.0.0: {} npm-package-arg@11.0.1: @@ -24531,8 +24479,6 @@ snapshots: proc-log@3.0.0: {} - proc-log@5.0.0: {} - proc-log@6.0.0: {} process-nextick-args@2.0.1: {} @@ -24656,8 +24602,6 @@ snapshots: dependencies: pify: 2.3.0 - read-cmd-shim@5.0.0: {} - read-cmd-shim@6.0.0: {} read-package-up@11.0.0: @@ -25853,15 +25797,6 @@ snapshots: dependencies: s.color: 0.0.15 - supabase@2.21.1: - dependencies: - bin-links: 5.0.0 - https-proxy-agent: 7.0.6(supports-color@10.2.2) - node-fetch: 3.3.2 - tar: 7.4.3 - transitivePeerDependencies: - - supports-color - supabase@2.54.11: dependencies: bin-links: 6.0.0 @@ -25989,15 +25924,6 @@ snapshots: - bare-abort-controller - react-native-b4a - tar@7.4.3: - dependencies: - '@isaacs/fs-minipass': 4.0.1 - chownr: 3.0.0 - minipass: 7.1.2 - minizlib: 3.1.0 - mkdirp: 3.0.1 - yallist: 5.0.0 - tar@7.5.1: dependencies: '@isaacs/fs-minipass': 4.0.1 @@ 
-27334,11 +27260,6 @@ snapshots: imurmurhash: 0.1.4 signal-exit: 4.1.0 - write-file-atomic@6.0.0: - dependencies: - imurmurhash: 0.1.4 - signal-exit: 4.1.0 - write-file-atomic@7.0.0: dependencies: imurmurhash: 0.1.4
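
Note: the rewritten `pgflow.set_vt_batch` above now returns the full six-column pgmq record, including the new `headers` column. A minimal sketch of a manual invocation is shown below; the queue name and message IDs are illustrative placeholders, not part of the patch, and the call shape follows the `RETURNS TABLE` signature defined in the migration and exercised in the `set_vt_batch` tests.

```sql
-- Assumes a queue already created via pgmq.create('example_queue') and at
-- least two messages sent to it; msg_ids 1 and 2 are hypothetical examples.
select msg_id, vt, headers
from pgflow.set_vt_batch(
  'example_queue',         -- queue_name
  array[1, 2]::bigint[],   -- msg_ids whose visibility should be extended
  array[30, 60]::int[]     -- per-message visibility offsets, in seconds
);
-- On pgmq 1.5.0+, headers is NULL for messages sent without headers and a
-- jsonb object for messages sent with them, matching the test expectations.
```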