Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions apache-airflow-mixin/alerts.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
new(this): {
groups: [
{
name: this.config.uid + '-alerts',
rules: [
{
alert: 'ApacheAirflowStarvingPoolTasks',
expr: |||
airflow_pool_starving_tasks{%(filteringSelector)s} > %(alertsCriticalPoolStarvingTasks)s
||| % this.config,
'for': '5m',
labels: {
severity: 'critical',
},
annotations: {
summary: 'There are starved tasks detected in the Apache Airflow pool.',
description: |||
The number of starved tasks is {{ printf "%%.0f" $value }} over the last 5m on {{ $labels.instance }} - {{ $labels.pool_name }} which is above the threshold of %(alertsCriticalPoolStarvingTasks)s.
||| % this.config,
},
},
{
alert: 'ApacheAirflowDAGScheduleDelayWarning',
expr: |||
increase(airflow_dagrun_schedule_delay_sum{%(filteringSelector)s}[5m]) / clamp_min(increase(airflow_dagrun_schedule_delay_count{%(filteringSelector)s}[5m]),1) > %(alertsWarningDAGScheduleDelayLevel)s
||| % this.config,
'for': '1m',
labels: {
severity: 'warning',
},
annotations: {
summary: 'The delay in DAG schedule time to DAG run time has reached the warning threshold.',
description: |||
The average delay in DAG schedule to run time is {{ printf "%%.0f" $value }} over the last 1m on {{ $labels.instance }} - {{ $labels.dag_id }} which is above the threshold of %(alertsWarningDAGScheduleDelayLevel)s.
||| % this.config,
},
},
{
alert: 'ApacheAirflowDAGScheduleDelayCritical',
expr: |||
increase(airflow_dagrun_schedule_delay_sum{%(filteringSelector)s}[5m]) / clamp_min(increase(airflow_dagrun_schedule_delay_count{%(filteringSelector)s}[5m]),1) > %(alertsCriticalDAGScheduleDelayLevel)s
||| % this.config,
'for': '1m',
labels: {
severity: 'critical',
},
annotations: {
summary: 'The delay in DAG schedule time to DAG run time has reached the critical threshold.',
description: |||
The average delay in DAG schedule to run time is {{ printf "%%.0f" $value }} over the last 1m for {{ $labels.instance }} - {{ $labels.dag_id }} which is above the threshold of %(alertsCriticalDAGScheduleDelayLevel)s.
||| % this.config,
},
},
{
alert: 'ApacheAirflowDAGFailures',
expr: |||
increase(airflow_dagrun_duration_failed_count{%(filteringSelector)s}[5m]) > %(alertsCriticalFailedDAGs)s
||| % this.config,
'for': '1m',
labels: {
severity: 'critical',
},
annotations: {
summary: 'There have been DAG failures detected.',
description: |||
The number of DAG failures seen is {{ printf "%%.0f" $value }} over the last 1m for {{ $labels.instance }} - {{ $labels.dag_id }} which is above the threshold of %(alertsCriticalFailedDAGs)s.
||| % this.config,
},
},
],
},
],
},
}
46 changes: 32 additions & 14 deletions apache-airflow-mixin/config.libsonnet
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
{
_config+:: {
dashboardTags: ['apache-airflow-mixin'],
dashboardPeriod: 'now-1h',
dashboardTimezone: 'default',
dashboardRefresh: '1m',
local this = self,
filteringSelector: 'job="integrations/apache-airflow"',
groupLabels: ['job'],
instanceLabels: ['instance'],
overviewLabels: [],
dashboardTags: ['apache-airflow-mixin'],
uid: 'apache-airflow',
dashboardNamePrefix: 'Apache Airflow',

//alert thresholds
alertsCriticalPoolStarvingTasks: 0,
alertsWarningDAGScheduleDelayLevel: 10, //s
alertsCriticalDAGScheduleDelayLevel: 60, //s
alertsCriticalFailedDAGs: 0,
// additional params
dashboardPeriod: 'now-6h',
dashboardTimezone: 'default',
dashboardRefresh: '1m',

enableLokiLogs: true,
enableMultiCluster: false,
// logs lib related
enableLokiLogs: true,
logLabels: ['job', 'instance'],
extraLogLabels: ['dag_file', 'filename'], // Required by logs-lib
logsVolumeGroupBy: 'level',
showLogsVolume: true,

multiclusterSelector: 'job=~"$job"',
airflowSelector: if self.enableMultiCluster then 'job=~"$job", cluster=~"$cluster"' else 'job=~"$job"',
// alert thresholds
alertsCriticalPoolStarvingTasks: 0, // count
alertsWarningDAGScheduleDelayLevel: 10, // s
alertsCriticalDAGScheduleDelayLevel: 60, // s
alertsCriticalFailedDAGs: 0, // count

// multi-cluster support
enableMultiCluster: false,

// metrics source for signals library
metricsSource: 'prometheus',

signals+: {
overview: (import './signals/overview.libsonnet')(this),
},
}
115 changes: 115 additions & 0 deletions apache-airflow-mixin/dashboards.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
local g = import './g.libsonnet';
local logslib = import 'logs-lib/logs/main.libsonnet';

{
local root = self,
new(this)::
local prefix = this.config.dashboardNamePrefix;
local links = this.grafana.links;
local tags = this.config.dashboardTags;
local uid = g.util.string.slugify(this.config.uid);
local vars = this.grafana.variables;
local annotations = this.grafana.annotations;
local refresh = this.config.dashboardRefresh;
local period = this.config.dashboardPeriod;
local timezone = this.config.dashboardTimezone;

{
// Apache Airflow overview dashboard
'apache-airflow-overview.json':
g.dashboard.new(prefix + ' overview')
+ g.dashboard.withDescription('Dashboard providing an overview of Apache Airflow DAGs, tasks, and scheduler performance.')
+ g.dashboard.withPanels(
g.util.panel.resolveCollapsedFlagOnRows(
g.util.grid.wrapPanels(
[
this.grafana.rows.apacheAirflowOverview,
this.grafana.rows.apacheAirflowSchedulerDetails,
]
)
)
)
+ root.applyCommon(
vars.multiInstance + [
g.dashboard.variable.query.new('dag_id')
+ g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
+ g.dashboard.variable.query.queryTypes.withLabelValues('dag_id', 'airflow_dagrun_duration_success_sum{job=~"$job", instance=~"$instance"}')
+ g.dashboard.variable.query.generalOptions.withLabel('DAG ID')
+ g.dashboard.variable.query.selectionOptions.withMulti(true)
+ g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
+ g.dashboard.variable.query.refresh.onLoad()
+ g.dashboard.variable.query.refresh.onTime(),

g.dashboard.variable.query.new('task_id')
+ g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
+ g.dashboard.variable.query.queryTypes.withLabelValues('task_id', 'airflow_task_finish_total{job=~"$job", instance=~"$instance"}')
+ g.dashboard.variable.query.generalOptions.withLabel('Task')
+ g.dashboard.variable.query.selectionOptions.withMulti(true)
+ g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
+ g.dashboard.variable.query.refresh.onLoad()
+ g.dashboard.variable.query.refresh.onTime(),

g.dashboard.variable.query.new('state')
+ g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
+ g.dashboard.variable.query.queryTypes.withLabelValues('state', 'airflow_task_finish_total{job=~"$job", instance=~"$instance"}')
+ g.dashboard.variable.query.generalOptions.withLabel('State')
+ g.dashboard.variable.query.selectionOptions.withMulti(true)
+ g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
+ g.dashboard.variable.query.refresh.onLoad()
+ g.dashboard.variable.query.refresh.onTime(),

g.dashboard.variable.query.new('pool_name')
+ g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
+ g.dashboard.variable.query.queryTypes.withLabelValues('pool_name', 'airflow_pool_running_slots{job=~"$job", instance=~"$instance"}')
+ g.dashboard.variable.query.generalOptions.withLabel('Pool name')
+ g.dashboard.variable.query.selectionOptions.withMulti(true)
+ g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
+ g.dashboard.variable.query.refresh.onLoad()
+ g.dashboard.variable.query.refresh.onTime(),
],
uid + '_overview',
tags,
links { apacheAirflowOverview+:: {} },
annotations,
timezone,
refresh,
period
),
}
+
if this.config.enableLokiLogs then
{
'apache-airflow-logs.json':
logslib.new(
prefix + ' logs',
datasourceName=this.grafana.variables.datasources.loki.name,
datasourceRegex=this.grafana.variables.datasources.loki.regex,
filterSelector=this.config.filteringSelector,
labels=this.config.groupLabels + this.config.extraLogLabels,
)
{
dashboards+:
{
logs+:
root.applyCommon(super.logs.templating.list, uid=uid + '-logs', tags=tags, links=links { logs+:: {} }, annotations=annotations, timezone=timezone, refresh=refresh, period=period),
},
panels+:
{
logs+:
g.panel.logs.options.withEnableLogDetails(true)
+ g.panel.logs.options.withShowTime(false)
+ g.panel.logs.options.withWrapLogMessage(false),
},
}.dashboards.logs,
} else {},

applyCommon(vars, uid, tags, links, annotations, timezone, refresh, period):
g.dashboard.withTags(tags)
+ g.dashboard.withUid(uid)
+ g.dashboard.withLinks(std.objectValues(links))
+ g.dashboard.withTimezone(timezone)
+ g.dashboard.withRefresh(refresh)
+ g.dashboard.time.withFrom(period)
+ g.dashboard.withVariables(vars)
+ g.dashboard.withAnnotations(std.objectValues(annotations)),
}
Loading
Loading