Skip to content

Commit 525411f

Browse files
author
Eugene Cheung
authored
feat(osis): add monitoring for OpenSearch Ingestion pipelines (#659)
Example: <img width="1650" height="288" alt="Screenshot 2025-08-14 at 11 20 04" src="https://github.com/user-attachments/assets/e6d99e37-cab9-4f98-ae54-596d0297972e" /> --- _By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license_
1 parent 327eab6 commit 525411f

File tree

11 files changed

+1530
-2
lines changed

11 files changed

+1530
-2
lines changed

API.md

Lines changed: 854 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ You can browse the documentation at https://constructs.dev/packages/cdk-monitori
8484
| AWS Lambda (`.monitorLambdaFunction()`) | Latency, errors, iterator max age | Latency, errors, throttles, iterator max age | Optional Lambda Insights metrics (opt-in) support |
8585
| AWS Load Balancing (`.monitorNetworkLoadBalancer()`, `.monitorFargateApplicationLoadBalancer()`, `.monitorFargateNetworkLoadBalancer()`, `.monitorEc2ApplicationLoadBalancer()`, `.monitorEc2NetworkLoadBalancer()`) | System resources and task health | Unhealthy task count, running tasks count, (for Fargate/Ec2 apps) CPU/memory usage | Use for FargateService or Ec2Service backed by a NetworkLoadBalancer or ApplicationLoadBalancer |
8686
| AWS OpenSearch/Elasticsearch (`.monitorOpenSearchCluster()`, `.monitorElasticsearchCluster()`) | Indexing and search latency, disk/memory/CPU usage | Indexing and search latency, disk/memory/CPU usage, cluster status, KMS keys | |
87+
| AWS OpenSearch Ingestion (`.monitorOpenSearchIngestionPipeline()`) | Latency, incoming data, DLQ records count | DLQ records count | |
8788
| AWS RDS (`.monitorRdsCluster()`) | Query duration, connections, latency, disk/CPU usage | Connections, disk and CPU usage | |
8889
| AWS RDS (`.monitorRdsInstance()`) | Query duration, connections, latency, disk/CPU usage | Connections, disk and CPU usage | |
8990
| AWS Redshift (`.monitorRedshiftCluster()`) | Query duration, connections, latency, disk/CPU usage | Query duration, connections, disk and CPU usage | |

lib/common/url/AwsConsoleUrlFactory.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ export class AwsConsoleUrlFactory {
113113
return this.getAwsConsoleUrl(destinationUrl);
114114
}
115115

116+
/**
 * Builds the AWS console link for an OpenSearch Ingestion pipeline.
 *
 * @param pipelineName pipeline name, appended to the URL fragment as-is
 * @returns the result of `getAwsConsoleUrl` for the regional OSIS console URL
 */
getOsisPipelineUrl(pipelineName: string): string | undefined {
  const consoleRegion = this.awsAccountRegion;
  return this.getAwsConsoleUrl(
    `https://${consoleRegion}.console.aws.amazon.com/aos/osis/home?region=${consoleRegion}#osis/ingestion-pipelines/${pipelineName}`,
  );
}
121+
116122
getRdsClusterUrl(clusterId: string): string | undefined {
117123
const region = this.awsAccountRegion;
118124
const destinationUrl = `https://${region}.console.aws.amazon.com/rds/home?region=${region}#database:id=${clusterId};is-cluster=true;tab=monitoring`;

lib/facade/MonitoringFacade.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ import {
9191
NetworkLoadBalancerMonitoringProps,
9292
OpenSearchClusterMonitoring,
9393
OpenSearchClusterMonitoringProps,
94+
OpenSearchIngestionPipelineMonitoring,
95+
OpenSearchIngestionPipelineMonitoringProps,
9496
QueueProcessingEc2ServiceMonitoringProps,
9597
QueueProcessingFargateServiceMonitoringProps,
9698
RdsClusterMonitoring,
@@ -584,6 +586,14 @@ export class MonitoringFacade extends MonitoringScope {
584586
return this;
585587
}
586588

589+
/**
 * Creates an OpenSearch Ingestion pipeline monitoring segment and adds it
 * to this facade.
 *
 * @param props pipeline identification and monitoring options
 * @returns this facade, so that calls can be chained
 */
monitorOpenSearchIngestionPipeline(
  props: OpenSearchIngestionPipelineMonitoringProps,
): this {
  this.addSegment(
    new OpenSearchIngestionPipelineMonitoring(this, props),
    props,
  );
  return this;
}
596+
587597
monitorElastiCacheCluster(props: ElastiCacheClusterMonitoringProps): this {
588598
const segment = new ElastiCacheClusterMonitoring(this, props);
589599
this.addSegment(segment, props);
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import type { DimensionsMap } from "aws-cdk-lib/aws-cloudwatch";
2+
import {
3+
BaseMetricFactoryProps,
4+
BaseMetricFactory,
5+
MetricFactory,
6+
MetricWithAlarmSupport,
7+
MetricStatistic,
8+
} from "../../common";
9+
10+
/** Metric namespace string handed to each `createMetric` call in OpenSearchIngestionPipelineMetricFactory. */
const OpenSearchIngestionNamespace = "AWS/OSIS";
11+
12+
/**
 * Properties identifying the pipeline whose metrics are created by
 * OpenSearchIngestionPipelineMetricFactory.
 */
export interface OpenSearchIngestionPipelineMetricFactoryProps
13+
extends BaseMetricFactoryProps {
14+
/**
 * Sub-pipeline name; used as the leading segment of every metric name
 * built by the factory.
 */
readonly subPipelineName: string;
15+
/**
 * Source segment of source-level metric names,
 * e.g. `<subPipelineName>.<source>.bytesReceived.sum`.
 */
readonly source: string;
16+
/**
 * Sink segment of sink-level metric names,
 * e.g. `<subPipelineName>.<sink>.recordsIn.count`.
 */
readonly sink: string;
17+
/**
 * Value of the `PipelineName` dimension attached to every metric.
 */
readonly pipelineName: string;
18+
}
19+
20+
/**
 * Creates metrics for one OpenSearch Ingestion sub-pipeline.
 *
 * Every metric is requested from the "AWS/OSIS" namespace with a single
 * `PipelineName` dimension; metric names are assembled from the
 * sub-pipeline, source and sink names given in the props.
 *
 * @experimental This is subject to change if an L2 construct becomes available.
 *
 * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/monitoring-pipeline-metrics.html
 */
export class OpenSearchIngestionPipelineMetricFactory extends BaseMetricFactory<OpenSearchIngestionPipelineMetricFactoryProps> {
  /** Leading segment of every metric name. */
  protected readonly subPipelineName: string;
  /** Segment identifying the source in source-level metric names. */
  protected readonly source: string;
  /** Segment identifying the sink in sink-level metric names. */
  protected readonly sink: string;

  /** Dimensions shared by all metrics: `{ PipelineName }`. */
  protected readonly dimensionsMap: DimensionsMap;

  constructor(
    metricFactory: MetricFactory,
    props: OpenSearchIngestionPipelineMetricFactoryProps,
  ) {
    super(metricFactory, props);

    const { subPipelineName, source, sink, pipelineName } = props;
    this.subPipelineName = subPipelineName;
    this.source = source;
    this.sink = sink;
    this.dimensionsMap = { PipelineName: pipelineName };
  }

  /** SUM of `<subPipelineName>.<source>.bytesReceived.sum`. */
  metricSourceBytesReceivedSum(): MetricWithAlarmSupport {
    const label = `${this.source}.bytesReceived.sum`;
    const metricName = `${this.subPipelineName}.${label}`;
    return this.metricFactory.createMetric(
      metricName,
      MetricStatistic.SUM,
      label,
      this.dimensionsMap,
      undefined,
      OpenSearchIngestionNamespace,
      undefined,
      this.region,
      this.account,
    );
  }

  /** MAX of `<subPipelineName>.<sink>.bulkRequestLatency.max`. */
  metricSinkBulkRequestLatencyMax(): MetricWithAlarmSupport {
    const label = `${this.sink}.bulkRequestLatency.max`;
    const metricName = `${this.subPipelineName}.${label}`;
    return this.metricFactory.createMetric(
      metricName,
      MetricStatistic.MAX,
      label,
      this.dimensionsMap,
      undefined,
      OpenSearchIngestionNamespace,
      undefined,
      this.region,
      this.account,
    );
  }

  /** MAX of `<subPipelineName>.<sink>.PipelineLatency.max`. */
  metricSinkBulkPipelineLatencyMax(): MetricWithAlarmSupport {
    const label = `${this.sink}.PipelineLatency.max`;
    const metricName = `${this.subPipelineName}.${label}`;
    return this.metricFactory.createMetric(
      metricName,
      MetricStatistic.MAX,
      label,
      this.dimensionsMap,
      undefined,
      OpenSearchIngestionNamespace,
      undefined,
      this.region,
      this.account,
    );
  }

  /** SUM of `<subPipelineName>.recordsProcessed.count`. */
  metricRecordsProcessedCount(): MetricWithAlarmSupport {
    const label = "recordsProcessed.count";
    const metricName = `${this.subPipelineName}.${label}`;
    return this.metricFactory.createMetric(
      metricName,
      MetricStatistic.SUM,
      label,
      this.dimensionsMap,
      undefined,
      OpenSearchIngestionNamespace,
      undefined,
      this.region,
      this.account,
    );
  }

  /** SUM of `<subPipelineName>.<sink>.recordsIn.count`. */
  metricSinkRecordsInCount(): MetricWithAlarmSupport {
    const label = `${this.sink}.recordsIn.count`;
    const metricName = `${this.subPipelineName}.${label}`;
    return this.metricFactory.createMetric(
      metricName,
      MetricStatistic.SUM,
      label,
      this.dimensionsMap,
      undefined,
      OpenSearchIngestionNamespace,
      undefined,
      this.region,
      this.account,
    );
  }

  /**
   * Math metric adding the DLQ success and failed record counts,
   * labelled "DLQ records count".
   */
  metricDlqS3RecordsCount(): MetricWithAlarmSupport {
    const usingMetrics = {
      successCount: this.metricDlqS3RecordsSuccessCount(),
      failedCount: this.metricDlqS3RecordsFailedCount(),
    };
    return this.metricFactory.createMetricMath(
      "successCount + failedCount",
      usingMetrics,
      "DLQ records count",
    );
  }

  /** SUM of `<subPipelineName>.<sink>.s3.dlqS3RecordsSuccess.count`. */
  metricDlqS3RecordsSuccessCount(): MetricWithAlarmSupport {
    const label = "s3.dlqS3RecordsSuccess.count";
    const metricName = `${this.subPipelineName}.${this.sink}.${label}`;
    return this.metricFactory.createMetric(
      metricName,
      MetricStatistic.SUM,
      label,
      this.dimensionsMap,
      undefined,
      OpenSearchIngestionNamespace,
      undefined,
      this.region,
      this.account,
    );
  }

  /** SUM of `<subPipelineName>.<sink>.s3.dlqS3RecordsFailed.count`. */
  metricDlqS3RecordsFailedCount(): MetricWithAlarmSupport {
    const label = "s3.dlqS3RecordsFailed.count";
    const metricName = `${this.subPipelineName}.${this.sink}.${label}`;
    return this.metricFactory.createMetric(
      metricName,
      MetricStatistic.SUM,
      label,
      this.dimensionsMap,
      undefined,
      OpenSearchIngestionNamespace,
      undefined,
      this.region,
      this.account,
    );
  }
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import type { HorizontalAnnotation, IWidget } from "aws-cdk-lib/aws-cloudwatch";
import { GraphWidget, Row } from "aws-cdk-lib/aws-cloudwatch";

import type { OpenSearchIngestionPipelineMetricFactoryProps } from "./OpenSearchIngestionPipelineMetricFactory";
import { OpenSearchIngestionPipelineMetricFactory } from "./OpenSearchIngestionPipelineMetricFactory";
import {
  BaseMonitoringProps,
  MaxUsageCountThreshold,
  MetricWithAlarmSupport,
  Monitoring,
  AlarmFactory,
  UsageAlarmFactory,
  MonitoringScope,
  ThirdWidth,
  DefaultGraphWidgetHeight,
  TimeAxisMillisFromZero,
  CountAxisFromZero,
  SizeAxisBytesFromZero,
} from "../../common";
import {
  MonitoringNamingStrategy,
  MonitoringHeaderWidget,
} from "../../dashboard";
23+
24+
/**
 * Optional settings for OpenSearchIngestionPipelineMonitoring, on top of
 * the base monitoring props.
 */
export interface OpenSearchIngestionPipelineMonitoringOptions
25+
extends BaseMonitoringProps {
26+
/**
 * Max-count alarms on the DLQ records count metric, keyed by alarm
 * disambiguator. One alarm is created per entry, and its annotation is
 * drawn on the DLQ widget.
 */
readonly addMaxDlqS3CountAlarm?: Record<string, MaxUsageCountThreshold>;
27+
}
28+
29+
/**
 * Constructor props for OpenSearchIngestionPipelineMonitoring: the metric
 * factory props (which pipeline to monitor) combined with the monitoring
 * options (which alarms to add).
 */
export interface OpenSearchIngestionPipelineMonitoringProps
30+
extends OpenSearchIngestionPipelineMetricFactoryProps,
31+
OpenSearchIngestionPipelineMonitoringOptions {}
32+
33+
/**
 * Dashboard segment for one OpenSearch Ingestion pipeline.
 *
 * Renders a header plus a row of three graphs (latency, incoming data,
 * DLQ) and optionally creates max-count alarms on the DLQ records count.
 *
 * @experimental This is subject to change if an L2 construct becomes available.
 */
export class OpenSearchIngestionPipelineMonitoring extends Monitoring {
  /** Human readable name shown in the header widget. */
  readonly title: string;
  /** Console link used by the header widget, if one could be built. */
  readonly pipelineUrl?: string;

  readonly metricSinkRecordsInCount: MetricWithAlarmSupport;
  readonly metricSourceBytesReceivedSum: MetricWithAlarmSupport;
  readonly metricSinkBulkRequestLatencyMax: MetricWithAlarmSupport;
  readonly metricSinkBulkPipelineLatencyMax: MetricWithAlarmSupport;
  readonly metricDlqS3RecordsCount: MetricWithAlarmSupport;

  readonly alarmFactory: AlarmFactory;
  readonly usageAlarmFactory: UsageAlarmFactory;

  /** Annotations of the created DLQ alarms, drawn on the DLQ widget. */
  readonly usageAnnotations: HorizontalAnnotation[];

  /**
   * @param scope monitoring scope providing the metric, alarm and console
   *   URL factories
   * @param props pipeline identification and monitoring options
   */
  constructor(
    scope: MonitoringScope,
    props: OpenSearchIngestionPipelineMonitoringProps,
  ) {
    super(scope, props);

    const namingStrategy = new MonitoringNamingStrategy({
      ...props,
      fallbackConstructName: props.pipelineName,
    });
    this.title = namingStrategy.resolveHumanReadableName();
    this.pipelineUrl = scope
      .createAwsConsoleUrlFactory()
      .getOsisPipelineUrl(props.pipelineName);

    this.alarmFactory = this.createAlarmFactory(
      namingStrategy.resolveAlarmFriendlyName(),
    );
    this.usageAlarmFactory = new UsageAlarmFactory(this.alarmFactory);

    this.usageAnnotations = [];

    const metricFactory = new OpenSearchIngestionPipelineMetricFactory(
      scope.createMetricFactory(),
      props,
    );

    this.metricSinkRecordsInCount = metricFactory.metricSinkRecordsInCount();
    this.metricSourceBytesReceivedSum =
      metricFactory.metricSourceBytesReceivedSum();
    this.metricSinkBulkRequestLatencyMax =
      metricFactory.metricSinkBulkRequestLatencyMax();
    this.metricSinkBulkPipelineLatencyMax =
      metricFactory.metricSinkBulkPipelineLatencyMax();
    this.metricDlqS3RecordsCount = metricFactory.metricDlqS3RecordsCount();

    // `for..in` is a no-op when the optional alarm map is undefined.
    for (const disambiguator in props.addMaxDlqS3CountAlarm) {
      const alarmProps = props.addMaxDlqS3CountAlarm[disambiguator];
      const createdAlarm = this.usageAlarmFactory.addMaxCountAlarm(
        this.metricDlqS3RecordsCount,
        alarmProps,
        disambiguator,
      );
      this.usageAnnotations.push(createdAlarm.annotation);
      this.addAlarm(createdAlarm);
    }

    props.useCreatedAlarms?.consume(this.createdAlarms());
  }

  /** Returns the same widgets as `widgets()`. */
  summaryWidgets(): IWidget[] {
    return this.widgets();
  }

  /** Returns the header widget followed by one row of three graphs. */
  widgets(): IWidget[] {
    return [
      this.createTitleWidget(),
      new Row(
        this.createLatencyWidget(ThirdWidth, DefaultGraphWidgetHeight),
        this.createIncomingDataWidget(ThirdWidth, DefaultGraphWidgetHeight),
        this.createDlqS3Widget(ThirdWidth, DefaultGraphWidgetHeight),
      ),
    ];
  }

  /** Header widget linking to the pipeline in the AWS console. */
  protected createTitleWidget(): IWidget {
    return new MonitoringHeaderWidget({
      family: "OpenSearch Ingestion",
      title: this.title,
      goToLinkUrl: this.pipelineUrl,
    });
  }

  /** Graph of the sink bulk request latency and pipeline latency maxima. */
  protected createLatencyWidget(width: number, height: number): IWidget {
    return new GraphWidget({
      width,
      height,
      title: "Latency",
      left: [
        this.metricSinkBulkRequestLatencyMax,
        this.metricSinkBulkPipelineLatencyMax,
      ],
      leftYAxis: TimeAxisMillisFromZero,
    });
  }

  /**
   * Graph of records entering the sink (left axis) and bytes received by
   * the source (right axis).
   */
  protected createIncomingDataWidget(width: number, height: number): IWidget {
    return new GraphWidget({
      width,
      height,
      title: "Incoming data",
      left: [this.metricSinkRecordsInCount],
      leftYAxis: CountAxisFromZero,
      right: [this.metricSourceBytesReceivedSum],
      // Bytes get their own axis so they are not labelled as a count.
      rightYAxis: SizeAxisBytesFromZero,
    });
  }

  /** Graph of the DLQ records count with the alarm annotations. */
  protected createDlqS3Widget(width: number, height: number): IWidget {
    return new GraphWidget({
      width,
      height,
      title: "DLQ",
      left: [this.metricDlqS3RecordsCount],
      leftYAxis: CountAxisFromZero,
      leftAnnotations: this.usageAnnotations,
    });
  }
}

lib/monitoring/aws-osis/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from "./OpenSearchIngestionPipelineMetricFactory";
2+
export * from "./OpenSearchIngestionPipelineMonitoring";

lib/monitoring/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export * from "./aws-kinesisanalytics";
1717
export * from "./aws-lambda";
1818
export * from "./aws-loadbalancing";
1919
export * from "./aws-opensearch";
20+
export * from "./aws-osis";
2021
export * from "./aws-rds";
2122
export * from "./aws-redshift";
2223
export * from "./aws-s3";

test/common/url/AwsConsoleUrlFactory.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,25 @@ test("getRedshiftClusterUrl", () => {
255255
).toEqual(expected);
256256
});
257257

258+
// The OSIS pipeline URL must resolve identically for a literal name and for a lazy token.
test("getOsisPipelineUrl", () => {
  const stack = new Stack();
  const urlFactory = new AwsConsoleUrlFactory({
    awsAccountId,
    awsAccountRegion,
  });
  const expectedUrl =
    "https://eu-west-1.console.aws.amazon.com/aos/osis/home?region=eu-west-1#osis/ingestion-pipelines/PipelineName";

  const urlFromLiteral = urlFactory.getOsisPipelineUrl("PipelineName");
  expect(stack.resolve(urlFromLiteral)).toEqual(expectedUrl);

  const urlFromToken = urlFactory.getOsisPipelineUrl(
    Lazy.string({ produce: () => "PipelineName" }),
  );
  expect(stack.resolve(urlFromToken)).toEqual(expectedUrl);
});
276+
258277
test("getOpenSearchClusterUrl", () => {
259278
const stack = new Stack();
260279
const factory = new AwsConsoleUrlFactory({ awsAccountId, awsAccountRegion });

0 commit comments

Comments
 (0)