1 change: 1 addition & 0 deletions .gitignore
@@ -2,3 +2,4 @@
 vendor
 jsonnetfile.lock.json
 *.zip
+.worktrees
8 changes: 7 additions & 1 deletion apache-airflow-mixin/.lint
@@ -1 +1,7 @@
-exclusions: {}
+exclusions:
+  template-job-rule:
+    reason: "The Prometheus datasource variable is now named 'prometheus_datasource', while the linter expects 'datasource'"
+  panel-datasource-rule:
+    reason: "Modern mixins use a signal-based architecture where datasource references are handled by the framework"
+  template-datasource-rule:
+    reason: "Under the new convention the variable names are 'prometheus_datasource' and 'loki_datasource', whereas the linter expects 'datasource'"
@@ -1,14 +1,14 @@
 {
-  prometheusAlerts+:: {
-    groups+: [
+  new(this): {
+    groups: [
       {
-        name: 'apache-airflow',
+        name: this.config.uid + '-alerts',
         rules: [
           {
             alert: 'ApacheAirflowStarvingPoolTasks',
             expr: |||
-              airflow_pool_starving_tasks > %(alertsCriticalPoolStarvingTasks)s
-            ||| % $._config,
+              airflow_pool_starving_tasks{%(filteringSelector)s} > %(alertsCriticalPoolStarvingTasks)s
+            ||| % this.config,
             'for': '5m',
             labels: {
               severity: 'critical',
@@ -17,14 +17,14 @@
               summary: 'There are starved tasks detected in the Apache Airflow pool.',
               description: |||
                 The number of starved tasks is {{ printf "%%.0f" $value }} over the last 5m on {{ $labels.instance }} - {{ $labels.pool_name }} which is above the threshold of %(alertsCriticalPoolStarvingTasks)s.
-              ||| % $._config,
+              ||| % this.config,
             },
           },
           {
-            alert: 'ApacheAirflowDAGScheduleDelayWarningLevel',
+            alert: 'ApacheAirflowDAGScheduleDelayWarning',
             expr: |||
-              increase(airflow_dagrun_schedule_delay_sum[5m]) / clamp_min(increase(airflow_dagrun_schedule_delay_count[5m]),1) > %(alertsWarningDAGScheduleDelayLevel)s
-            ||| % $._config,
+              increase(airflow_dagrun_schedule_delay_sum{%(filteringSelector)s}[5m]) / clamp_min(increase(airflow_dagrun_schedule_delay_count{%(filteringSelector)s}[5m]),1) > %(alertsWarningDAGScheduleDelayLevel)s
+            ||| % this.config,
             'for': '1m',
             labels: {
               severity: 'warning',
@@ -33,14 +33,14 @@
               summary: 'The delay in DAG schedule time to DAG run time has reached the warning threshold.',
               description: |||
                 The average delay in DAG schedule to run time is {{ printf "%%.0f" $value }} over the last 1m on {{ $labels.instance }} - {{ $labels.dag_id }} which is above the threshold of %(alertsWarningDAGScheduleDelayLevel)s.
-              ||| % $._config,
+              ||| % this.config,
             },
           },
           {
-            alert: 'ApacheAirflowDAGScheduleDelayCriticalLevel',
+            alert: 'ApacheAirflowDAGScheduleDelayCritical',
             expr: |||
-              increase(airflow_dagrun_schedule_delay_sum[5m]) / clamp_min(increase(airflow_dagrun_schedule_delay_count[5m]),1) > %(alertsCriticalDAGScheduleDelayLevel)s
-            ||| % $._config,
+              increase(airflow_dagrun_schedule_delay_sum{%(filteringSelector)s}[5m]) / clamp_min(increase(airflow_dagrun_schedule_delay_count{%(filteringSelector)s}[5m]),1) > %(alertsCriticalDAGScheduleDelayLevel)s
+            ||| % this.config,
             'for': '1m',
             labels: {
               severity: 'critical',
@@ -49,14 +49,14 @@
               summary: 'The delay in DAG schedule time to DAG run time has reached the critical threshold.',
               description: |||
                 The average delay in DAG schedule to run time is {{ printf "%%.0f" $value }} over the last 1m for {{ $labels.instance }} - {{ $labels.dag_id }} which is above the threshold of %(alertsCriticalDAGScheduleDelayLevel)s.
-              ||| % $._config,
+              ||| % this.config,
             },
           },
           {
             alert: 'ApacheAirflowDAGFailures',
             expr: |||
-              increase(airflow_dagrun_duration_failed_count[5m]) > %(alertsCriticalFailedDAGs)s
-            ||| % $._config,
+              increase(airflow_dagrun_duration_failed_count{%(filteringSelector)s}[5m]) > %(alertsCriticalFailedDAGs)s
+            ||| % this.config,
             'for': '1m',
             labels: {
               severity: 'critical',
@@ -65,7 +65,7 @@
               summary: 'There have been DAG failures detected.',
               description: |||
                 The number of DAG failures seen is {{ printf "%%.0f" $value }} over the last 1m for {{ $labels.instance }} - {{ $labels.dag_id }} which is above the threshold of %(alertsCriticalFailedDAGs)s.
-              ||| % $._config,
+              ||| % this.config,
             },
           },
         ],
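The net effect of this file's changes: alerts are now built by a `new(this)` constructor, and every PromQL expression is scoped by `%(filteringSelector)s` before being interpolated against `this.config` instead of `$._config`. A minimal sketch of how Jsonnet's %-formatting resolves one of these expressions (the config values here are illustrative, not the mixin's full config object):

// Minimal sketch of Jsonnet %-formatting as used in the alerts above.
local config = {
  filteringSelector: 'job=~"$job", instance=~"$instance"',
  alertsCriticalPoolStarvingTasks: 0,
};

{
  // %(key)s placeholders are looked up in the object to the right of '%'.
  expr: |||
    airflow_pool_starving_tasks{%(filteringSelector)s} > %(alertsCriticalPoolStarvingTasks)s
  ||| % config,
  // expr renders to:
  //   airflow_pool_starving_tasks{job=~"$job", instance=~"$instance"} > 0
}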
50 changes: 36 additions & 14 deletions apache-airflow-mixin/config.libsonnet
@@ -1,20 +1,42 @@
 {
-  _config+:: {
-    dashboardTags: ['apache-airflow-mixin'],
-    dashboardPeriod: 'now-1h',
-    dashboardTimezone: 'default',
-    dashboardRefresh: '1m',
+  local this = self,
+  filteringSelector: 'job=~"$job", instance=~"$instance"',
+  groupLabels: ['job'],
+  instanceLabels: ['instance'],
+  dashboardTags: ['apache-airflow-mixin'],
+  uid: 'apache-airflow',
+  dashboardNamePrefix: 'Apache Airflow',
 
-    //alert thresholds
-    alertsCriticalPoolStarvingTasks: 0,
-    alertsWarningDAGScheduleDelayLevel: 10, //s
-    alertsCriticalDAGScheduleDelayLevel: 60, //s
-    alertsCriticalFailedDAGs: 0,
+  // additional params
+  dashboardPeriod: 'now-1h',
+  dashboardTimezone: 'default',
+  dashboardRefresh: '1m',
 
-    enableLokiLogs: true,
-    enableMultiCluster: false,
+  // logs lib related
+  enableLokiLogs: true,
+  logLabels: ['job', 'instance', 'filename'],
+  extraLogLabels: [],  // Required by logs-lib
+  logsVolumeGroupBy: 'level',
+  showLogsVolume: true,
 
-    multiclusterSelector: 'job=~"$job"',
-    airflowSelector: if self.enableMultiCluster then 'job=~"$job", cluster=~"$cluster"' else 'job=~"$job"',
-  },
+  // alert thresholds
+  alertsCriticalPoolStarvingTasks: 0,  // count
+  alertsWarningDAGScheduleDelayLevel: 10,  // s
+  alertsCriticalDAGScheduleDelayLevel: 60,  // s
+  alertsCriticalFailedDAGs: 0,  // count
+
+  // multi-cluster support
+  enableMultiCluster: false,
+
+  // metrics source for signals library
+  metricsSource: 'prometheus',
+
+  legendCustomTemplate: std.join(' ', std.map(function(label) '{{' + label + '}}', this.instanceLabels)),
+  signals+: {
+    dags: (import './signals/dags.libsonnet')(this),
+    tasks: (import './signals/tasks.libsonnet')(this),
+    scheduler: (import './signals/scheduler.libsonnet')(this),
+    executor: (import './signals/executor.libsonnet')(this),
+    pools: (import './signals/pools.libsonnet')(this),
+  },
 }
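One line worth unpacking is `legendCustomTemplate`, which derives a Grafana legend format string from the configured instance labels. A small sketch of the evaluation, using a two-label input to make the join visible (the mixin itself configures only `['instance']`):

// Sketch of how legendCustomTemplate is computed; the two-label input
// is illustrative (the mixin configures instanceLabels: ['instance']).
local instanceLabels = ['instance', 'pool_name'];

// std.map wraps each label in Grafana's {{...}} templating syntax;
// std.join concatenates the results with spaces.
std.join(' ', std.map(function(label) '{{' + label + '}}', instanceLabels))

// evaluates to: "{{instance}} {{pool_name}}"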
142 changes: 142 additions & 0 deletions apache-airflow-mixin/dashboards.libsonnet
@@ -0,0 +1,142 @@
local g = import './g.libsonnet';
local logslib = import 'logs-lib/logs/main.libsonnet';

{
  local root = self,
  new(this)::
    local prefix = this.config.dashboardNamePrefix;
    local links = this.grafana.links;
    local tags = this.config.dashboardTags;
    local uid = g.util.string.slugify(this.config.uid);
    local vars = this.grafana.variables;
    local annotations = this.grafana.annotations;
    local refresh = this.config.dashboardRefresh;
    local period = this.config.dashboardPeriod;
    local timezone = this.config.dashboardTimezone;

    {
      // Apache Airflow overview dashboard
      'apache-airflow-overview.json':
        g.dashboard.new(prefix + ' overview')
        + g.dashboard.withDescription('Dashboard providing an overview of Apache Airflow DAGs, tasks, and scheduler performance.')
        + g.dashboard.withPanels(
          g.util.panel.resolveCollapsedFlagOnRows(
            g.util.grid.wrapPanels(
              [
                this.grafana.rows.apacheAirflowOverview,
                this.grafana.rows.apacheAirflowSchedulerDetails,
              ]
            )
          )
        )
        + root.applyCommon(
          vars.multiInstance + [
            g.dashboard.variable.query.new('dag_id')
            + g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
            + g.dashboard.variable.query.queryTypes.withLabelValues('dag_id', 'airflow_dagrun_duration_success_sum{job=~"$job", instance=~"$instance"}')
            + g.dashboard.variable.query.generalOptions.withLabel('DAG ID')
            + g.dashboard.variable.query.selectionOptions.withMulti(true)
            + g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
            + g.dashboard.variable.query.refresh.onLoad()
            + g.dashboard.variable.query.refresh.onTime(),

            g.dashboard.variable.query.new('task_id')
            + g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
            + g.dashboard.variable.query.queryTypes.withLabelValues('task_id', 'airflow_ti_failures{job=~"$job", instance=~"$instance"}')
            + g.dashboard.variable.query.generalOptions.withLabel('Task ID')
            + g.dashboard.variable.query.selectionOptions.withMulti(true)
            + g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
            + g.dashboard.variable.query.refresh.onLoad()
            + g.dashboard.variable.query.refresh.onTime(),

            g.dashboard.variable.query.new('state')
            + g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
            + g.dashboard.variable.query.queryTypes.withLabelValues('state', 'airflow_task_finish_total{job=~"$job", instance=~"$instance"}')
            + g.dashboard.variable.query.generalOptions.withLabel('State')
            + g.dashboard.variable.query.selectionOptions.withMulti(true)
            + g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
            + g.dashboard.variable.query.refresh.onLoad()
            + g.dashboard.variable.query.refresh.onTime(),

            g.dashboard.variable.query.new('pool_name')
            + g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.prometheus)
            + g.dashboard.variable.query.queryTypes.withLabelValues('pool_name', 'airflow_pool_running_slots{job=~"$job", instance=~"$instance"}')
            + g.dashboard.variable.query.generalOptions.withLabel('Pool name')
            + g.dashboard.variable.query.selectionOptions.withMulti(true)
            + g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
            + g.dashboard.variable.query.refresh.onLoad()
            + g.dashboard.variable.query.refresh.onTime(),
          ],
          uid + '_overview',
          tags,
          links { apacheAirflowOverview+:: {} },
          annotations,
          timezone,
          refresh,
          period
        ),
    }
    +
    if this.config.enableLokiLogs then
      {
        'apache-airflow-logs.json':
          logslib.new(
            prefix + ' logs',
            datasourceName=vars.datasources.loki.name,
            datasourceRegex=vars.datasources.loki.regex,
            filterSelector=this.config.filteringSelector,
            labels=this.config.logLabels + this.config.extraLogLabels,
            formatParser=null,
            showLogsVolume=this.config.showLogsVolume,
            logsVolumeGroupBy=this.config.logsVolumeGroupBy,
            extraFilters=[]
          )
          {
            dashboards+:
              {
                logs+:
                  g.dashboard.withPanels(
                    g.util.panel.resolveCollapsedFlagOnRows(
                      g.util.grid.wrapPanels(
                        [
                          this.grafana.rows.apacheAirflowLogs,
                        ]
                      )
                    )
                  )
                  + root.applyCommon(
                    [],
                    uid + '_logs',
                    tags,
                    links { logs+:: {} },
                    annotations,
                    timezone,
                    refresh,
                    period
                  ),
              },
            variables+: {
              toArray+: [
                g.dashboard.variable.query.new('dag_file')
                + g.dashboard.variable.query.withDatasourceFromVariable(vars.datasources.loki)
                + g.dashboard.variable.query.queryTypes.withLabelValues('filename', '{job=~"$job", instance=~"$instance"}')
                + g.dashboard.variable.query.generalOptions.withLabel('DAG file')
                + g.dashboard.variable.query.selectionOptions.withMulti(true)
                + g.dashboard.variable.query.selectionOptions.withIncludeAll(true, '.+')
                + g.dashboard.variable.query.refresh.onLoad()
                + g.dashboard.variable.query.refresh.onTime(),
              ],
            },
          }.dashboards.logs,
      } else {},

  applyCommon(vars, uid, tags, links, annotations, timezone, refresh, period):
    g.dashboard.withTags(tags)
    + g.dashboard.withUid(uid)
    + g.dashboard.withLinks(std.objectValues(links))
    + g.dashboard.withTimezone(timezone)
    + g.dashboard.withRefresh(refresh)
    + g.dashboard.time.withFrom(period)
    + g.dashboard.withVariables(vars)
    + g.dashboard.withAnnotations(std.objectValues(annotations)),
}
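dashboards.libsonnet only defines the `new(this)` constructor; how it is wired into the mixin's entry point is not shown in this diff. Assuming a conventional `main.libsonnet` that exposes the generated dashboards (an assumption, not something this PR shows), rendering might look roughly like:

// render.jsonnet - hypothetical usage sketch; 'main.libsonnet' and the
// shape of its exports are assumptions, not shown in this diff.
local airflow = import 'apache-airflow-mixin/main.libsonnet';

// Emits an object keyed by dashboard filename, suitable for
//   jsonnet -J vendor -m dashboards_out render.jsonnet
airflow.new().grafana.dashboards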