diff --git a/COMPILE_WORKER.md b/COMPILE_WORKER.md new file mode 100644 index 000000000..a014fee28 --- /dev/null +++ b/COMPILE_WORKER.md @@ -0,0 +1,331 @@ +# Auto-Compilation: Simplified Flow Development + +> **Implementation**: This feature is being built in two phases: +> +> - **Phase 1 (MVP)**: Core auto-compilation with conservative behavior +> - **Phase 2 (Enhancement)**: Smart updates that preserve data when possible + +--- + +## 🚀 Local Development - No Manual Steps + +### 1. Start Edge Runtime + +```bash +supabase functions serve +``` + +### 2. Start Worker (Triggers Auto-Compilation) + +```bash +curl http://localhost:54321/functions/v1/my-worker +``` + +- Worker detects local environment ([see how](#environment-detection)) +- Auto-creates flow in database +- ✅ Ready to process tasks immediately + +### 3. Edit Flow Code + +Make changes to your flow definition file. + +### 4. Restart Worker (After Code Changes) + +```bash +# Kill `functions serve` (Ctrl+C), then restart +supabase functions serve +``` + +```bash +# Start worker with fresh code +curl http://localhost:54321/functions/v1/my-worker +``` + +- Worker auto-updates flow definition +- ✅ Ready to test immediately + +**What happens automatically:** + +- Worker detects local environment +- Compares flow code with database definition +- Updates database to match your code +- **Phase 1**: Always drops and recreates (fresh state guaranteed) +- **Phase 2**: Preserves test data when only runtime options change + +**No `pgflow compile` commands needed in development! 🎉** + +--- + +## 🔍 Environment Detection + +Workers automatically detect whether they're running locally or in production. + +```typescript +// Check for Supabase-specific environment variables +const isLocal = !Boolean( + Deno.env.get('DENO_DEPLOYMENT_ID') || Deno.env.get('SB_REGION') +); +``` + +**How it works:** + +- These environment variables are automatically set by Supabase on hosted deployments +- When running `supabase functions serve` locally, these variables are absent +- Additional DB URL validation warns about unexpected configurations + +**Result:** + +- **Local**: Auto-compilation enabled - worker creates/updates flows automatically +- **Production**: Conservative mode - requires explicit migrations for existing flows + +--- + +## 🏭 Production Deployment + +### Phase 1: Conservative Approach + +**Behavior**: + +- **New flows**: Auto-created on first deployment ✅ +- **Existing flows**: Worker fails, requires migration ❌ + +#### Deploy New Flow + +```bash +# 1. Deploy worker code +supabase functions deploy my-worker + +# 2. First request auto-creates flow +curl https://your-project.supabase.co/functions/v1/my-worker +# ✅ Ready to handle requests +``` + +#### Update Existing Flow + +```bash +# 1. Generate migration +pgflow compile flows/my-flow.ts + +# 2. Deploy migration +supabase db push + +# 3. Deploy worker code +supabase functions deploy my-worker +# ✅ Worker verifies flow matches +``` + +**Phase 1 Benefits**: + +- ✅ Explicit control over production changes +- ✅ Clear audit trail (migrations) +- ✅ Fail-fast protection +- ✅ Simple, predictable behavior + +**Phase 1 Trade-off**: + +- ⚠️ Even option-only changes require migration + +--- + +### Phase 2: Smart Updates (Enhancement) + +**Additional Behavior**: + +- **Existing flows with matching structure**: Auto-updates runtime options ✅ +- **Existing flows with structure changes**: Still requires migration ❌ + +#### Update Runtime Options (No Migration Needed!) + +```bash +# 1. 
Change timeout/maxAttempts in code +# 2. Deploy worker +supabase functions deploy my-worker +# ✅ Options updated automatically (no migration!) +``` + +#### Update Flow Structure (Migration Required) + +```bash +# 1. Add new step or change dependencies +# 2. Generate migration +pgflow compile flows/my-flow.ts + +# 3. Deploy migration + worker +supabase db push +supabase functions deploy my-worker +``` + +**Phase 2 Benefits**: + +- ✅ Faster deploys for option changes +- ✅ Still safe (structure changes require migration) +- ✅ Backward compatible with Phase 1 + +**Phase 2 Addition: Strict Mode** _(Optional)_ + +```bash +# Require migrations even for new flows +PGFLOW_REQUIRE_MIGRATIONS=true +``` + +--- + +## ⚙️ Manual Compilation Command + +Generate migration files for explicit deployment control. + +### Basic Usage + +```bash +pgflow compile flows/my-flow.ts +``` + +- Infers worker: `my-flow-worker` (basename + "-worker") +- Checks staleness: compares file mtime with worker startup time +- Returns compiled SQL if worker is fresh + +### Custom Worker Name + +```bash +pgflow compile flows/my-flow.ts --worker custom-worker +``` + +- Use when worker doesn't follow naming convention +- Useful for horizontal scaling or specialized workers + +**Success output:** ✅ + +``` +✓ Compiled successfully: my_flow → SQL migration ready +✓ Created: supabase/migrations/20250108120000_create_my_flow.sql +``` + +**If worker needs restart:** ❌ + +``` +Error: Worker code changed since startup +Action: Restart worker and retry +``` + +--- + +## ⚠️ Edge Cases & Solutions + +### Multiple Worker Instances (Horizontal Scaling) ✅ + +```bash +# All instances handle the same flow +my-flow-worker-1, my-flow-worker-2, my-flow-worker-3 +``` + +- ✅ **Phase 1**: First instance creates, others fail gracefully and retry +- ✅ **Phase 2**: First instance creates, others detect and continue +- ✅ Advisory locks prevent race conditions + +### Stale Worker (Code Changes) ❌ + +**Problem:** Worker started before code changes. + +#### Solution: Restart Worker + +```bash +# Kill `functions serve` (Ctrl+C), then restart +supabase functions serve +``` + +```bash +# Start worker with fresh code +curl http://localhost:54321/functions/v1/my-worker +``` + +**Detection:** CLI compares file modification time with worker startup time. + +--- + +### Flow Definition Changes + +#### Local Development ✅ + +**Phase 1**: + +- ✅ Always drops and recreates +- ✅ Guaranteed fresh state +- ⚠️ Test data lost on every restart + +**Phase 2**: + +- ✅ Preserves test data when only options change +- ✅ Only drops when structure changes (new steps, changed dependencies) +- ✅ Better developer experience + +--- + +#### Production Deployment + +**Phase 1 - Any Change**: + +``` +Error: Flow 'my_flow' already exists +Action: Deploy migration first or use different slug +``` + +Must generate and deploy migration for any change. + +**Phase 2 - Structure Change**: + +``` +Error: Flow 'my_flow' structure mismatch +- Step 'process' dependencies changed: ['fetch'] → ['fetch', 'validate'] +- New step 'validate' added +Action: Deploy migration first (pgflow compile flows/my-flow.ts) +``` + +Structure changes still require migration (safe!). + +**Phase 2 - Option Change**: + +``` +✓ Runtime options updated for flow 'my_flow' +- Step 'process': timeout 30s → 60s +``` + +Option changes work automatically (convenient!). 
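+
+**Example** (hypothetical flow code): what an option-only change looks like under Phase 2. The DSL shape below is a sketch based on `@pgflow/dsl` as used elsewhere in these plans; only `timeout` changes, so slugs and dependencies stay identical:
+
+```typescript
+import { Flow } from '@pgflow/dsl';
+
+export const myFlow = new Flow({ slug: 'my_flow', maxAttempts: 3 })
+  .step({ slug: 'fetch' }, async () => ({ data: 'raw' }))
+  // Only the timeout changed (30 → 60); structure is untouched,
+  // so Phase 2 updates options without a migration.
+  .step(
+    { slug: 'process', dependsOn: ['fetch'], timeout: 60 },
+    async (input) => input
+  );
+```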
+ +--- + +## 📋 Behavior Summary + +### What Gets Auto-Compiled + +| Change Type | Local (Phase 1) | Local (Phase 2) | Production (Phase 1) | Production (Phase 2) | +| -------------------- | ---------------- | ------------------ | -------------------- | -------------------- | +| **New flow** | ✅ Auto-create | ✅ Auto-create | ✅ Auto-create | ✅ Auto-create | +| **Runtime options** | ✅ Drop+recreate | ✅ **Update only** | ❌ Require migration | ✅ **Update only** | +| **Structure change** | ✅ Drop+recreate | ✅ Drop+recreate | ❌ Require migration | ❌ Require migration | + +**Key Insight**: Phase 2 adds smart updates that preserve data and allow option changes without migrations. + +--- + +## 🎯 When to Use Each Phase + +### Ship Phase 1 When: + +- ✅ You want auto-compilation ASAP +- ✅ You're okay with explicit migrations in production +- ✅ You don't mind losing local test data on restarts +- ✅ You want simple, predictable behavior + +### Upgrade to Phase 2 When: + +- ✅ Phase 1 is stable in production +- ✅ You want better local dev experience (preserve test data) +- ✅ You want faster production deploys (option changes without migrations) +- ✅ You've validated Phase 1 works for your workflows + +--- + +## 🔗 See Also + +- **[PLAN_phase1.md](./PLAN_phase1.md)** - Detailed Phase 1 implementation plan +- **[PLAN_phase2.md](./PLAN_phase2.md)** - Detailed Phase 2 enhancement plan diff --git a/EDGE_RUNTIME_HOT_RELOAD.md b/EDGE_RUNTIME_HOT_RELOAD.md new file mode 100644 index 000000000..2aa6b24a2 --- /dev/null +++ b/EDGE_RUNTIME_HOT_RELOAD.md @@ -0,0 +1,66 @@ +## Edge Functions Hot Reload Without `functions serve` + +When you run `supabase start` only (without `supabase functions serve`), Edge Functions **do not hot reload automatically** when you change the code. The Edge Functions runtime that starts with `supabase start` serves the functions, but it doesn't watch for file changes or reload them automatically.[1][2] + +Here's what happens in each scenario: + +**With `supabase start` only:** + +- The Edge Functions runtime starts and serves your functions +- Functions are accessible at `http://localhost:54321/functions/v1/[function-name]` +- Code changes **require a manual restart** (`supabase stop` then `supabase start`) to take effect +- This process takes a while since it restarts the entire Docker stack[1] + +**With `supabase functions serve [function-name]`:** + +- Hot reloading is enabled by default +- The function automatically reloads when you save changes to the code +- Changes are reflected instantly without needing to restart anything[3][4] +- The watcher process monitors your function files for changes + +**Why the difference:** + +The `functions serve` command specifically includes a file watcher that monitors your function code for changes and triggers automatic reloads. This is a development convenience feature separate from the base Edge Functions runtime that runs with `supabase start`.[4][3] + +**Best practice for development:** + +Run both commands together: + +```bash +supabase start # Starts all services including Edge Functions runtime +supabase functions serve [function-name] # Enables hot reload for development +``` + +This gives you the full local stack plus hot reloading for fast iteration. 
Without `functions serve`, you're stuck doing manual restarts to see code changes, which significantly slows down development.[2][1] + +[1](https://www.reddit.com/r/Supabase/comments/1ebjw82/restarting_edge_functions_locally/) +[2](https://supabase.com/docs/guides/cli/config) +[3](https://supabase.com/docs/guides/functions/development-environment) +[4](https://supabase.com/docs/guides/functions/quickstart) +[5](https://supabase.com/blog/supabase-dynamic-functions) +[6](https://github.com/orgs/supabase/discussions/6786) +[7](https://supabase.com/blog/edge-functions-faster-smaller) +[8](https://github.com/supabase/cli/issues/247) +[9](https://supabase.com/docs/guides/local-development/overview) +[10](https://github.com/orgs/supabase/discussions/36109) +[11](https://www.answeroverflow.com/m/1036625056467533905) +[12](https://www.youtube.com/watch?v=BceVcpiOlKM) +[13](https://github.com/supabase/edge-runtime/issues/306) +[14](https://trigger.dev/docs/guides/frameworks/supabase-edge-functions-basic) +[15](https://github.com/orgs/supabase/discussions/9837) +[16](https://www.reddit.com/r/Supabase/comments/1fgu7bb/supabase_edge_functions_are_not_production_ready/) +[17](https://supabase.com/docs/guides/functions) +[18](https://supabase.com/blog/edge-functions-background-tasks-websockets) +[19](https://supabase.com/blog/edge-runtime-self-hosted-deno-functions) +[20](https://github.com/supabase/edge-runtime/issues/212) +[21](https://supabase.com/edge-functions) +[22](https://github.com/orgs/supabase/discussions/33235) +[23](https://supabase.com/docs/reference/cli/introduction) +[24](https://supabase.com/docs/guides/functions/troubleshooting) +[25](https://stackoverflow.com/questions/78783338/unable-to-run-supabase-edge-functions-locally) +[26](https://ethanmick.com/using-supabases-edge-runtime-to-supercharge-your-app/) +[27](https://docs.weweb.io/workflows/actions/supabase/invoke-edge-function.html) +[28](https://supabase.com/blog/supabase-edge-functions-deploy-dashboard-deno-2-1) +[29](https://supabase.com/docs/reference/javascript/auth-startautorefresh) +[30](https://github.com/orgs/supabase/discussions/37271) +[31](https://supabase.com/docs/guides/troubleshooting/edge-function-shutdown-reasons-explained) diff --git a/PLAN_pgflow_compile_worker.md b/PLAN_pgflow_compile_worker.md new file mode 100644 index 000000000..1c9c4f630 --- /dev/null +++ b/PLAN_pgflow_compile_worker.md @@ -0,0 +1,512 @@ +# pgflow compile via Worker Endpoint + +## Goal + +Replace the problematic deno.json-based CLI compilation with a simple HTTP endpoint on the worker that returns flow shape as SQL statements. + +--- + +## Key Insights + +### What Matters for Compilation + +**Flow STRUCTURE only:** +- Step slugs, step_index, step_type +- Dependencies between steps +- Flow-level and step-level options (maxAttempts, timeout, etc.) + +**What DOESN'T matter:** +- Handler implementations (not stored in DB) +- Handler file changes (no migration needed) +- Imported modules (unless they affect flow structure) + +**Conclusion:** Only the flow definition file's modification time matters for staleness checking. 
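+
+To make this concrete, here is a sketch (assuming the `Flow` DSL and the existing `compileFlow()` from `@pgflow/dsl`; the handler names are hypothetical): two versions of a flow that differ only in handler bodies compile to the same SQL, so no migration is implied:
+
+```typescript
+import { Flow, compileFlow } from '@pgflow/dsl';
+
+const fetchFromApi = async () => ({ source: 'api' });     // hypothetical handler
+const fetchFromCache = async () => ({ source: 'cache' }); // hypothetical handler
+
+const v1 = new Flow({ slug: 'my_flow' }).step({ slug: 'step1' }, fetchFromApi);
+const v2 = new Flow({ slug: 'my_flow' }).step({ slug: 'step1' }, fetchFromCache);
+
+// Same slugs, indexes, types, dependencies, and options → identical SQL
+console.assert(
+  JSON.stringify(compileFlow(v1)) === JSON.stringify(compileFlow(v2))
+);
+```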
+ +--- + +## Architecture + +### CLI Command + +**Phase 0 (Single Flow):** +```bash +pgflow compile flows/my-flow.ts --worker my-flow-worker + +# Optional flags +--skip-freshness-check # Skip staleness validation +--timeout 30 # Wait timeout for auto-reload (default: 10s) +``` + +**Future (Multiple Flows):** +```bash +# Compile specific flow by slug +pgflow compile --worker my-worker --flow my-flow + +# Compile all flows from worker +pgflow compile --worker my-worker --all +``` + +### Worker HTTP Endpoint + +**Endpoint:** `GET /functions/v1//metadata` + +**Response 200 (Phase 0 - Single Flow):** +```json +{ + "worker": { + "worker_id": "abc123-def456-ghi789", + "started_at": "2025-01-31T15:30:45.123Z" + }, + "flows": { + "my-flow": { + "sql": [ + "SELECT pgflow.create_flow('my-flow');", + "SELECT pgflow.add_step('my-flow', 'step1');", + "SELECT pgflow.add_step('my-flow', 'step2', ARRAY['step1']);" + ] + } + } +} +``` + +**Response 200 (Future - Multiple Flows & Enhancements):** +```json +{ + "worker": { + "worker_id": "abc123-def456-ghi789", + "started_at": "2025-01-31T15:30:45.123Z", + "status": "running", + "version": "1.2.0", + "environment": "local" + }, + "flows": { + "flow-1": { + "sql": ["SELECT pgflow.create_flow('flow-1');", "..."], + "shape": { "slug": "flow-1", "steps": [...] }, + "options": { "maxAttempts": 3, "timeout": 30 } + }, + "flow-2": { + "sql": ["SELECT pgflow.create_flow('flow-2');", "..."], + "shape": { "slug": "flow-2", "steps": [...] } + } + } +} +``` + +**Note:** `status`, `version`, and `environment` are future enhancements. Phase 0 only includes `worker_id` and `started_at`. + +**Response 503:** (if worker not fully started) +```json +{ + "error": "worker_not_ready", + "message": "Worker is starting up, try again in a moment" +} +``` + +### Design Rationale + +**Why `/metadata` instead of `/compile`?** +- Extensible structure for future metadata (version, environment, etc.) +- Clean separation: worker-level vs flow-level metadata +- Supports multiple flows without API redesign +- Non-breaking: can add new fields to `worker` or per-flow objects + +**Why nested structure?** +```json +{ + "worker": {...}, // Worker-level: worker_id, started_at (+ future: status, version, env) + "flows": { // Flow-level: sql (+ future: shape, options) + "slug": {...} + } +} +``` +- **Phase 0 worker metadata:** + - `worker_id`: Unique identifier for debugging (helpful to identify which worker instance) + - `started_at`: Process restart time (for staleness check) +- **Future worker metadata:** `status`, `version`, `environment`, etc. +- **Phase 0 flow metadata:** `sql` only +- **Future flow metadata:** `shape`, `options`, `dependencies`, `version`, etc. +- Future-proof: easy to add new metadata at either level without breaking changes + +**No source file paths needed!** +- Runtime cannot reliably determine source file paths from objects +- Phase 0: User provides file path to CLI, CLI knows mtime +- Future: User specifies flow by slug, not file path +- Staleness check uses worker.started_at (process-level, not per-file) + +--- + +## Workflow + +### Local Development (Phase 0) +``` +1. Dev edits flows/my-flow.ts +2. Supabase detects change → auto-reloads Edge Function +3. Worker restarts, started_at updates +4. Dev runs: pgflow compile flows/my-flow.ts --worker my-worker +5. CLI: + - Gets file mtime: 3:00pm + - Calls GET /metadata endpoint + - Receives: { worker: { started_at: "3:00:05pm" }, flows: { "my-flow": { sql: [...] } } } + - Compares: file mtime <= worker.started_at → FRESH! 
+ - Extracts flows["my-flow"].sql + - Generates migration file +``` + +### Staleness Detection (Phase 0) +``` +1. Dev edits flows/my-flow.ts at 3:00pm +2. Worker still running from 2:00pm (auto-reload disabled/failed) +3. Dev runs: pgflow compile flows/my-flow.ts --worker my-worker +4. CLI: + - Gets file mtime: 3:00pm + - Calls GET /metadata endpoint + - Receives: { worker: { started_at: "2:00pm" }, flows: {...} } + - Compares: file mtime > worker.started_at → STALE! + - Error: "Worker is stale. Restart worker or wait for auto-reload." + - Optionally: Poll for freshness (up to --timeout seconds) +``` + +### Multiple Flows (Future) +``` +1. Dev runs: pgflow compile --worker my-worker --flow flow-1 +2. CLI: + - Calls GET /metadata endpoint + - Receives: { worker: {...}, flows: { "flow-1": {...}, "flow-2": {...} } } + - Extracts flows["flow-1"].sql + - Generates migration for flow-1 only + +OR: + +1. Dev runs: pgflow compile --worker my-worker --all +2. CLI: + - Calls GET /metadata endpoint + - Receives: { worker: {...}, flows: { "flow-1": {...}, "flow-2": {...} } } + - Iterates over all flows + - Generates migration for each flow +``` + +--- + +## Implementation Details + +### Phase 0: Worker Endpoint + +**Key Requirements:** +1. Add path routing: `/metadata` vs `/` (root) +2. `/metadata` returns metadata, does NOT start worker +3. `/` starts worker normally (existing behavior) + +**Core Implementation:** +```typescript +// Store at module level +const WORKER_START_TIME = new Date(); + +// Handler returns: +{ + worker: { + worker_id: workerId, // From WorkerBootstrap/WorkerLifecycle + started_at: WORKER_START_TIME.toISOString() + }, + flows: { + [flow.slug]: { + sql: compileFlow(flow) // Use EXISTING function from @pgflow/dsl + } + } +} +``` + +**Key Insight:** No new SQL generation logic needed - reuse existing `compileFlow()` from `@pgflow/dsl`. + +### Phase 0: CLI Implementation + +**Key Changes:** +1. Remove Deno runtime execution and deno.json handling +2. Call `/metadata` endpoint via HTTP +3. Extract SQL from response +4. Generate migration file (keep existing logic) + +**Core Flow:** +```typescript +// 1. Get file mtime (for staleness check) +const fileMtime = fs.statSync(flowPath).mtime; + +// 2. Discover worker URL via `supabase status` +const workerUrl = `${projectUrl}/functions/v1/${workerName}`; + +// 3. Fetch metadata +const { worker, flows } = await fetch(`${workerUrl}/metadata`).then(r => r.json()); + +// 4. Staleness check: fileMtime vs worker.started_at +if (fileMtime > new Date(worker.started_at)) { + // Worker stale - error or poll for auto-reload +} + +// 5. Generate migration from flows[slug].sql +generateMigrationFile(flowSlug, flows[flowSlug].sql); +``` + +**Future:** Support `--flow ` and `--all` flags for multiple flows. + +--- + +## Supabase Auto-Reload Research + +**Questions to validate:** +1. Does module-level `WORKER_START_TIME` update when Supabase auto-reloads? +2. What's the typical reload delay? (for polling timeout) +3. Does it watch all files in `functions/` dir or just main file? +4. How reliable is it? (rapid edits, edge cases) + +**Simple test:** Edit flow file, check if `/metadata` endpoint returns updated `started_at`. + +--- + +## Benefits + +### What We Gain +- ✅ No Deno runtime in CLI (simpler!) +- ✅ No deno.json configuration (less complexity!) +- ✅ Worker is source of truth (consistent!) 
+- ✅ Staleness detection via Supabase auto-reload +- ✅ Works for both local dev and production migration generation +- ✅ Simpler to test (HTTP endpoint vs Deno subprocess) +- ✅ **Future-proof `/metadata` structure:** + - Supports multiple flows without API redesign + - Can add worker metadata (version, environment) without breaking changes + - Can add flow metadata (shape, options) in Phase 1+ without breaking changes + - Clean separation: worker-level vs flow-level metadata + +### What We Give Up +- ⚠️ Requires running worker for compilation (but this is already the case for local dev) +- ⚠️ Depends on Supabase auto-reload reliability (can be overridden with --skip-freshness-check) + +### Future Enhancements Enabled by `/metadata` Structure + +**Phase 1 (Auto-Compilation):** +- Add `shape` field to flows: `flows[slug].shape = serializeFlow(flow)` +- Worker auto-compilation uses same shape for DB comparison +- No API changes needed! + +**Future (Multiple Flows):** +- Add multiple flows to response: `flows: { "flow-1": {...}, "flow-2": {...} }` +- CLI iterates over flows or picks specific one by slug +- No API redesign needed! + +**Future (Worker Metadata):** +- Add `worker.version` from package.json +- Add `worker.environment` detection (local/production) +- Add `worker.platform` info (Deno version, etc.) +- No breaking changes! + +**Future (Flow Metadata):** +- Add `flows[slug].options` for runtime configuration +- Add `flows[slug].dependencies` for visualization +- Add `flows[slug].version` for flow versioning +- All backward-compatible additions! + +--- + +## Implementation Phases + +### Phase 0: Worker Endpoint (No Staleness Check) +**Goal:** Get basic `/metadata` endpoint working with existing `compileFlow()` + +- [ ] Add path routing to Deno.serve handler (`/metadata` vs `/`) +- [ ] Implement `/metadata` endpoint handler +- [ ] Store worker start time (module-level constant: `const WORKER_START_TIME = new Date()`) +- [ ] Get worker_id from WorkerLifecycle or generate at module level +- [ ] Use **existing** `compileFlow()` from `@pgflow/dsl` (no new functions needed!) +- [ ] Return JSON response: `{ worker: { worker_id, started_at }, flows: { [slug]: { sql } } }` +- [ ] Test manually with curl/browser + +**Deliverable:** Worker responds to `/metadata` with SQL statements and basic worker info (worker_id, started_at) + +**Key Insights:** +- NO `serializeFlow()` needed yet! Just reuse existing `compileFlow()` +- NO `status`, `version`, `environment` needed yet! 
Just worker_id and started_at +- Future enhancements are easy to add without breaking changes + +### Phase 0: CLI Implementation +**Goal:** Replace deno.json approach with HTTP calls + +- [ ] Remove Deno runtime execution from CLI +- [ ] Remove deno.json handling +- [ ] Add worker URL discovery via `supabase status` +- [ ] Add HTTP client to call `/metadata` endpoint +- [ ] Parse response: `const { worker, flows } = await response.json()` +- [ ] Extract SQL: `const flowData = flows[flowSlug]` +- [ ] Generate migration file (keep existing logic) +- [ ] Test with local worker + +**Deliverable:** `pgflow compile flows/my-flow.ts --worker my-worker` generates migration + +### Phase 0 (Optional): Staleness Check +**Goal:** Prevent stale compilations (can be added after research) + +- [ ] Research Supabase auto-reload behavior (see above) +- [ ] Add file mtime reading to CLI: `fs.statSync(flowPath).mtime` +- [ ] Add staleness comparison: `fileMtime > worker.started_at` +- [ ] Add polling with timeout (optional) +- [ ] Add `--skip-freshness-check` flag +- [ ] Add helpful error messages + +**Deliverable:** CLI detects stale workers and waits for reload (or warns) + +--- + +## Edge Cases & Mitigations + +### Case 1: Worker not started yet +**Problem:** Dev runs compile before worker has fully started +**Mitigation:** Return 503 status, CLI retries with exponential backoff + +### Case 2: Multiple workers for same flow +**Problem:** User has multiple instances running (shouldn't happen in local dev) +**Mitigation:** User must specify exact worker name with `--worker` flag + +### Case 3: Auto-reload disabled/broken +**Problem:** Supabase auto-reload not working +**Mitigation:** +- CLI detects staleness, shows error with instructions +- User can use `--skip-freshness-check` to override +- Or manually restart: `supabase functions serve --no-verify-jwt` + +### Case 4: Network issues +**Problem:** CLI can't reach worker (firewall, wrong URL, etc.) +**Mitigation:** +- Clear error messages with troubleshooting steps +- Verify `supabase status` shows correct URL +- Check worker is actually running + +--- + +## Testing Strategy + +### Manual Testing (Trust-Based Development) +Since we don't have E2E tests yet: + +1. **Endpoint responds correctly:** + ```bash + curl http://localhost:54321/functions/v1/my-worker/metadata + # Should return JSON: { worker: { started_at }, flows: { "my-flow": { sql: [...] } } } + ``` + +2. **Worker doesn't start on /metadata:** + - Check logs, ensure no "Worker started" message on /metadata request + - Ensure DB connections aren't created + - Only `/` (root path) should start worker + +3. **CLI generates migration:** + ```bash + pgflow compile flows/my-flow.ts --worker my-worker + # Should create migration file in supabase/migrations/ + ``` + +4. **Staleness detection works (if implemented):** + - Edit flow file + - Stop auto-reload (if possible) or simulate stale worker + - Run compile + - Should detect staleness and error/poll + +5. 
**Future-proof structure:** + - Verify response has nested `worker` and `flows` objects + - Verify can add new fields without breaking CLI + - Test with multiple flows (when supported) + +### Future E2E Tests (When Available) +- Start worker, call /metadata, verify response structure +- Edit flow, trigger reload, verify freshness +- Test staleness detection and polling +- Test error cases (worker not running, network errors) +- Test multiple flows in single worker + +--- + +## Success Criteria + +**Phase 0 is successful when:** +- ✅ Worker has `/metadata` endpoint that doesn't start worker +- ✅ Endpoint returns valid JSON: `{ worker: {...}, flows: {...} }` +- ✅ Uses existing `compileFlow()` from `@pgflow/dsl` (no new functions!) +- ✅ CLI can call endpoint and generate migration file +- ✅ No deno.json needed in CLI +- ✅ No Deno runtime needed in CLI +- ✅ Works with `supabase functions serve` in local dev +- ✅ Structure supports future enhancements (multiple flows, metadata, etc.) + +**Phase 0 (Optional) - Staleness Check is successful when:** +- ✅ Staleness check detects outdated workers +- ✅ Polling waits for auto-reload (with timeout) +- ✅ Clear error messages guide users +- ✅ `--skip-freshness-check` flag works as escape hatch + +**Future Phases Build On This Foundation:** +- Phase 1: Add `serializeFlow()` and `flows[slug].shape` for auto-compilation +- Future: Add multiple flows support (CLI `--flow` and `--all` flags) +- Future: Add worker metadata (version, environment) +- Future: Add flow metadata (options, dependencies) + +--- + +## Open Questions + +1. **Should /metadata endpoint require authentication?** + - Local dev: Probably not needed (localhost only) + - Production: Should never be exposed (but compile is local-only anyway) + - **Decision:** No auth for Phase 0 (local dev only) + +2. **Should we support compiling multiple flows in one call?** + - Phase 0: No, one flow per worker + - Future: Yes, via `/metadata` structure (already supports it!) + - **Decision:** Phase 0 returns single flow, structure ready for multiple + +3. **Where should serializeFlow() live (Phase 1)?** + - Option A: In `@pgflow/dsl` (shared with auto-compilation) + - Option B: In `@pgflow/edge-worker` (keep isolated) + - **Decision:** Option A (reuse for Phase 1 auto-compilation) + +4. **Should worker.started_at be stored in DB or memory?** + - DB: More accurate, matches Phase 1 auto-compilation check + - Memory: Simpler, no DB query needed + - **Decision:** Memory (module-level constant) for Phase 0 + +5. **Can we determine source file paths at runtime?** + - Answer: **No, not reliably** in TypeScript/Deno + - Stack traces don't show definition location + - import.meta.url only works within the module + - **Solution:** Don't rely on source paths! Use flow slugs instead + - Phase 0: User provides file path to CLI + - Future: User specifies flow by slug (`--flow my-flow`) + +## Why This Design Works + +### No Source Path Tracking Needed +The `/metadata` structure doesn't require knowing source file paths because: + +1. **Phase 0 (Single Flow):** + - User: `pgflow compile flows/my-flow.ts --worker my-worker` + - CLI knows file path (user provided it) + - CLI reads file mtime, compares with `worker.started_at` + - Worker returns flow by slug, CLI matches it + +2. **Future (Multiple Flows):** + - User: `pgflow compile --worker my-worker --flow my-flow` + - No file path needed! 
User specifies flow by slug + - Worker returns all flows, CLI picks the requested one + - Staleness check at worker level (process restart) + +3. **Why Slugs > File Paths:** + - Slugs are reliable (defined in flow code) + - File paths are fragile (depend on FS location) + - Runtime can't determine source paths anyway + - Slugs work across environments (local, production) + +### Future-Proof Without Complexity +The `/metadata` structure enables future features without painting us into a corner: + +- ✅ Multiple flows: Just add more keys to `flows` object +- ✅ Worker metadata: Just add more keys to `worker` object +- ✅ Flow metadata: Just add more keys to each flow object +- ✅ Non-breaking: Old clients ignore new fields +- ✅ No source path magic needed diff --git a/PLAN_phase1.md b/PLAN_phase1.md new file mode 100644 index 000000000..83dba24cc --- /dev/null +++ b/PLAN_phase1.md @@ -0,0 +1,566 @@ +# Auto-Compilation Phase 1: MVP Basics + +## Goal + +Enable auto-compilation with conservative, simple logic. Ship quickly, safely. + +--- + +## Core Principle + +**Safety over Smartness**: Compare flow structure. Continue if match, drop+recreate (local) or fail (production) if mismatch. + +--- + +## Behavior Matrix + +| Environment | Flow Exists? | Shapes Match? | Action | Result | +| ----------- | ------------ | ------------- | ------------------- | --------------------------- | +| Local | No | N/A | Create fresh | ✅ New flow | +| Local | Yes | **Match** | Continue (noop) | ✅ No changes needed | +| Local | Yes | **Mismatch** | Drop all → Recreate | ✅ Fresh state | +| Production | No | N/A | Create fresh | ✅ First deployment | +| Production | Yes | **Match** | Continue (noop) | ✅ No changes needed | +| Production | Yes | **Mismatch** | **RAISE EXCEPTION** | ❌ "Deploy migration first" | + +**Key Logic**: Shape comparison determines if flow needs update. Only structure matters (slugs, indexes, types, deps), not options. + +--- + +## Architecture + +### Worker Code + +```typescript +const flowShape = serializeFlow(flow); // Convert to JSON +await db.transaction(async (tx) => { + await tx.execute('SELECT pgflow.ensure_flow_compiled($1, $2)', [ + flowShape, + isLocal, + ]); +}); +``` + +### SQL Function + +```sql +pgflow.ensure_flow_compiled(flow_shape jsonb, is_local bool) RETURNS jsonb +``` + +**Logic**: + +1. Acquire advisory lock: `pg_advisory_xact_lock(hashtext('pgflow:' || slug))` +2. Check: `SELECT id FROM flows WHERE slug = ...` +3. Decision: + - NOT exists → Create fresh (both envs) + - Exists → Compare shapes + - Match → Continue (noop) + - Mismatch + is_local → Drop all, recreate + - Mismatch + !is_local → RAISE EXCEPTION + +**Shape comparison determines path!** + +--- + +## Environment Detection + +### Primary Check (Authoritative) + +```typescript +const isLocal = !Boolean( + Deno.env.get('DENO_DEPLOYMENT_ID') || Deno.env.get('SB_REGION') +); +``` + +### DB URL Validation (Warning Only) + +```typescript +const dbUrl = Deno.env.get('EDGE_WORKER_DB_URL') || ''; +const isSupabaseLocal = + dbUrl.includes('localhost:54322') || + dbUrl.includes('127.0.0.1:54322') || + dbUrl.includes('localhost:54321'); + +// Warn but don't block +if (isLocal && dbUrl && !isSupabaseLocal) { + console.warn( + "[pgflow] DB URL doesn't match Supabase local pattern. Verify configuration!" + ); +} + +if (!isLocal && isSupabaseLocal) { + console.warn( + '[pgflow] Production env vars but local DB URL. Configuration mismatch?' + ); +} +``` + +**Key Change**: Warnings not errors (supports Docker/custom setups). 
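+
+A minimal sketch of how the two checks above could be combined at worker startup (the helper name is illustrative, not an existing API):
+
+```typescript
+function detectEnvironment(): { isLocal: boolean } {
+  // Authoritative check: these vars are set only on hosted Supabase
+  const isLocal = !Boolean(
+    Deno.env.get('DENO_DEPLOYMENT_ID') || Deno.env.get('SB_REGION')
+  );
+
+  // Secondary check: warn (never block) on suspicious DB URLs
+  const dbUrl = Deno.env.get('EDGE_WORKER_DB_URL') || '';
+  const isSupabaseLocal =
+    dbUrl.includes('localhost:54322') ||
+    dbUrl.includes('127.0.0.1:54322') ||
+    dbUrl.includes('localhost:54321');
+
+  if (isLocal && dbUrl && !isSupabaseLocal) {
+    console.warn(
+      "[pgflow] DB URL doesn't match Supabase local pattern. Verify configuration!"
+    );
+  }
+  if (!isLocal && isSupabaseLocal) {
+    console.warn(
+      '[pgflow] Production env vars but local DB URL. Configuration mismatch?'
+    );
+  }
+
+  return { isLocal };
+}
+```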
+ +--- + +## Queue Management + +### Cleanup (drop_flow_data) + +```sql +-- Check before dropping +FOR step_record IN SELECT slug FROM pgflow.steps WHERE flow_id = v_flow_id LOOP + IF EXISTS(SELECT 1 FROM pgmq.list_queues() WHERE queue_name = step_record.slug) THEN + PERFORM pgmq.drop_queue(step_record.slug); + END IF; +END LOOP; +``` + +### Creation (Idempotent) + +```sql +-- In worker or SQL +IF NOT EXISTS(SELECT 1 FROM pgmq.list_queues() WHERE queue_name = step_slug) THEN + PERFORM pgmq.create_queue(step_slug); +END IF; +``` + +**Uses**: `pgmq.list_queues()`, `pgmq.drop_queue()`, `pgmq.create_queue()` + +--- + +## Trade-offs + +### What We Give Up (Phase 1) + +- ❌ Local test data preserved (drops on mismatch) +- ❌ Production option-only updates (requires migration) + +### What We Gain + +- ✅ Shape comparison prevents unnecessary recreates +- ✅ No-op when flow unchanged (fast restarts!) +- ✅ Conservative, safe +- ✅ Clear error messages on mismatch +- ✅ Easy to test +- ✅ Ships quickly + +**Decision**: Phase 1 includes shape comparison for safety and performance. Phase 2 adds option updates. + +--- + +## SQL Functions + +### Helper: compare_flow_shapes + +```sql +CREATE FUNCTION pgflow.compare_flow_shapes( + v_flow_id bigint, + desired_shape jsonb +) RETURNS boolean AS $$ +DECLARE + current_shape jsonb; +BEGIN + -- Extract current structure from DB (structural fields only, no options) + SELECT jsonb_build_object( + 'slug', f.slug, + 'steps', ( + SELECT jsonb_agg( + jsonb_build_object( + 'slug', s.slug, + 'stepIndex', s.step_index, + 'stepType', s.step_type, + 'dependencies', COALESCE( + ( + SELECT array_agg(parent_step.slug ORDER BY parent_step.step_index, parent_step.slug) + FROM pgflow.deps d + JOIN pgflow.steps parent_step ON parent_step.id = d.parent_step_id + WHERE d.child_step_id = s.id + ), + ARRAY[]::text[] + ) + ) + ORDER BY s.step_index, s.slug + ) + FROM pgflow.steps s + WHERE s.flow_id = f.id + ) + ) INTO current_shape + FROM pgflow.flows f + WHERE f.id = v_flow_id; + + -- Strip options from desired shape for comparison + desired_shape := jsonb_build_object( + 'slug', desired_shape->>'slug', + 'steps', ( + SELECT jsonb_agg( + jsonb_build_object( + 'slug', step->>'slug', + 'stepIndex', (step->>'stepIndex')::int, + 'stepType', step->>'stepType', + 'dependencies', step->'dependencies' + ) + ) + FROM jsonb_array_elements(desired_shape->'steps') step + ) + ); + + -- Compare structural fields only + RETURN current_shape = desired_shape; +END; +$$ LANGUAGE plpgsql; +``` + +### Main Function + +```sql +CREATE FUNCTION pgflow.ensure_flow_compiled(flow_shape jsonb, is_local bool) +RETURNS jsonb AS $$ +DECLARE + v_slug text; + v_flow_id bigint; + lock_key bigint; + shapes_match boolean; +BEGIN + v_slug := flow_shape->>'slug'; + lock_key := hashtext('pgflow:' || v_slug); + + -- Advisory lock + PERFORM pg_advisory_xact_lock(lock_key); + + -- Check existence + SELECT id INTO v_flow_id FROM pgflow.flows WHERE slug = v_slug; + + IF v_flow_id IS NULL THEN + -- New flow: create + PERFORM pgflow.create_flow_from_shape(flow_shape); + RETURN jsonb_build_object('status', 'created'); + ELSE + -- Flow exists: compare shapes + shapes_match := pgflow.compare_flow_shapes(v_flow_id, flow_shape); + + IF shapes_match THEN + -- Shapes match: no action needed + RETURN jsonb_build_object('status', 'unchanged'); + ELSE + -- Shapes mismatch + IF is_local THEN + -- Local: drop and recreate + PERFORM pgflow.drop_flow_data(v_slug); + PERFORM pgflow.create_flow_from_shape(flow_shape); + RETURN 
jsonb_build_object('status', 'recreated'); + ELSE + -- Production: fail + RAISE EXCEPTION 'Flow ''%'' structure mismatch. Deploy migration first (pgflow compile).', v_slug; + END IF; + END IF; + END IF; +END; +$$ LANGUAGE plpgsql; +``` + +### Helper: drop_flow_data + +```sql +CREATE FUNCTION pgflow.drop_flow_data(p_flow_slug text) +RETURNS void AS $$ +DECLARE + v_flow_id bigint; + step_rec record; +BEGIN + SELECT id INTO v_flow_id FROM pgflow.flows WHERE slug = p_flow_slug; + IF v_flow_id IS NULL THEN RETURN; END IF; + + -- Drop queues (idempotent) + FOR step_rec IN SELECT slug FROM pgflow.steps WHERE flow_id = v_flow_id LOOP + IF EXISTS(SELECT 1 FROM pgmq.list_queues() WHERE queue_name = step_rec.slug) THEN + PERFORM pgmq.drop_queue(step_rec.slug); + END IF; + END LOOP; + + -- Delete data (CASCADE handles deps, step_states) + DELETE FROM pgflow.steps WHERE flow_id = v_flow_id; + DELETE FROM pgflow.runs WHERE flow_id = v_flow_id; +END; +$$ LANGUAGE plpgsql; +``` + +### Helper: create_flow_from_shape + +```sql +CREATE FUNCTION pgflow.create_flow_from_shape(flow_shape jsonb) +RETURNS bigint AS $$ +DECLARE + v_flow_id bigint; + v_slug text; + step_json jsonb; +BEGIN + -- Extract flow properties + v_slug := flow_shape->>'slug'; + + -- Create flow + INSERT INTO pgflow.flows ( + slug, + max_attempts, + base_delay, + timeout + ) VALUES ( + v_slug, + COALESCE((flow_shape->'options'->>'maxAttempts')::int, 3), + COALESCE((flow_shape->'options'->>'baseDelay')::int, 5), + COALESCE((flow_shape->'options'->>'timeout')::int, 30) + ) RETURNING id INTO v_flow_id; + + -- Create steps + FOR step_json IN SELECT * FROM jsonb_array_elements(flow_shape->'steps') + LOOP + PERFORM pgflow.create_step_from_json(v_flow_id, step_json); + END LOOP; + + RETURN v_flow_id; +END; +$$ LANGUAGE plpgsql; +``` + +### Helper: create_step_from_json + +```sql +CREATE FUNCTION pgflow.create_step_from_json(v_flow_id bigint, step_json jsonb) +RETURNS bigint AS $$ +DECLARE + v_step_id bigint; + v_step_slug text; + dep_slug text; +BEGIN + v_step_slug := step_json->>'slug'; + + -- Create step + INSERT INTO pgflow.steps ( + flow_id, + slug, + step_index, + step_type, + max_attempts, + base_delay, + timeout, + start_delay + ) VALUES ( + v_flow_id, + v_step_slug, + (step_json->>'stepIndex')::int, + step_json->>'stepType', + (step_json->'options'->>'maxAttempts')::int, + (step_json->'options'->>'baseDelay')::int, + (step_json->'options'->>'timeout')::int, + (step_json->'options'->>'startDelay')::int + ) RETURNING id INTO v_step_id; + + -- Create dependencies + IF step_json->'dependencies' IS NOT NULL THEN + FOR dep_slug IN SELECT jsonb_array_elements_text(step_json->'dependencies') + LOOP + INSERT INTO pgflow.deps (parent_step_id, child_step_id) + SELECT parent.id, v_step_id + FROM pgflow.steps parent + WHERE parent.flow_id = v_flow_id AND parent.slug = dep_slug; + END LOOP; + END IF; + + RETURN v_step_id; +END; +$$ LANGUAGE plpgsql; +``` + +--- + +## TypeScript Implementation + +### serializeFlow() + +```typescript +interface FlowShape { + slug: string; + options: { + maxAttempts?: number; + baseDelay?: number; + timeout?: number; + }; + steps: StepShape[]; +} + +interface StepShape { + slug: string; + stepIndex: number; + stepType: string; + dependencies: string[]; // Sorted alphabetically + options: { + maxAttempts?: number; + baseDelay?: number; + timeout?: number; + startDelay?: number; + }; +} + +function serializeFlow(flow: AnyFlow): FlowShape { + return { + slug: flow.slug, + options: flow.options, + steps: 
flow.stepOrder.map((stepSlug) => {
+      const step = flow.getStepDefinition(stepSlug);
+
+      // Sort dependencies by stepIndex, then slug for deterministic comparison
+      const sortedDependencies = [...step.dependencies].sort((a, b) => {
+        const aStep = flow.getStepDefinition(a);
+        const bStep = flow.getStepDefinition(b);
+
+        // First by stepIndex (topological order)
+        if (aStep.stepIndex !== bStep.stepIndex) {
+          return aStep.stepIndex - bStep.stepIndex;
+        }
+
+        // Then by slug (lexicographic)
+        return a.localeCompare(b);
+      });
+
+      return {
+        slug: step.slug,
+        stepIndex: step.stepIndex,
+        stepType: step.stepType,
+        dependencies: sortedDependencies,
+        options: step.options,
+      };
+    }),
+  };
+}
+```
+
+---
+
+## HTTP Compilation Endpoint
+
+**Same as original plan** - no changes needed.
+
+```
+GET /functions/v1/<worker-name>/compile?check_after=<timestamp>
+
+Response 200: { sql: string[], flow_slug: string }
+Response 409: { error: "stale_worker", ... }
+```
+
+CLI command unchanged:
+
+```bash
+pgflow compile flows/my-flow.ts
+```
+
+---
+
+## Workflows
+
+### Local Development - No Changes
+
+```
+1. Restart worker (flow code unchanged)
+2. Worker calls: ensure_flow_compiled(flowShape, is_local=true)
+3. Shapes match → Continue (noop)
+4. Ready (test data preserved!)
+```
+
+**Benefit**: Fast restarts when flow unchanged!
+
+### Local Development - Structure Change
+
+```
+1. Edit flow code (add step, change deps, etc.)
+2. Restart worker
+3. Worker calls: ensure_flow_compiled(flowShape, is_local=true)
+4. Shapes mismatch → DROPS everything, recreates
+5. Ready (fresh state)
+```
+
+**Trade-off**: Loses test data on structure change. Acceptable for Phase 1.
+
+### Production (New Flow)
+
+```
+1. Deploy worker
+2. Worker calls: ensure_flow_compiled(flowShape, is_local=false)
+3. Flow doesn't exist → Creates
+4. Ready
+```
+
+### Production (Existing Flow - No Changes)
+
+```
+1. Redeploy worker (flow code unchanged)
+2. Worker calls: ensure_flow_compiled(flowShape, is_local=false)
+3. Shapes match → Continue (noop)
+4. Ready
+```
+
+**Benefit**: Safe redeployments don't require migrations!
+
+### Production (Existing Flow - ANY Change)
+
+```
+1. Deploy updated worker (structure or options changed)
+2. Worker calls: ensure_flow_compiled(flowShape, is_local=false)
+3. Shapes mismatch → FAILS
+4. Developer generates migration: pgflow compile
+5. Deploy migration: supabase db push
+6. Redeploy worker
+7. Ready
+```
+
+**Trade-off**: Even option-only changes require migration. Phase 2 will improve this.
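+
+A hypothetical smoke test of the decision tree (assumes the Phase 1 functions above are installed; the shape JSON mirrors `serializeFlow()` output):
+
+```sql
+SELECT pgflow.ensure_flow_compiled(
+  '{"slug": "my_flow", "options": {}, "steps": [
+     {"slug": "step1", "stepIndex": 0, "stepType": "single",
+      "dependencies": [], "options": {}}
+   ]}'::jsonb,
+  true  -- is_local
+);
+-- First call:        {"status": "created"}
+-- Identical shape:   {"status": "unchanged"}
+-- Changed structure: {"status": "recreated"} locally; EXCEPTION with is_local = false
+```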
+ +--- + +## Implementation Checklist + +### SQL + +- [ ] `pgflow.compare_flow_shapes(flow_id, flow_shape)` - structure comparison +- [ ] `pgflow.ensure_flow_compiled(flow_shape, is_local)` - main function with shape comparison +- [ ] `pgflow.drop_flow_data(slug)` - cleanup with pgmq.list_queues +- [ ] `pgflow.create_flow_from_shape(flow_shape)` - parse JSON, create +- [ ] `pgflow.create_step_from_json(flow_id, step_json)` - step creation helper +- [ ] Update queue creation to be idempotent (IF NOT EXISTS check) + +### TypeScript + +- [ ] `serializeFlow(flow)` - convert Flow to JSON with sorted dependencies +- [ ] Environment detection (Supabase env vars + DB URL warning) +- [ ] Worker integration in `EdgeWorker.acknowledgeStart()` +- [ ] HTTP `/compile` endpoint with staleness check +- [ ] Update `compileFlow()` to include `step_index` parameter + +### CLI + +- [ ] `pgflow compile ` command +- [ ] Worker name inference (basename + "-worker") +- [ ] URL discovery via `supabase status` +- [ ] Migration file generation + +--- + +## What Phase 2 Will Add + +Phase 2 enhancements (not in Phase 1): + +- Update runtime options without dropping data (preserve test data!) +- Production option-only updates (no migration needed!) +- Optional strict mode (PGFLOW_REQUIRE_MIGRATIONS env var) + +**Phase 1 is complete without these.** Ship Phase 1, iterate to Phase 2. + +--- + +## Success Criteria + +Phase 1 is successful when: + +- ✅ Local dev: Edit → Restart → Ready (no manual pgflow compile) +- ✅ Production: Explicit migrations enforced (fails if flow exists) +- ✅ Multi-worker: No race conditions (advisory locks work) +- ✅ HTTP compile: CLI can generate migrations +- ✅ Queue cleanup: No orphaned queues +- ✅ Environment detection: Reliable, warns on unexpected configs + +**Ship Phase 1, then enhance with Phase 2!** diff --git a/PLAN_phase2.md b/PLAN_phase2.md new file mode 100644 index 000000000..5028ee7b1 --- /dev/null +++ b/PLAN_phase2.md @@ -0,0 +1,444 @@ +# Auto-Compilation Phase 2: Option Updates + +## Goal + +Add intelligent option updates to preserve data and enable production option changes without migrations. + +--- + +## Core Principle + +**Smart Updates over Drop+Recreate**: When flow structure matches, update options only. Preserve all data, queues, and runs. + +--- + +## Behavior Matrix (Phase 2 vs Phase 1) + +| Environment | Flow Exists? | Shapes Match? | Phase 1 Action | Phase 2 Action | Improvement | +| ----------- | ------------ | ------------- | --------------- | ------------------ | ------------------- | +| Local | No | N/A | Create fresh | Create fresh | Same | +| Local | Yes | **Match** | Continue (noop) | **Update options** | ✅ Options updated! | +| Local | Yes | **Mismatch** | Drop → Recreate | Drop → Recreate | Same | +| Production | No | N/A | Create fresh | Create fresh | Same | +| Production | Yes | **Match** | Continue (noop) | **Update options** | ✅ No migration! | +| Production | Yes | **Mismatch** | RAISE EXCEPTION | RAISE EXCEPTION | Same | + +**Key Enhancement**: When shapes match, Phase 2 updates options instead of no-op. This preserves data while applying config changes! 
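+
+A sketch of the Phase 2 difference (assumes the enhanced `ensure_flow_compiled` defined below is installed): the same structure with changed options now updates in place instead of no-op, even in production:
+
+```sql
+SELECT pgflow.ensure_flow_compiled(
+  '{"slug": "my_flow", "options": {"timeout": 60}, "steps": [
+     {"slug": "step1", "stepIndex": 0, "stepType": "single",
+      "dependencies": [], "options": {"timeout": 60}}
+   ]}'::jsonb,
+  false  -- production: no migration needed for option-only changes
+);
+-- → {"status": "options_updated"}  (runs, step_states, and queues preserved)
+```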
+ +--- + +## Architecture + +### Worker Code (Same as Phase 1) + +```typescript +const flowShape = serializeFlow(flow); // Convert to JSON +await db.transaction(async (tx) => { + await tx.execute('SELECT pgflow.ensure_flow_compiled($1, $2)', [ + flowShape, + isLocal, + ]); +}); +``` + +### SQL Function (Enhanced) + +```sql +pgflow.ensure_flow_compiled(flow_shape jsonb, is_local bool) RETURNS jsonb +``` + +**Logic**: + +1. Acquire advisory lock: `pg_advisory_xact_lock(hashtext('pgflow:' || slug))` +2. Check: `SELECT id FROM flows WHERE slug = ...` +3. Decision: + - NOT exists → Create fresh (both envs) + - Exists → Compare shapes + - Match + is_local → Update options, preserve data + - Match + !is_local → Update options, preserve data + - Mismatch + is_local → Drop all, recreate + - Mismatch + !is_local → RAISE EXCEPTION + +**Shape comparison in Phase 1!** + +--- + +## Option Updates (Phase 2 Addition) + +### Update Flow Options + +```sql +CREATE FUNCTION pgflow.update_flow_options( + v_flow_id bigint, + flow_shape jsonb +) RETURNS void AS $$ +BEGIN + UPDATE pgflow.flows + SET + max_attempts = COALESCE((flow_shape->'options'->>'maxAttempts')::int, max_attempts), + base_delay = COALESCE((flow_shape->'options'->>'baseDelay')::int, base_delay), + timeout = COALESCE((flow_shape->'options'->>'timeout')::int, timeout) + WHERE id = v_flow_id; +END; +$$ LANGUAGE plpgsql; +``` + +### Update Step Options + +```sql +CREATE FUNCTION pgflow.update_step_options( + v_flow_id bigint, + flow_shape jsonb +) RETURNS void AS $$ +DECLARE + step_json jsonb; + v_step_slug text; +BEGIN + FOR step_json IN SELECT * FROM jsonb_array_elements(flow_shape->'steps') + LOOP + v_step_slug := step_json->>'slug'; + + UPDATE pgflow.steps + SET + max_attempts = (step_json->'options'->>'maxAttempts')::int, + base_delay = (step_json->'options'->>'baseDelay')::int, + timeout = (step_json->'options'->>'timeout')::int, + start_delay = (step_json->'options'->>'startDelay')::int + WHERE flow_id = v_flow_id AND slug = v_step_slug; + END LOOP; +END; +$$ LANGUAGE plpgsql; +``` + +**Preserves**: All data, queues, runs, step_states. + +--- + +## Enhanced Main Function + +### pgflow.ensure_flow_compiled (Phase 2 Version) + +```sql +CREATE FUNCTION pgflow.ensure_flow_compiled(flow_shape jsonb, is_local bool) +RETURNS jsonb AS $$ +DECLARE + v_slug text; + v_flow_id bigint; + lock_key bigint; + shapes_match boolean; + require_migrations boolean; +BEGIN + v_slug := flow_shape->>'slug'; + lock_key := hashtext('pgflow:' || v_slug); + require_migrations := COALESCE(current_setting('pgflow.require_migrations', true)::boolean, false); + + -- Advisory lock + PERFORM pg_advisory_xact_lock(lock_key); + + -- Check existence + SELECT id INTO v_flow_id FROM pgflow.flows WHERE slug = v_slug; + + IF v_flow_id IS NULL THEN + -- New flow + IF require_migrations AND NOT is_local THEN + -- Strict mode: even new flows require migration + RAISE EXCEPTION 'Flow ''%'' does not exist. 
Deploy migration first (PGFLOW_REQUIRE_MIGRATIONS=true).', v_slug; + END IF; + + PERFORM pgflow.create_flow_from_shape(flow_shape); + RETURN jsonb_build_object('status', 'created'); + ELSE + -- Flow exists: compare shapes + shapes_match := pgflow.compare_flow_shapes(v_flow_id, flow_shape); + + IF shapes_match THEN + -- Structure matches: update options only + PERFORM pgflow.update_flow_options(v_flow_id, flow_shape); + PERFORM pgflow.update_step_options(v_flow_id, flow_shape); + RETURN jsonb_build_object('status', 'options_updated'); + ELSE + -- Structure mismatch + IF is_local THEN + -- Local: drop and recreate + PERFORM pgflow.drop_flow_data(v_slug); + PERFORM pgflow.create_flow_from_shape(flow_shape); + RETURN jsonb_build_object('status', 'recreated'); + ELSE + -- Production: fail with details + RAISE EXCEPTION 'Flow ''%'' structure mismatch. Deploy migration first.', v_slug; + END IF; + END IF; + END IF; +END; +$$ LANGUAGE plpgsql; +``` + +**Key Changes**: + +- Shape comparison determines path +- Options updates preserve data +- Strict mode check for new flows +- Same advisory locking as Phase 1 + +--- + +## Strict Mode (PGFLOW_REQUIRE_MIGRATIONS) + +### Configuration + +```sql +-- Set via PostgreSQL config +ALTER DATABASE postgres SET pgflow.require_migrations = 'true'; + +-- Or per-transaction +SET pgflow.require_migrations = 'true'; +``` + +### Behavior + +When `pgflow.require_migrations = true`: + +- **New flows in production**: RAISE EXCEPTION (must deploy migration first) +- **New flows in local**: Still auto-create (development convenience) +- **Existing flows**: Same behavior as without strict mode + +**Use Case**: Organizations requiring audit trail for ALL flow definitions. + +--- + +## Environment Detection + +**Same as Phase 1** - no changes needed. + +```typescript +const isLocal = !Boolean( + Deno.env.get('DENO_DEPLOYMENT_ID') || Deno.env.get('SB_REGION') +); + +// DB URL validation (warnings only) +const dbUrl = Deno.env.get('EDGE_WORKER_DB_URL') || ''; +const isSupabaseLocal = + dbUrl.includes('localhost:54322') || + dbUrl.includes('127.0.0.1:54322') || + dbUrl.includes('localhost:54321'); + +if (isLocal && dbUrl && !isSupabaseLocal) { + console.warn( + "[pgflow] DB URL doesn't match Supabase local pattern. Verify configuration!" + ); +} + +if (!isLocal && isSupabaseLocal) { + console.warn( + '[pgflow] Production env vars but local DB URL. Configuration mismatch?' + ); +} +``` + +--- + +## Queue Management + +**Same as Phase 1** - no changes needed. + +```sql +-- Cleanup (drop_flow_data) +FOR step_record IN SELECT slug FROM pgflow.steps WHERE flow_id = v_flow_id LOOP + IF EXISTS(SELECT 1 FROM pgmq.list_queues() WHERE queue_name = step_record.slug) THEN + PERFORM pgmq.drop_queue(step_record.slug); + END IF; +END LOOP; + +-- Creation (Idempotent) +IF NOT EXISTS(SELECT 1 FROM pgmq.list_queues() WHERE queue_name = step_slug) THEN + PERFORM pgmq.create_queue(step_slug); +END IF; +``` + +--- + +## What Phase 2 Adds Over Phase 1 + +### New Capabilities + +- ✅ Option update functions (flow and step options) +- ✅ Preserve data when only options change (no drops!) +- ✅ Update production runtime options (no migration needed!) +- ✅ Strict mode for compliance (PGFLOW_REQUIRE_MIGRATIONS) + +### Behavioral Improvements + +- ✅ Local dev: Change timeout → Just update (data preserved!) +- ✅ Production: Change timeout → Just update (no migration!) +- ✅ Production: Add step → Still requires migration (safe!) 
+- ✅ Better DX: Fewer unnecessary drops, faster iteration + +### Backward Compatibility + +- ✅ Worker code unchanged (same serializeFlow call) +- ✅ HTTP /compile endpoint unchanged +- ✅ CLI command unchanged +- ✅ Migration format unchanged + +**Phase 2 is drop-in enhancement!** + +--- + +## Workflows + +### Local Development - Option Change (NEW!) + +``` +1. Edit flow code (change timeout: 30 → 60) +2. Restart worker +3. Worker calls: ensure_flow_compiled(flowShape, is_local=true) + - Shapes match: UPDATES options only + - Data preserved: runs, step_states, queues intact +4. Ready (same test data, updated config) +``` + +**Benefit**: No more losing test data for option tweaks! + +### Local Development - Structure Change + +``` +1. Edit flow code (add new step) +2. Restart worker +3. Worker calls: ensure_flow_compiled(flowShape, is_local=true) + - Shapes mismatch: DROPS everything, recreates +4. Ready (fresh state, new structure) +``` + +**Same as Phase 1** when structure changes. + +### Production - Option Change (NEW!) + +``` +1. Deploy updated worker (timeout changed) +2. Worker calls: ensure_flow_compiled(flowShape, is_local=false) +3. Shapes match → Updates options +4. Ready (no migration needed!) +``` + +**Benefit**: Fast deploys for config changes! + +### Production - Structure Change + +``` +1. Deploy updated worker (new step added) +2. Worker calls: ensure_flow_compiled(flowShape, is_local=false) +3. Shapes mismatch → FAILS +4. Developer generates migration: pgflow compile +5. Deploy migration: supabase db push +6. Redeploy worker +7. Ready +``` + +**Same as Phase 1** when structure changes (safe!). + +--- + +## Implementation Checklist + +### SQL (New Functions) + +- [ ] `pgflow.update_flow_options(flow_id, flow_shape)` - flow option updates +- [ ] `pgflow.update_step_options(flow_id, flow_shape)` - step option updates +- [ ] Update `pgflow.ensure_flow_compiled()` to call update functions when shapes match +- [ ] Add `pgflow.require_migrations` config parameter support + +### SQL (Phase 1 Functions - Already Implemented) + +- [x] `pgflow.compare_flow_shapes(flow_id, flow_shape)` - use existing from Phase 1 +- [x] `pgflow.drop_flow_data(slug)` - no changes needed +- [x] `pgflow.create_flow_from_shape(flow_shape)` - no changes needed +- [x] `pgflow.create_step_from_json(flow_id, step_json)` - no changes needed + +### TypeScript (No Changes) + +- [x] `serializeFlow(flow)` - same as Phase 1 +- [x] Environment detection - same as Phase 1 +- [x] Worker integration - same as Phase 1 +- [x] HTTP `/compile` endpoint - same as Phase 1 + +### Testing + +- [ ] Test option updates (flow and step) +- [ ] Test strict mode (PGFLOW_REQUIRE_MIGRATIONS) +- [ ] Test backward compatibility with Phase 1 +- [ ] Test local data preservation on option changes + +--- + +## Success Criteria + +Phase 2 is successful when: + +- ✅ Local dev: Option changes preserve test data +- ✅ Production: Option changes deploy without migration +- ✅ Production: Structure changes still require migration (safety maintained) +- ✅ Strict mode: Organizations can require migrations for all changes +- ✅ Backward compatible: Drop-in replacement for Phase 1 +- ✅ Same advisory lock protection (no race conditions) +- ✅ Option updates are accurate and complete + +**Phase 2 builds on Phase 1's shape comparison foundation!** + +--- + +## Migration from Phase 1 + +### Database Migration + +```sql +-- Add new functions (Phase 2) +CREATE FUNCTION pgflow.update_flow_options(...) ...; +CREATE FUNCTION pgflow.update_step_options(...) 
...; + +-- Update main function to call update functions when shapes match +CREATE OR REPLACE FUNCTION pgflow.ensure_flow_compiled(...) ...; + +-- compare_flow_shapes already exists from Phase 1! +``` + +### No Code Changes Required + +- Worker code: Same serializeFlow() call +- CLI: Same pgflow compile command +- HTTP endpoint: Same /compile response + +### Rollback Plan + +```sql +-- Restore Phase 1 version +CREATE OR REPLACE FUNCTION pgflow.ensure_flow_compiled(...) +... -- Phase 1 implementation +``` + +**Zero breaking changes!** + +--- + +## Trade-offs Analysis + +### What Phase 2 Adds + +- ✅ Smarter local development (data preservation) +- ✅ Faster production deploys (option updates) +- ✅ Better developer experience +- ✅ Compliance mode (strict migrations) + +### What Phase 2 Costs + +- ⚠️ More complex SQL logic (shape comparison) +- ⚠️ More test surface area +- ⚠️ Slightly higher cognitive load + +### When to Use Phase 2 + +- ✅ After Phase 1 is stable and validated +- ✅ When teams iterate frequently on flow options +- ✅ When preserving local test data is valuable +- ✅ When compliance requires strict migration control + +**Decision**: Phase 2 is enhancement, not replacement. Phase 1 is complete and shippable without Phase 2. diff --git a/PLAN_tests.md b/PLAN_tests.md new file mode 100644 index 000000000..1f7d4c3de --- /dev/null +++ b/PLAN_tests.md @@ -0,0 +1,1042 @@ +# Auto-Compilation Testing Strategy + +> Comprehensive testing plan for Phase 1 auto-compilation feature + +--- + +## Executive Summary + +The auto-compilation feature requires **two distinct test layers**: + +1. **Integration Tests** (fast, ~2-3 min) - Test logic and SQL functions directly +2. **E2E Tests** (slower, ~5-10 min) - Test real edge function deployment with HTTP + +**Why both?** Integration tests catch logic bugs quickly during development. E2E tests catch deployment issues that only appear in real Supabase Edge Runtime. + +**Key Decision**: E2E tests run in separate CI job before deployment to provide layered confidence without slowing every commit. + +--- + +## Test Layer 1: Integration Tests + +### Purpose +- Fast feedback during TDD +- Test compilation logic in isolation +- Test SQL functions thoroughly +- Test worker behavior without HTTP overhead + +### What Integration Tests Cover + +#### 1. Environment Detection +**File**: `pkgs/edge-worker/tests/unit/platform-adapter.test.ts` + +```typescript +Deno.test('detects local environment (no DENO_REGION)', () => { + const adapter = new SupabasePlatformAdapter(() => ({ + ...testEnv, + // No DENO_REGION = local + })); + assertEquals(adapter.environment, 'local'); +}); + +Deno.test('detects production environment (DENO_REGION present)', () => { + const adapter = new SupabasePlatformAdapter(() => ({ + ...testEnv, + DENO_REGION: 'us-east-1', + })); + assertEquals(adapter.environment, 'production'); +}); +``` + +**Coverage**: +- Environment detection logic +- Warning message generation +- Constructor parameter injection (for testing) + +#### 2. 
+
+#### 2. Flow Serialization
+**File**: `pkgs/dsl/tests/unit/serialize-flow.test.ts`
+
+```typescript
+Deno.test('serializes flow with correct step order', () => {
+  const flow = new Flow({ slug: 'test' })
+    .step({ slug: 'step1' }, () => 'a')
+    .step({ slug: 'step2', dependsOn: ['step1'] }, () => 'b');
+
+  const shape = serializeFlow(flow);
+
+  assertEquals(shape.steps[0].stepIndex, 0);
+  assertEquals(shape.steps[1].stepIndex, 1);
+  assertEquals(shape.steps[1].dependencies, ['step1']);
+});
+
+Deno.test('sorts dependencies deterministically', () => {
+  const flow = new Flow({ slug: 'test' })
+    .step({ slug: 'a' }, () => 'a')
+    .step({ slug: 'b' }, () => 'b')
+    .step({ slug: 'c', dependsOn: ['b', 'a'] }, () => 'c');
+
+  const shape = serializeFlow(flow);
+
+  // Dependencies sorted by stepIndex (a=0, b=1), then alphabetically
+  assertEquals(shape.steps[2].dependencies, ['a', 'b']);
+});
+```
+
+**Coverage**:
+- Step index calculation from array position
+- Dependency sorting (by stepIndex, then slug)
+- Option serialization
+- stepType handling (single vs map)
+
+#### 3. SQL Functions
+**File**: `pkgs/core/tests/pgtap/auto-compilation.test.sql`
+
+```sql
+-- Test compare_flow_shapes
+SELECT plan(20);
+
+-- Setup: Create flow with 2 steps
+SELECT pgflow.create_flow('test_flow');
+SELECT pgflow.add_step('test_flow', 'step1');
+SELECT pgflow.add_step('test_flow', 'step2', deps_slugs => ARRAY['step1']);
+
+-- Test: Matching shape returns true
+SELECT ok(
+  pgflow.compare_flow_shapes(
+    (SELECT id FROM pgflow.flows WHERE slug = 'test_flow'),
+    '{"slug": "test_flow", "steps": [
+      {"slug": "step1", "stepIndex": 0, "stepType": "single", "dependencies": []},
+      {"slug": "step2", "stepIndex": 1, "stepType": "single", "dependencies": ["step1"]}
+    ]}'::jsonb
+  ),
+  'Matching structure returns true'
+);
+
+-- Test: Mismatching shape returns false (different dependencies)
+SELECT ok(
+  NOT pgflow.compare_flow_shapes(
+    (SELECT id FROM pgflow.flows WHERE slug = 'test_flow'),
+    '{"slug": "test_flow", "steps": [
+      {"slug": "step1", "stepIndex": 0, "stepType": "single", "dependencies": []},
+      {"slug": "step2", "stepIndex": 1, "stepType": "single", "dependencies": []}
+    ]}'::jsonb
+  ),
+  'Different dependencies returns false'
+);
+
+-- Test: Option changes do not affect comparison
+SELECT ok(
+  pgflow.compare_flow_shapes(
+    (SELECT id FROM pgflow.flows WHERE slug = 'test_flow'),
+    '{"slug": "test_flow", "options": {"maxAttempts": 999}, "steps": [
+      {"slug": "step1", "stepIndex": 0, "stepType": "single", "dependencies": [], "options": {"timeout": 999}},
+      {"slug": "step2", "stepIndex": 1, "stepType": "single", "dependencies": ["step1"]}
+    ]}'::jsonb
+  ),
+  'Option changes do not affect shape comparison'
+);
+
+SELECT finish();
+```
+
+**Coverage**:
+- `compare_flow_shapes()` - structural comparison logic
+- `ensure_flow_compiled()` - decision tree (create/noop/recreate/fail)
+- `drop_flow_data()` - queue cleanup + FK cascade
+- `create_flow_from_shape()` - JSON parsing + creation
+- `create_step_from_json()` - step + dependency creation
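+
+For orientation, the JSONB consumed by `compare_flow_shapes()` is the same shape `serializeFlow()` emits. An illustrative value for the two-step flow created above (field layout inferred from the tests):
+
+```typescript
+// Illustrative serializeFlow() output; mirrors the JSONB in the PgTAP tests.
+const shape = {
+  slug: 'test_flow',
+  steps: [
+    { slug: 'step1', stepIndex: 0, stepType: 'single', dependencies: [] },
+    { slug: 'step2', stepIndex: 1, stepType: 'single', dependencies: ['step1'] },
+  ],
+};
+```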
+
+#### 4. Worker Lifecycle Integration
+**File**: `pkgs/edge-worker/tests/integration/auto-compilation.test.ts`
+
+```typescript
+Deno.test('local - creates new flow on worker startup', async () => {
+  await withPgNoTransaction(async (sql) => {
+    // Create flow definition
+    const flow = new Flow({ slug: 'test_flow' })
+      .step({ slug: 'step1' }, async () => ({ result: 'hello' }));
+
+    // Create local adapter
+    const adapter = new SupabasePlatformAdapter(() => ({
+      ...testEnv,
+      // No DENO_REGION = local
+    }));
+
+    // Create and start worker (bypasses HTTP)
+    const worker = createFlowWorker(
+      flow,
+      { sql, maxConcurrent: 1 },
+      createLogger,
+      adapter
+    );
+
+    // Await startup so auto-compilation finishes before we assert
+    await worker.startOnlyOnce({
+      edgeFunctionName: 'test-worker',
+      workerId: crypto.randomUUID(),
+    });
+
+    // Verify auto-compilation created flow
+    const flows = await sql`
+      SELECT * FROM pgflow.flows WHERE slug = 'test_flow'
+    `;
+    assertEquals(flows.length, 1);
+
+    const steps = await sql`
+      SELECT * FROM pgflow.steps WHERE flow_slug = 'test_flow'
+    `;
+    assertEquals(steps.length, 1);
+    assertEquals(steps[0].step_slug, 'step1');
+
+    await worker.stop();
+  });
+});
+
+Deno.test('local - noop when shapes match', async () => {
+  await withPgNoTransaction(async (sql) => {
+    // Pre-create flow
+    await sql`SELECT pgflow.create_flow('test_flow')`;
+    await sql`SELECT pgflow.add_step('test_flow', 'step1')`;
+
+    const initialCreatedAt = await sql`
+      SELECT created_at FROM pgflow.flows WHERE slug = 'test_flow'
+    `;
+
+    // Start worker with SAME shape
+    const flow = new Flow({ slug: 'test_flow' })
+      .step({ slug: 'step1' }, async () => ({ result: 'hello' }));
+
+    // localAdapter: same local (no DENO_REGION) adapter as above
+    const worker = createFlowWorker(flow, { sql }, createLogger, localAdapter);
+    await worker.startOnlyOnce({ edgeFunctionName: 'test', workerId: '123' });
+
+    // Verify no changes (created_at unchanged)
+    const finalCreatedAt = await sql`
+      SELECT created_at FROM pgflow.flows WHERE slug = 'test_flow'
+    `;
+    assertEquals(initialCreatedAt[0].created_at, finalCreatedAt[0].created_at);
+
+    await worker.stop();
+  });
+});
+
+Deno.test('local - drop+recreate when shapes mismatch', async () => {
+  await withPgNoTransaction(async (sql) => {
+    // Pre-create flow with 1 step + test data
+    await sql`SELECT pgflow.create_flow('test_flow')`;
+    await sql`SELECT pgflow.add_step('test_flow', 'step1')`;
+    await sql`INSERT INTO pgflow.runs (flow_slug, status) VALUES ('test_flow', 'running')`;
+
+    // Start worker with 2 steps (mismatch!)
+    const flow = new Flow({ slug: 'test_flow' })
+      .step({ slug: 'step1' }, async () => 'a')
+      .step({ slug: 'step2', dependsOn: ['step1'] }, async () => 'b');
+
+    const worker = createFlowWorker(flow, { sql }, createLogger, localAdapter);
+    await worker.startOnlyOnce({ edgeFunctionName: 'test', workerId: '123' });
+
+    // Verify recreated with 2 steps
+    const steps = await sql`
+      SELECT * FROM pgflow.steps WHERE flow_slug = 'test_flow' ORDER BY step_index
+    `;
+    assertEquals(steps.length, 2);
+    assertEquals(steps[0].step_slug, 'step1');
+    assertEquals(steps[1].step_slug, 'step2');
+
+    // Verify test data was dropped
+    const runs = await sql`
+      SELECT * FROM pgflow.runs WHERE flow_slug = 'test_flow'
+    `;
+    assertEquals(runs.length, 0);
+
+    await worker.stop();
+  });
+});
+
+Deno.test('production - throws on shape mismatch', async () => {
+  await withPgNoTransaction(async (sql) => {
+    // Pre-create flow
+    await sql`SELECT pgflow.create_flow('test_flow')`;
+    await sql`SELECT pgflow.add_step('test_flow', 'step1')`;
+
+    // Create production adapter
+    const prodAdapter = new SupabasePlatformAdapter(() => ({
+      ...testEnv,
+      DENO_REGION: 'us-east-1', // Production!
+    }));
+
+    // Different shape
+    const flow = new Flow({ slug: 'test_flow' })
+      .step({ slug: 'step1' }, async () => 'a')
+      .step({ slug: 'step2' }, async () => 'b');
+
+    const worker = createFlowWorker(flow, { sql }, createLogger, prodAdapter);
+
+    // Should throw exception
+    await assertRejects(
+      () => worker.startOnlyOnce({ edgeFunctionName: 'test', workerId: '123' }),
+      Error,
+      /structure mismatch.*Deploy migration first/
+    );
+  });
+});
+```
+
+**Coverage**:
+- All behavior matrix scenarios (6 scenarios)
+- Worker lifecycle integration
+- Database state verification
+- Error message validation
+- Advisory lock behavior (via multiple parallel tests)
+
+### Integration Test Benefits
+
+✅ **Fast** - No HTTP server, no Docker startup, runs in 2-3 minutes
+✅ **Reliable** - No process management, no timing issues
+✅ **Debuggable** - Stack traces connect directly to source
+✅ **Isolatable** - Each test creates a fresh worker instance
+✅ **Parallelizable** - Tests can run concurrently
+
+### Integration Test Limitations
+
+❌ **Skips EdgeWorker.start()** - Uses createFlowWorker() directly
+❌ **No HTTP layer** - Bypasses Deno.serve() and HTTP request handling
+❌ **No function file structure** - Doesn't test supabase/functions/ layout
+❌ **No real restarts** - Can't test stopping functions serve and restarting
+❌ **No Edge Runtime quirks** - Misses Supabase-specific behaviors
+
+---
+
+## Test Layer 2: E2E Tests
+
+### Purpose
+- Validate real deployment scenario
+- Test HTTP trigger flow end-to-end
+- Catch Supabase Edge Runtime integration issues
+- Test actual function restart workflow
+
+### What E2E Tests Cover
+
+#### Critical Scenarios Integration Tests MISS
+
+1. **EdgeWorker.start() Entry Point**
+   - Integration tests use `createFlowWorker()` directly
+   - E2E tests call `EdgeWorker.start()` from actual function file
+   - Catches bugs in platform adapter creation
+   - Validates static method behavior
+
+2. **HTTP Trigger Flow**
+   - First HTTP POST initializes worker
+   - SupabasePlatformAdapter.setupStartupHandler() behavior
+   - Deno.serve() integration
+   - Response handling ("ok" status)
+
+3. **Supabase Edge Runtime Environment**
+   - Module resolution (import maps, Deno runtime)
+   - Permission model
+   - Environment variable handling
+   - Edge Runtime Docker container behavior
+
+4. **Real Function File Structure**
+   - `supabase/functions/*/index.ts` layout
+   - Import paths and resolution
+   - Export patterns
+   - Function naming conventions
+
+5. **Actual Restart Scenario**
+   - Stop `supabase functions serve`
+   - Update function code files
+   - Restart `supabase functions serve`
+   - Verify new code is loaded
+
+6. **Race Conditions** (see the sketch below)
+   - Multiple HTTP requests during worker initialization
+   - Advisory lock behavior under real load
+   - Timing between HTTP request and database update
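+
+A minimal sketch of the race-condition probe — `triggerWorker()` and `createSql()` are the helpers introduced in the infrastructure section below, setup/teardown of the generated function is elided, and the assertion assumes advisory locks make flow creation idempotent:
+
+```typescript
+// Fire concurrent first requests at a cold worker; exactly one flow row
+// should exist afterwards and every request should still succeed.
+Deno.test('E2E: concurrent first requests create flow exactly once', async () => {
+  const sql = createSql();
+  try {
+    const responses = await Promise.all(
+      Array.from({ length: 5 }, () => triggerWorker('test-worker-race'))
+    );
+    responses.forEach((r) => assertEquals(r.status, 200));
+
+    const flows = await sql`SELECT * FROM pgflow.flows WHERE slug = 'e2e_race_flow'`;
+    assertEquals(flows.length, 1);
+  } finally {
+    await sql`DELETE FROM pgflow.flows WHERE slug = 'e2e_race_flow'`;
+    await sql.end();
+  }
+});
+```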
+
+### E2E Test Infrastructure
+
+#### Component 1: FunctionsServer Manager
+**File**: `pkgs/edge-worker/tests/e2e-auto-compilation/_infrastructure.ts`
+
+```typescript
+export class FunctionsServer {
+  private process: Deno.ChildProcess | null = null;
+  private functionsDir: string;
+
+  constructor(functionsDir: string) {
+    this.functionsDir = functionsDir;
+  }
+
+  async start(): Promise<void> {
+    this.process = new Deno.Command('supabase', {
+      args: ['functions', 'serve', '--env-file', 'test.env', '--no-verify-jwt'],
+      cwd: this.functionsDir,
+      stdout: 'piped',
+      stderr: 'piped',
+    }).spawn();
+
+    // Wait for "Serving functions on http://localhost:54321"
+    await this.waitForReady();
+  }
+
+  async stop(): Promise<void> {
+    if (this.process) {
+      this.process.kill('SIGTERM');
+      await this.process.status;
+      this.process = null;
+    }
+  }
+
+  async restart(): Promise<void> {
+    await this.stop();
+    await new Promise((r) => setTimeout(r, 1000)); // Let port release
+    await this.start();
+  }
+
+  private async waitForReady(): Promise<void> {
+    const startTime = Date.now();
+    const timeout = 30000; // 30 seconds
+
+    while (Date.now() - startTime < timeout) {
+      try {
+        // Try to hit any function endpoint;
+        // any response (even 404) means the server is ready
+        await fetch('http://localhost:54321/functions/v1/_health', {
+          method: 'GET',
+        });
+        return;
+      } catch {
+        await new Promise((r) => setTimeout(r, 500));
+      }
+    }
+
+    throw new Error('Functions server did not start within 30 seconds');
+  }
+}
+```
+
+#### Component 2: Function Code Generator
+**File**: `pkgs/edge-worker/tests/e2e-auto-compilation/_function-generator.ts`
+
+```typescript
+export interface FunctionGeneratorOptions {
+  functionName: string;
+  flowCode: string;
+  simulateProduction?: boolean;
+  importPath?: string;
+}
+
+export async function generateTestFunction(
+  options: FunctionGeneratorOptions
+): Promise<string> {
+  const { functionName, flowCode, simulateProduction = false, importPath } = options;
+
+  const functionDir = `./test-functions/${functionName}`;
+  await Deno.mkdir(functionDir, { recursive: true });
+
+  // Generate import map
+  const importMapContent = {
+    imports: {
+      '@pgflow/edge-worker': importPath || '../../dist/index.js',
+      '@pgflow/dsl': '../../../dsl/dist/index.js',
+    }
+  };
+  await Deno.writeTextFile(
+    `${functionDir}/import_map.json`,
+    JSON.stringify(importMapContent, null, 2)
+  );
+
+  // Generate function code
+  const code = `
+${simulateProduction ? "Deno.env.set('DENO_REGION', 'test-us-east-1');" : ''}
+
+import { EdgeWorker } from '@pgflow/edge-worker';
+import { Flow } from '@pgflow/dsl';
+
+${flowCode}
+
+EdgeWorker.start(TestFlow);
+`;
+
+  await Deno.writeTextFile(`${functionDir}/index.ts`, code);
+
+  return functionDir;
+}
+
+export async function cleanupTestFunction(functionName: string): Promise<void> {
+  const functionDir = `./test-functions/${functionName}`;
+  try {
+    await Deno.remove(functionDir, { recursive: true });
+  } catch {
+    // Ignore errors
+  }
+}
+```
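+
+For clarity, this is roughly what the generator emits for a one-step flow with `simulateProduction: true` (illustrative output):
+
+```typescript
+// ./test-functions/test-worker/index.ts (generated)
+Deno.env.set('DENO_REGION', 'test-us-east-1');
+
+import { EdgeWorker } from '@pgflow/edge-worker';
+import { Flow } from '@pgflow/dsl';
+
+const TestFlow = new Flow({ slug: 'e2e_demo_flow' })
+  .step({ slug: 'step1' }, async () => ({ result: 'hello' }));
+
+EdgeWorker.start(TestFlow);
+```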
"Deno.env.set('DENO_REGION', 'test-us-east-1');" : ''} + +import { EdgeWorker } from '@pgflow/edge-worker'; +import { Flow } from '@pgflow/dsl'; + +${flowCode} + +EdgeWorker.start(TestFlow); +`; + + await Deno.writeTextFile(`${functionDir}/index.ts`, code); + + return functionDir; +} + +export async function cleanupTestFunction(functionName: string): Promise { + const functionDir = `./test-functions/${functionName}`; + try { + await Deno.remove(functionDir, { recursive: true }); + } catch { + // Ignore errors + } +} +``` + +#### Component 3: HTTP Test Helpers +**File**: `pkgs/edge-worker/tests/e2e-auto-compilation/_helpers.ts` + +```typescript +export async function triggerWorker( + functionName: string, + payload?: unknown +): Promise { + return await fetch(`http://localhost:54321/functions/v1/${functionName}`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: payload ? JSON.stringify(payload) : undefined, + }); +} + +export async function waitForFlowInDatabase( + sql: postgres.Sql, + flowSlug: string, + timeoutMs: number = 5000 +): Promise { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + const flows = await sql` + SELECT * FROM pgflow.flows WHERE slug = ${flowSlug} + `; + + if (flows.length > 0) { + return; + } + + await new Promise(r => setTimeout(r, 200)); + } + + throw new Error(`Flow '${flowSlug}' not found in database after ${timeoutMs}ms`); +} +``` + +### E2E Test Examples + +#### Test 1: Local Mode - New Flow +**File**: `pkgs/edge-worker/tests/e2e-auto-compilation/local-new-flow.test.ts` + +```typescript +import { FunctionsServer } from './_infrastructure.ts'; +import { generateTestFunction, cleanupTestFunction } from './_function-generator.ts'; +import { triggerWorker, waitForFlowInDatabase } from './_helpers.ts'; + +Deno.test('E2E: Local mode auto-creates new flow on first HTTP request', async () => { + const functionName = 'test-worker-new-flow'; + const flowSlug = 'e2e_new_flow'; + const server = new FunctionsServer('./test-functions'); + const sql = createSql(); + + try { + // 1. Generate test function + const flowCode = ` + const TestFlow = new Flow({ slug: '${flowSlug}' }) + .step({ slug: 'step1' }, async () => ({ result: 'hello' })); + `; + + await generateTestFunction({ + functionName, + flowCode, + simulateProduction: false, + }); + + // 2. Start functions server + await server.start(); + + // 3. Trigger worker via HTTP + const response = await triggerWorker(functionName); + + // 4. Verify HTTP response + assertEquals(response.status, 200); + assertEquals(await response.text(), 'ok'); + + // 5. Wait for flow to appear in database + await waitForFlowInDatabase(sql, flowSlug); + + // 6. 
+
+### E2E Test Examples
+
+#### Test 1: Local Mode - New Flow
+**File**: `pkgs/edge-worker/tests/e2e-auto-compilation/local-new-flow.test.ts`
+
+```typescript
+import { FunctionsServer } from './_infrastructure.ts';
+import { generateTestFunction, cleanupTestFunction } from './_function-generator.ts';
+import { triggerWorker, waitForFlowInDatabase } from './_helpers.ts';
+
+Deno.test('E2E: Local mode auto-creates new flow on first HTTP request', async () => {
+  const functionName = 'test-worker-new-flow';
+  const flowSlug = 'e2e_new_flow';
+  const server = new FunctionsServer('./test-functions');
+  const sql = createSql();
+
+  try {
+    // 1. Generate test function
+    const flowCode = `
+      const TestFlow = new Flow({ slug: '${flowSlug}' })
+        .step({ slug: 'step1' }, async () => ({ result: 'hello' }));
+    `;
+
+    await generateTestFunction({
+      functionName,
+      flowCode,
+      simulateProduction: false,
+    });
+
+    // 2. Start functions server
+    await server.start();
+
+    // 3. Trigger worker via HTTP
+    const response = await triggerWorker(functionName);
+
+    // 4. Verify HTTP response
+    assertEquals(response.status, 200);
+    assertEquals(await response.text(), 'ok');
+
+    // 5. Wait for flow to appear in database
+    await waitForFlowInDatabase(sql, flowSlug);
+
+    // 6. Verify flow structure
+    const flows = await sql`
+      SELECT * FROM pgflow.flows WHERE slug = ${flowSlug}
+    `;
+    assertEquals(flows.length, 1);
+
+    const steps = await sql`
+      SELECT * FROM pgflow.steps WHERE flow_slug = ${flowSlug}
+    `;
+    assertEquals(steps.length, 1);
+    assertEquals(steps[0].step_slug, 'step1');
+
+  } finally {
+    // Cleanup
+    await server.stop();
+    await sql`DELETE FROM pgflow.flows WHERE slug = ${flowSlug}`;
+    await sql.end();
+    await cleanupTestFunction(functionName);
+  }
+});
+```
+
+#### Test 2: Restart with Code Changes
+**File**: `pkgs/edge-worker/tests/e2e-auto-compilation/restart-with-changes.test.ts`
+
+```typescript
+Deno.test('E2E: Restart with changed flow code updates database (local mode)', async () => {
+  const functionName = 'test-worker-restart';
+  const flowSlug = 'e2e_restart_flow';
+  const server = new FunctionsServer('./test-functions');
+  const sql = createSql();
+
+  try {
+    // 1. Generate initial function (1 step)
+    const flowV1 = `
+      const TestFlow = new Flow({ slug: '${flowSlug}' })
+        .step({ slug: 'step1' }, async () => ({ result: 'v1' }));
+    `;
+    await generateTestFunction({ functionName, flowCode: flowV1 });
+
+    // 2. Start server and trigger worker
+    await server.start();
+    await triggerWorker(functionName);
+    await waitForFlowInDatabase(sql, flowSlug);
+
+    // 3. Verify initial state (1 step)
+    let steps = await sql`
+      SELECT * FROM pgflow.steps
+      WHERE flow_slug = ${flowSlug}
+      ORDER BY step_index
+    `;
+    assertEquals(steps.length, 1);
+    assertEquals(steps[0].step_slug, 'step1');
+
+    // 4. Update function code (add step)
+    const flowV2 = `
+      const TestFlow = new Flow({ slug: '${flowSlug}' })
+        .step({ slug: 'step1' }, async () => ({ result: 'v1' }))
+        .step({ slug: 'step2', dependsOn: ['step1'] }, async () => ({ result: 'v2' }));
+    `;
+    await generateTestFunction({ functionName, flowCode: flowV2 });
+
+    // 5. Restart server (simulates new deployment)
+    await server.restart();
+
+    // 6. Trigger worker again
+    await triggerWorker(functionName);
+
+    // 7. Wait for database update (brief delay for auto-compilation)
+    await new Promise((r) => setTimeout(r, 1000));
+
+    // 8. Verify flow was updated (local mode: drop+recreate)
+    steps = await sql`
+      SELECT * FROM pgflow.steps
+      WHERE flow_slug = ${flowSlug}
+      ORDER BY step_index
+    `;
+    assertEquals(steps.length, 2);
+    assertEquals(steps[0].step_slug, 'step1');
+    assertEquals(steps[1].step_slug, 'step2');
+
+  } finally {
+    await server.stop();
+    await sql`DELETE FROM pgflow.flows WHERE slug = ${flowSlug}`;
+    await sql.end();
+    await cleanupTestFunction(functionName);
+  }
+});
+```
+
+#### Test 3: Production Mode Rejection
+**File**: `pkgs/edge-worker/tests/e2e-auto-compilation/prod-rejects-mismatch.test.ts`
+
+```typescript
+Deno.test('E2E: Production mode rejects shape mismatch', async () => {
+  const functionName = 'test-worker-prod';
+  const flowSlug = 'e2e_prod_flow';
+  const server = new FunctionsServer('./test-functions');
+  const sql = createSql();
+
+  try {
+    // 1. Pre-create flow with structure A
+    await sql`SELECT pgflow.create_flow(${flowSlug})`;
+    await sql`SELECT pgflow.add_step(${flowSlug}, 'step1')`;
+
+    // 2. Generate function with structure B + production flag
+    const flowCode = `
+      const TestFlow = new Flow({ slug: '${flowSlug}' })
+        .step({ slug: 'step1' }, async () => 'a')
+        .step({ slug: 'step2' }, async () => 'b'); // Different structure!
+    `;
+    await generateTestFunction({
+      functionName,
+      flowCode,
+      simulateProduction: true, // Sets DENO_REGION
+    });
+
+    // 3. Start server and trigger worker
+    await server.start();
+    const response = await triggerWorker(functionName);
+
+    // 4. Verify HTTP error response
+    // Note: SupabasePlatformAdapter should catch and return error
+    assertEquals(response.status, 500);
+
+    const errorText = await response.text();
+    assert(errorText.includes('structure mismatch'));
+    assert(errorText.includes('Deploy migration first'));
+
+    // 5. Verify flow was NOT updated
+    const steps = await sql`
+      SELECT * FROM pgflow.steps WHERE flow_slug = ${flowSlug}
+    `;
+    assertEquals(steps.length, 1); // Still only 1 step
+
+  } finally {
+    await server.stop();
+    await sql`DELETE FROM pgflow.flows WHERE slug = ${flowSlug}`;
+    await sql.end();
+    await cleanupTestFunction(functionName);
+  }
+});
+```
+
+### E2E Test Challenges and Solutions
+
+#### Challenge 1: Production Environment Simulation
+
+**Problem**: Locally, `DENO_REGION` is not set by Edge Runtime.
+
+**Solution**: Manually set `DENO_REGION` in test function code:
+```typescript
+Deno.env.set('DENO_REGION', 'test-us-east-1');
+```
+
+**Limitation**: Not a TRUE production environment, but close enough to test rejection logic.
+
+#### Challenge 2: Process Lifecycle Management
+
+**Problem**: Starting/stopping `supabase functions serve` reliably.
+
+**Solution**:
+- Use `Deno.Command` with proper signal handling
+- Implement `waitForReady()` polling
+- Always use try/finally for cleanup
+- Add delays after stop to let ports release
+
+#### Challenge 3: Function Code Generation
+
+**Problem**: Tests need to write actual function files dynamically.
+
+**Solution**:
+- Template-based generation in `_function-generator.ts`
+- Consistent directory structure: `./test-functions/${functionName}/`
+- Generate both `index.ts` and `import_map.json`
+- Clean up after tests (even on failure)
+
+#### Challenge 4: Timing and Synchronization
+
+**Problem**: Multiple async operations need coordination.
+
+**Solution**:
+- `waitForReady()` polls HTTP endpoint until server responds
+- `waitForFlowInDatabase()` polls DB until flow appears
+- Add small delays after restart for process cleanup
+- Use generous timeouts (30s) for CI stability
+
+#### Challenge 5: Test Isolation
+
+**Problem**: Tests share database and function port.
+
+**Solution**:
+- Unique flow slugs per test (`e2e_new_flow`, `e2e_restart_flow`)
+- Database cleanup in finally blocks
+- Directory cleanup in finally blocks
+- Run tests sequentially (not in parallel) for E2E suite
+
+#### Challenge 6: Debugging Failures
+
+**Problem**: Multiple processes make debugging hard.
+
+**Solution**:
+- Capture stdout/stderr from functions serve
+- Log HTTP requests and responses
+- Include database state in assertions
+- Clear error messages with context
+
+### E2E Test Benefits
+
+✅ **Deployment confidence** - Tests actual deployment scenario
+✅ **Catches runtime issues** - Finds Edge Runtime specific bugs
+✅ **Validates HTTP flow** - Tests complete request/response cycle
+✅ **Tests restarts** - Verifies actual function reload behavior
+✅ **Real environment** - As close to production as possible locally
+
+### E2E Test Tradeoffs
+
+⚠️ **Slower** - 5-10 minutes vs 2-3 minutes for integration
+⚠️ **More complex** - Process management, file generation, timing
+⚠️ **Harder to debug** - Multiple processes, async operations
+⚠️ **Resource intensive** - Docker, HTTP server, port usage
+⚠️ **Not fully production** - Can't truly simulate DENO_REGION behavior
+
+---
+
+## CI/CD Strategy
+
+### Layered Testing Approach
+
+```yaml
+# .github/workflows/ci.yml
+jobs:
+  test-fast:
+    name: Unit + Integration Tests
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: ./.github/actions/setup
+      - name: Lint, typecheck, unit/integration tests
+        run: pnpm nx affected -t lint typecheck test --parallel
+    # Fast: 2-3 minutes
+
+  build:
+    name: Build Affected Projects
+    needs: test-fast
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: ./.github/actions/setup
+      - name: Build
+        run: pnpm nx affected -t build --parallel
+    # Fast: 1-2 minutes
+
+  test-e2e:
+    name: E2E Tests
+    needs: build
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: ./.github/actions/setup
+
+      - name: Setup Supabase CLI
+        uses: supabase/setup-cli@v1
+
+      - name: Start Supabase
+        run: pnpm nx supabase:start edge-worker
+
+      - name: Run E2E tests
+        run: pnpm nx test:e2e edge-worker
+        timeout-minutes: 15
+
+      - name: Stop Supabase
+        if: always()
+        run: pnpm nx supabase:stop edge-worker
+    # Slower: 5-10 minutes
+
+  deploy:
+    name: Deploy to Staging/Production
+    needs: test-e2e
+    runs-on: ubuntu-latest
+    # Only deploy if ALL tests pass
+```
+
+### Why Separate E2E Job?
+
+**Benefits:**
+1. **Fast feedback** - Unit/integration tests complete in 2-3 min
+2. **Parallel execution** - Can run on different runners
+3. **Skip for docs** - E2E only runs when code changes
+4. **Clear failure isolation** - Know exactly which layer failed
+5. **Resource management** - E2E gets dedicated resources
+
+**Cost:**
+- +5-10 minutes to total CI time
+- More complex workflow configuration
+
+**Decision**: Worth it for auto-compilation (major feature requiring deployment confidence).
+
+### When to Skip E2E Tests
+
+E2E tests should be skipped for:
+- Documentation-only changes
+- Comment updates
+- README changes
+- Non-code changes
+
+Implementation sketch (assumes a `changes` job that exposes a `pkgs` output via a changed-files filter, e.g. dorny/paths-filter):
+```yaml
+test-e2e:
+  needs: [build, changes]
+  # `contains()` can't substring-match the modified-files array,
+  # so gate on a dedicated paths-filter job instead
+  if: needs.changes.outputs.pkgs == 'true'
+```
+
+---
+
+## Test Coverage Matrix
+
+### Behavior Matrix (from PLAN_phase1.md)
+
+| Environment | Flow Exists? | Shapes Match? | Expected Behavior | Integration | E2E |
+|-------------|--------------|---------------|-------------------------|-------------|-----|
+| Local | No | N/A | Create fresh | ✅ | ✅ |
+| Local | Yes | Match | Continue (noop) | ✅ | ✅ |
+| Local | Yes | Mismatch | Drop all → Recreate | ✅ | ✅ |
+| Production | No | N/A | Create fresh | ✅ | ⚠️* |
+| Production | Yes | Match | Continue (noop) | ✅ | ⚠️* |
+| Production | Yes | Mismatch | RAISE EXCEPTION | ✅ | ✅ |
+
+*These production paths can be tested in E2E by setting DENO_REGION manually.
+
+### Component Coverage
+
+| Component | Unit Tests | Integration Tests | E2E Tests |
+|-----------|-----------|-------------------|-----------|
+| Environment detection | ✅ | ✅ | ✅ |
+| serializeFlow() | ✅ | ✅ | ✅ |
+| compare_flow_shapes() | ✅ (PgTAP) | ✅ | ✅ |
+| ensure_flow_compiled() | ✅ (PgTAP) | ✅ | ✅ |
+| drop_flow_data() | ✅ (PgTAP) | ✅ | ✅ |
+| create_flow_from_shape() | ✅ (PgTAP) | ✅ | ✅ |
+| FlowWorkerLifecycle | ❌ | ✅ | ✅ |
+| EdgeWorker.start() | ❌ | ❌ | ✅ |
+| HTTP trigger | ❌ | ❌ | ✅ |
+| Function restart | ❌ | ❌ | ✅ |
+
+### Critical Edge Cases
+
+| Edge Case | How Tested |
+|-----------|------------|
+| Advisory locks prevent race conditions | Integration (parallel tests) |
+| Queue cleanup on drop | Integration + PgTAP |
+| Multiple flows in one worker | Integration |
+| Empty flow (no steps) | Integration |
+| Map step flows | Integration |
+| Concurrent HTTP requests during startup | E2E |
+| Function code syntax errors | E2E (would fail to start) |
+| Import resolution issues | E2E |
+| DENO_REGION presence/absence | E2E (both modes) |
+
+---
+
+## Implementation Phases
+
+### Phase 1: Core Logic + Integration Tests (~5 days)
+
+**Day 1-2: Core Components**
+1. Environment detection in SupabasePlatformAdapter
+2. serializeFlow() function
+3. Unit tests for both
+
+**Day 3-4: SQL Functions**
+1. compare_flow_shapes()
+2. ensure_flow_compiled()
+3. drop_flow_data()
+4. create_flow_from_shape()
+5. create_step_from_json()
+6. PgTAP tests for all
+
+**Day 5: Integration**
+1. Wire auto-compilation into FlowWorkerLifecycle.acknowledgeStart()
+2. Integration tests for all behavior matrix scenarios
+3. TestFlowFactory helper (see the sketch below)
+
+**Deliverable**: Fully tested auto-compilation logic, ready for manual testing.
+
+### Phase 2: E2E Infrastructure + Tests (~3 days)
+
+**Day 6: Infrastructure**
+1. FunctionsServer class
+2. Function generator
+3. HTTP test helpers
+4. Test cleanup utilities
+
+**Day 7-8: E2E Test Suite**
+1. Local mode: new flow
+2. Local mode: matching shapes (noop)
+3. Local mode: shape mismatch (drop+recreate)
+4. Production mode: rejection
+5. Restart with code changes
+6. Multiple flows in one function
+
+**Deliverable**: Full E2E test suite with real function deployment.
+
+### Phase 3: CI Integration (~1 day)
+
+**Day 9: GitHub Actions**
+1. Update CI workflow with separate test-e2e job
+2. Add Supabase setup step
+3. Configure test timeouts
+4. Add cleanup on failure
+5. Document CI workflow in README
+
+**Deliverable**: Auto-compilation feature fully tested in CI before deployment.
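+
+The TestFlowFactory helper from Day 5 isn't specified yet; a minimal sketch of what it might look like — the name and shape are assumptions:
+
+```typescript
+import { Flow } from '@pgflow/dsl';
+
+// Hypothetical helper: builds uniquely-slugged flows so parallel
+// integration tests never collide on pgflow.flows rows.
+export function createTestFlow(prefix = 'test_flow') {
+  const slug = `${prefix}_${crypto.randomUUID().slice(0, 8)}`;
+  const flow = new Flow({ slug })
+    .step({ slug: 'step1' }, async () => ({ result: 'hello' }));
+  return { slug, flow };
+}
+```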
+
+---
+
+## Testing Philosophy
+
+### Test Pyramid
+
+```
+          /\
+         /  \         E2E Tests (10%)
+        /----\        - Slow, expensive
+       /      \       - High confidence
+      /--------\      - Deployment validation
+     /          \
+    /------------\    Integration Tests (30%)
+   /              \   - Medium speed
+  /----------------\  - SQL + Worker logic
+ |==================|
+ |                  |  Unit Tests (60%)
+ |                  |  - Fast, focused
+ |                  |  - Pure functions
+ |                  |  - Immediate feedback
+```
+
+### Guiding Principles
+
+1. **Fast Feedback First**
+   - Unit tests run in milliseconds
+   - Integration tests run in seconds
+   - E2E tests run in minutes
+
+2. **Test at the Right Level**
+   - Unit: Pure logic (serializeFlow, environment detection)
+   - Integration: SQL functions + worker behavior
+   - E2E: HTTP → Worker → DB complete flow
+
+3. **Don't Test Implementation Details**
+   - Test behavior, not internals
+   - Integration tests can bypass HTTP (that's fine!)
+   - E2E tests validate deployment scenario
+
+4. **Balance Coverage with Maintenance**
+   - E2E tests are expensive to maintain
+   - Focus E2E on critical deployment paths
+   - Use integration tests for detailed scenarios
+
+5. **Make Failures Clear**
+   - Descriptive test names
+   - Helpful assertion messages
+   - Log context on failure
+
+---
+
+## Success Criteria
+
+Auto-compilation testing is complete when:
+
+- ✅ All 6 behavior matrix scenarios have integration tests
+- ✅ All SQL functions have PgTAP tests with >90% coverage
+- ✅ Environment detection has unit tests for all modes
+- ✅ serializeFlow() has unit tests for all step patterns
+- ✅ E2E tests validate HTTP → Worker → DB flow
+- ✅ E2E tests validate function restart scenario
+- ✅ CI runs both test layers before deployment
+- ✅ CI completes in <15 minutes total
+- ✅ Tests are documented and maintainable
+
+---
+
+## References
+
+- **Implementation Plan**: [PLAN_phase1.md](./PLAN_phase1.md)
+- **User Documentation**: [COMPILE_WORKER.md](./COMPILE_WORKER.md)
+- **Phase 2 Enhancements**: [PLAN_phase2.md](./PLAN_phase2.md)