|
| 1 | +-- Modify "steps" table |
| 2 | +ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "opt_start_delay_is_nonnegative" CHECK ((opt_start_delay IS NULL) OR (opt_start_delay >= 0)), ADD COLUMN "opt_start_delay" integer NULL; |
| 3 | +-- Modify "start_ready_steps" function |
| 4 | +CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$ |
| 5 | +WITH ready_steps AS ( |
| 6 | + SELECT * |
| 7 | + FROM pgflow.step_states AS step_state |
| 8 | + WHERE step_state.run_id = start_ready_steps.run_id |
| 9 | + AND step_state.status = 'created' |
| 10 | + AND step_state.remaining_deps = 0 |
| 11 | + ORDER BY step_state.step_slug |
| 12 | + FOR UPDATE |
| 13 | +), |
| 14 | +started_step_states AS ( |
| 15 | + UPDATE pgflow.step_states |
| 16 | + SET status = 'started', |
| 17 | + started_at = now() |
| 18 | + FROM ready_steps |
| 19 | + WHERE pgflow.step_states.run_id = start_ready_steps.run_id |
| 20 | + AND pgflow.step_states.step_slug = ready_steps.step_slug |
| 21 | + RETURNING pgflow.step_states.* |
| 22 | +), |
| 23 | +sent_messages AS ( |
| 24 | + SELECT |
| 25 | + started_step.flow_slug, |
| 26 | + started_step.run_id, |
| 27 | + started_step.step_slug, |
| 28 | + pgmq.send( |
| 29 | + started_step.flow_slug, |
| 30 | + jsonb_build_object( |
| 31 | + 'flow_slug', started_step.flow_slug, |
| 32 | + 'run_id', started_step.run_id, |
| 33 | + 'step_slug', started_step.step_slug, |
| 34 | + 'task_index', 0 |
| 35 | + ), |
| 36 | + COALESCE(step.opt_start_delay, 0) |
| 37 | + ) AS msg_id |
| 38 | + FROM started_step_states AS started_step |
| 39 | + JOIN pgflow.steps AS step |
| 40 | + ON step.flow_slug = started_step.flow_slug |
| 41 | + AND step.step_slug = started_step.step_slug |
| 42 | +), |
| 43 | +broadcast_events AS ( |
| 44 | + SELECT |
| 45 | + realtime.send( |
| 46 | + jsonb_build_object( |
| 47 | + 'event_type', 'step:started', |
| 48 | + 'run_id', started_step.run_id, |
| 49 | + 'step_slug', started_step.step_slug, |
| 50 | + 'status', 'started', |
| 51 | + 'started_at', started_step.started_at, |
| 52 | + 'remaining_tasks', 1, |
| 53 | + 'remaining_deps', started_step.remaining_deps |
| 54 | + ), |
| 55 | + concat('step:', started_step.step_slug, ':started'), |
| 56 | + concat('pgflow:run:', started_step.run_id), |
| 57 | + false |
| 58 | + ) |
| 59 | + FROM started_step_states AS started_step |
| 60 | +) |
| 61 | +INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, message_id) |
| 62 | +SELECT |
| 63 | + sent_messages.flow_slug, |
| 64 | + sent_messages.run_id, |
| 65 | + sent_messages.step_slug, |
| 66 | + sent_messages.msg_id |
| 67 | +FROM sent_messages; |
| 68 | +$$; |
| 69 | +-- Create "add_step" function |
| 70 | +CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[], "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer) RETURNS "pgflow"."steps" LANGUAGE sql SET "search_path" = '' AS $$ |
| 71 | +WITH |
| 72 | + next_index AS ( |
| 73 | + SELECT COALESCE(MAX(step_index) + 1, 0) as idx |
| 74 | + FROM pgflow.steps |
| 75 | + WHERE flow_slug = add_step.flow_slug |
| 76 | + ), |
| 77 | + create_step AS ( |
| 78 | + INSERT INTO pgflow.steps (flow_slug, step_slug, step_index, deps_count, opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay) |
| 79 | + SELECT add_step.flow_slug, add_step.step_slug, idx, COALESCE(array_length(deps_slugs, 1), 0), max_attempts, base_delay, timeout, start_delay |
| 80 | + FROM next_index |
| 81 | + ON CONFLICT (flow_slug, step_slug) |
| 82 | + DO UPDATE SET step_slug = pgflow.steps.step_slug |
| 83 | + RETURNING * |
| 84 | + ), |
| 85 | + insert_deps AS ( |
| 86 | + INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) |
| 87 | + SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug |
| 88 | + FROM unnest(deps_slugs) AS d(dep_slug) |
| 89 | + ON CONFLICT (flow_slug, dep_slug, step_slug) DO NOTHING |
| 90 | + RETURNING 1 |
| 91 | + ) |
| 92 | +-- Return the created step |
| 93 | +SELECT * FROM create_step; |
| 94 | +$$; |
| 95 | +-- Drop "add_step" function |
| 96 | +DROP FUNCTION "pgflow"."add_step" (text, text, integer, integer, integer); |
| 97 | +-- Drop "add_step" function |
| 98 | +DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer); |
| 99 | +-- Create "add_step" function |
| 100 | +CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer) RETURNS "pgflow"."steps" LANGUAGE sql SET "search_path" = '' AS $$ |
| 101 | +-- Call the original function with an empty array |
| 102 | + SELECT * FROM pgflow.add_step(flow_slug, step_slug, ARRAY[]::text[], max_attempts, base_delay, timeout, start_delay); |
| 103 | +$$; |
0 commit comments