diff --git a/services/libs/tinybird/pipes/monitoring_copy_pipe_executions.pipe b/services/libs/tinybird/pipes/monitoring_copy_pipe_executions.pipe
new file mode 100644
index 0000000000..3c7446c70e
--- /dev/null
+++ b/services/libs/tinybird/pipes/monitoring_copy_pipe_executions.pipe
@@ -0,0 +1,70 @@
+DESCRIPTION >
+    Monitors Tinybird copy job executions and exposes their status as Prometheus gauge metrics. Consumed by the Datadog scraper (`dd-tinybird-copy-pipe-executions-scraper`).
+    Tracks the latest execution status for each copy pipe (starting today) and generates one-hot encoded metrics: each pipe gets a value of `1` for its current status (`'ok'`, `'error'`, `'cancelled'`, `'queued'`, `'working'`) and `0` for all other statuses.
+    **Metric:** `copy_pipes_latest_execution_status` (gauge)
+    **Labels:** `pipe_name`, `status`
+    **Note:** The `'queued'` status is virtual: it is derived by detecting the error message `"You have reached the maximum number of copy jobs"` rather than being a native Tinybird status.
+    Tinybird reports these jobs with an error status even though it retries them when possible. After 3 retries, if the copy pipe still gets the same limit error, we return an error state.
+    Similarly, `'ok'` is mapped from the native `'done'` status to follow the Datadog color-scheme convention.
+
+TAGS "Monitoring"
+
+NODE copy_pipes_latest_executions
+DESCRIPTION >
+    One row per copy pipe: its most recent execution among the copy jobs started today
+    (the `members_with_location` pipe is excluded), with the status normalised:
+    `'done'` becomes `'ok'`; a max-copy-jobs limit error becomes `'queued'` when neither of
+    the two preceding executions hit the same limit; three consecutive limit errors, or any
+    other non-empty error, become `'error'`; everything else keeps the native status.
+
+SQL >
+    WITH 'You have reached the maximum number of copy jobs%' AS max_jobs_err
+    SELECT
+        JSONExtract(job_metadata, 'pipe_name', 'String') AS pipe_name,
+        multiIf(
+            status = 'done',
+            'ok',
+            /* queued: current has max-jobs error AND the previous two do NOT */
+            (error LIKE max_jobs_err)
+            AND (coalesce(prev_error1, '') NOT LIKE max_jobs_err)
+            AND (coalesce(prev_error2, '') NOT LIKE max_jobs_err),
+            'queued',
+            /* hard error: last three consecutive runs have the same max-jobs error */
+            (error LIKE max_jobs_err)
+            AND (coalesce(prev_error1, '') LIKE max_jobs_err)
+            AND (coalesce(prev_error2, '') LIKE max_jobs_err),
+            'error',
+            /* any other error on the last execution -> error */
+            (error != '' AND error NOT LIKE max_jobs_err),
+            'error',
+            /*
+               otherwise keep the original status.
+               NOTE(review): a max-jobs error where only one of the two previous runs also hit
+               the limit matches none of the branches above and lands here, i.e. it reports the
+               native status - confirm that is the intended behaviour.
+            */
+            status
+        ) AS status,
+        error,
+        started_at
+    FROM
+        (
+            SELECT
+                pipe_id,
+                job_metadata,
+                status,
+                error,
+                started_at,
+                created_at,
+                /*
+                   Rows are ordered newest-first, so the previous executions of a pipe come
+                   AFTER the current row: they must be read with leadInFrame, not lagInFrame
+                   (lagInFrame on the rn = 1 row always yields the default '').
+                   The *InFrame functions only see rows inside the window frame, hence the
+                   explicit unbounded frame.
+                */
+                leadInFrame(error, 1, '') OVER (
+                    PARTITION BY pipe_id
+                    ORDER BY created_at DESC
+                    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+                ) AS prev_error1,
+                leadInFrame(error, 2, '') OVER (
+                    PARTITION BY pipe_id
+                    ORDER BY created_at DESC
+                    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+                ) AS prev_error2,
+                /* rn = 1 marks the most recent execution of each pipe */
+                row_number() OVER (PARTITION BY pipe_id ORDER BY created_at DESC) AS rn
+            FROM tinybird.jobs_log
+            WHERE
+                job_type = 'copy'
+                AND started_at > toStartOfDay(now())
+                AND JSONExtract(job_metadata, 'pipe_name', 'String') <> 'members_with_location'
+        )
+    WHERE rn = 1
+    /* one row per pipe remains after the rn filter, so pipe_id alone fixes the order */
+    ORDER BY pipe_id
+
+NODE errored_copy_pipes_latest_execution
+DESCRIPTION >
+    Expands every row of `copy_pipes_latest_executions` into one gauge sample per known status
+    (`name`, `value`, `help`, `type`, `labels`). `value` is `1` for the status the pipe is
+    currently in and `0` for the others; a status outside the list yields `0` for all samples.
+
+SQL >
+    SELECT
+        'copy_pipes_latest_execution_status' AS name,
+        IF(s = e.status, 1, 0) AS value,
+        'Latest execution status per pipe (one-hot: 1 active, 0 otherwise)' AS help,
+        'gauge' AS type,
+        map('pipe_name', e.pipe_name, 'status', s) AS labels
+    FROM copy_pipes_latest_executions AS e
+    /* replicate each pipe row once per reportable status */
+    ARRAY JOIN ['ok', 'error', 'cancelled', 'queued', 'working'] AS s