70 changes: 70 additions & 0 deletions services/libs/tinybird/pipes/monitoring_copy_pipe_executions.pipe
@@ -0,0 +1,70 @@
DESCRIPTION >
Monitors Tinybird copy job executions and exposes their status as Prometheus gauge metrics. Consumed by the Datadog scraper (`dd-tinybird-copy-pipe-executions-scraper`).
Tracks the latest execution status for each copy pipe, considering only executions started since the beginning of the current day, and generates one-hot encoded metrics: each pipe gets a value of `1` for its current status (`'ok'`, `'error'`, `'cancelled'`, `'queued'`, `'working'`) and `0` for all other statuses.
**Metric:** `copy_pipes_latest_execution_status` (gauge)
**Labels:** `pipe_name`, `status`
**Note:** The `'queued'` status is virtual: it is not a native Tinybird status but is derived by detecting the error message `"You have reached the maximum number of copy jobs"`.
Tinybird reports such executions with an error status even though it retries them when capacity allows. If the last three consecutive executions all hit the same limit error, the pipe is reported as `'error'` instead.
Similarly, `'ok'` is mapped from the native `'done'` status to follow the Datadog color-scheme convention.

TAGS "Monitoring"

NODE copy_pipes_latest_executions
SQL >
WITH 'You have reached the maximum number of copy jobs%' AS max_jobs_err
SELECT
JSONExtract(job_metadata, 'pipe_name', 'String') AS pipe_name,
multiIf(
status = 'done',
'ok',
/* queued: current has max-jobs error AND fewer than three consecutive runs have it */
(error LIKE max_jobs_err)
AND NOT (
coalesce(prev_error1, '') LIKE max_jobs_err
AND coalesce(prev_error2, '') LIKE max_jobs_err
),
'queued',
/* hard error: last three consecutive runs have the same max-jobs error */
(error LIKE max_jobs_err)
AND (coalesce(prev_error1, '') LIKE max_jobs_err)
AND (coalesce(prev_error2, '') LIKE max_jobs_err),
'error',
/* any other error on the last execution -> error */
(error != '' AND error NOT LIKE max_jobs_err),
'error',
/* otherwise keep the native status (e.g., 'working', 'cancelled') */
status
) AS status,
error,
started_at
FROM
(
SELECT
*,
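/* Order ascending so lagInFrame reads older executions; with DESC ordering the newest row (rn = 1) has no preceding row and always gets the default '' */
lagInFrame(error, 1, '') OVER (
PARTITION BY pipe_id ORDER BY created_at ASC
) AS prev_error1,
lagInFrame(error, 2, '') OVER (
PARTITION BY pipe_id ORDER BY created_at ASC
) AS prev_error2,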
row_number() OVER (PARTITION BY pipe_id ORDER BY created_at DESC) AS rn
FROM tinybird.jobs_log
WHERE
job_type = 'copy'
AND started_at > toStartOfDay(now())
AND JSONExtract(job_metadata, 'pipe_name', 'String') <> 'members_with_location'
)
WHERE rn = 1
ORDER BY pipe_id, created_at DESC

NODE errored_copy_pipes_latest_execution
SQL >
WITH
possible_statuses AS (SELECT array('ok', 'error', 'cancelled', 'queued', 'working') AS statuses)
SELECT
'copy_pipes_latest_execution_status' AS name,
IF(s = e.status, 1, 0) AS value,
'Latest execution status per pipe (one-hot: 1 active, 0 otherwise)' AS help,
'gauge' AS type,
map('pipe_name', e.pipe_name, 'status', s) AS labels
FROM copy_pipes_latest_executions AS e
CROSS JOIN possible_statuses ps
ARRAY JOIN ps.statuses AS s
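
Review note: to make the one-hot expansion in the second node easier to reason about, here is a minimal Python sketch of the same idea. It is not part of the pipe. The function names (`one_hot_rows`, `to_prometheus`) and the pipe name `hypothetical_pipe` are made up for illustration, and the rendering step only assumes the standard Prometheus text exposition format; how Tinybird or the scraper actually serializes the rows is not shown in this diff.

```python
# Statuses copied from the `possible_statuses` CTE in the pipe.
STATUSES = ["ok", "error", "cancelled", "queued", "working"]

METRIC_NAME = "copy_pipes_latest_execution_status"


def one_hot_rows(latest):
    """Expand {pipe_name: status} into one row per (pipe, status) pair.

    Mirrors the CROSS JOIN + ARRAY JOIN in the second node: the row whose
    status label equals the pipe's current status gets value 1, all other
    rows for that pipe get 0.
    """
    rows = []
    for pipe_name, status in latest.items():
        for s in STATUSES:
            rows.append(
                {
                    "name": METRIC_NAME,
                    "value": 1 if s == status else 0,
                    "type": "gauge",
                    "labels": {"pipe_name": pipe_name, "status": s},
                }
            )
    return rows


def to_prometheus(rows):
    """Render rows as Prometheus text-format sample lines (illustrative only)."""
    lines = []
    for r in rows:
        labels = ",".join(f'{k}="{v}"' for k, v in r["labels"].items())
        lines.append(f'{r["name"]}{{{labels}}} {r["value"]}')
    return "\n".join(lines)


if __name__ == "__main__":
    # `hypothetical_pipe` is a placeholder, not a real pipe in this workspace.
    print(to_prometheus(one_hot_rows({"hypothetical_pipe": "queued"})))
```

For a pipe whose latest status is `'queued'`, this yields five samples, of which only `copy_pipes_latest_execution_status{pipe_name="hypothetical_pipe",status="queued"} 1` carries the value `1`. One consequence worth noting: if the first node ever emits a status outside the five listed values, every sample for that pipe is `0`, which holds for the SQL as well.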