Skip to content

Commit bf00119

Browse files
authored
feat: emit CloudWatch metric for inflight requests in gateway (#1850)
- Add AWS CloudWatch SDK client dependency - Create CloudWatch metrics service to track inflight requests per task - Add middleware to increment/decrement inflight request counter - Emit metric every 10 seconds with TaskId and Environment dimensions - Only enable when AWS credentials are available (AWS_ACCESS_KEY and AWS_ACCESS_SECRET) - Metric name: inflight_per_task under Latitude/Gateway namespace
1 parent 7196c7a commit bf00119

File tree

6 files changed

+935
-9
lines changed

6 files changed

+935
-9
lines changed

apps/gateway/package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,24 @@
1515
"datadog:sourcemaps": "datadog-ci sourcemaps upload dist --service=latitude-llm-gateway --minified-path-prefix=/app/apps/gateway/dist --release-version=${RELEASE_VERSION:-unknown}"
1616
},
1717
"dependencies": {
18+
"@aws-sdk/client-cloudwatch": "^3.914.0",
1819
"@hono/node-server": "1.13.2",
1920
"@hono/swagger-ui": "0.4.1",
2021
"@hono/zod-openapi": "1.1.1",
2122
"@latitude-data/constants": "workspace:^",
2223
"@latitude-data/core": "workspace:^",
2324
"@latitude-data/env": "workspace:^",
24-
"@latitude-data/telemetry": "workspace:*",
2525
"@latitude-data/sdk": "workspace:^",
26+
"@latitude-data/telemetry": "workspace:*",
2627
"@t3-oss/env-core": "0.13.8",
28+
"dd-trace": "catalog:",
2729
"drizzle-orm": "catalog:",
2830
"hono": "4.9.7",
2931
"ioredis": "5.6.0",
3032
"lodash-es": "4.17.21",
3133
"promptl-ai": "catalog:",
3234
"rate-limiter-flexible": "5.0.3",
33-
"zod": "catalog:",
34-
"dd-trace": "catalog:"
35+
"zod": "catalog:"
3536
},
3637
"devDependencies": {
3738
"@datadog/datadog-ci": "catalog:",
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { MiddlewareHandler } from 'hono'
2+
import { cloudWatchMetrics } from '../services/cloudwatchMetrics'
3+
4+
export const inflightRequestsMiddleware = (): MiddlewareHandler => {
5+
return async (_c, next) => {
6+
cloudWatchMetrics.incrementInflightRequests()
7+
8+
try {
9+
await next()
10+
} finally {
11+
cloudWatchMetrics.decrementInflightRequests()
12+
}
13+
}
14+
}

apps/gateway/src/routes/app.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ import { logger } from 'hono/logger'
33
import authMiddleware from '$/middlewares/auth'
44
import { rateLimitMiddleware } from '$/middlewares/rateLimit'
55
import errorHandlerMiddleware from '$/middlewares/errorHandler'
6+
import { inflightRequestsMiddleware } from '$/middlewares/inflightRequests'
67

78
import createApp from '$/openApi/createApp'
89
import configureOpenAPI from '$/openApi/configureOpenAPI'
910
import { configureApiRoutes } from './api'
1011
import { configureWebhookRoutes } from './webhook'
1112
import { memoryUsageMiddleware } from '$/middlewares/memoryLogger'
1213
import { tracerMiddleware } from '$/middlewares/tracer'
14+
import { env } from '@latitude-data/env'
1315

1416
const app = createApp()
1517

@@ -30,6 +32,10 @@ configureWebhookRoutes(app)
3032
app.use(rateLimitMiddleware())
3133
app.use(authMiddleware())
3234

35+
if (env.AWS_ACCESS_KEY && env.AWS_ACCESS_SECRET) {
36+
app.use(inflightRequestsMiddleware())
37+
}
38+
3339
configureApiRoutes(app)
3440

3541
app.onError(errorHandlerMiddleware)

apps/gateway/src/server.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import cluster from 'cluster'
66
import os from 'os'
77

88
import { env } from '@latitude-data/env'
9+
import { cloudWatchMetrics } from './services/cloudwatchMetrics'
910

1011
const HOSTNAME = env.GATEWAY_BIND_ADDRESS
1112
const PORT = env.GATEWAY_BIND_PORT
@@ -60,11 +61,26 @@ if (cluster.isPrimary) {
6061
console.log(
6162
`Worker ${process.pid} listening on http://${HOSTNAME}:${PORT}`,
6263
)
64+
65+
if (env.AWS_ACCESS_KEY && env.AWS_ACCESS_SECRET) {
66+
cloudWatchMetrics.startPeriodicEmission()
67+
console.log('Started CloudWatch metrics emission for inflight requests')
68+
}
6369
},
6470
)
6571

66-
process.on('SIGTERM', () => gracefulShutdown(server))
67-
process.on('SIGINT', () => gracefulShutdown(server))
72+
process.on('SIGTERM', () => {
73+
if (env.AWS_ACCESS_KEY && env.AWS_ACCESS_SECRET) {
74+
cloudWatchMetrics.stopPeriodicEmission()
75+
}
76+
gracefulShutdown(server)
77+
})
78+
process.on('SIGINT', () => {
79+
if (env.AWS_ACCESS_KEY && env.AWS_ACCESS_SECRET) {
80+
cloudWatchMetrics.stopPeriodicEmission()
81+
}
82+
gracefulShutdown(server)
83+
})
6884

6985
process.on('uncaughtException', function (err) {
7086
captureException(err)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import {
2+
CloudWatchClient,
3+
PutMetricDataCommand,
4+
} from '@aws-sdk/client-cloudwatch'
5+
import { env } from '@latitude-data/env'
6+
7+
class CloudWatchMetricsService {
8+
private client: CloudWatchClient | null = null
9+
private inflightRequests = 0
10+
private namespace = 'Latitude/Gateway'
11+
private metricName = 'inflight_per_task'
12+
private intervalId: ReturnType<typeof setInterval> | null = null
13+
14+
constructor() {
15+
if (this.isConfigured()) {
16+
this.client = new CloudWatchClient({
17+
region: env.AWS_REGION || 'us-east-1',
18+
credentials: {
19+
accessKeyId: env.AWS_ACCESS_KEY!,
20+
secretAccessKey: env.AWS_ACCESS_SECRET!,
21+
},
22+
})
23+
}
24+
}
25+
26+
private isConfigured(): boolean {
27+
return Boolean(env.AWS_ACCESS_KEY && env.AWS_ACCESS_SECRET)
28+
}
29+
30+
incrementInflightRequests(): void {
31+
this.inflightRequests++
32+
}
33+
34+
decrementInflightRequests(): void {
35+
this.inflightRequests = Math.max(0, this.inflightRequests - 1)
36+
}
37+
38+
getInflightRequests(): number {
39+
return this.inflightRequests
40+
}
41+
42+
async emitMetric(): Promise<void> {
43+
if (!this.isConfigured() || !this.client) {
44+
return
45+
}
46+
47+
try {
48+
const command = new PutMetricDataCommand({
49+
Namespace: this.namespace,
50+
MetricData: [
51+
{
52+
MetricName: this.metricName,
53+
Value: this.inflightRequests,
54+
Unit: 'Count',
55+
Timestamp: new Date(),
56+
},
57+
],
58+
})
59+
60+
await this.client.send(command)
61+
} catch (error) {
62+
console.error('Failed to emit CloudWatch metric:', error)
63+
}
64+
}
65+
66+
startPeriodicEmission(intervalMs = 10000): void {
67+
if (!this.isConfigured() || this.intervalId) {
68+
return
69+
}
70+
71+
this.intervalId = setInterval(() => {
72+
this.emitMetric()
73+
}, intervalMs)
74+
75+
this.intervalId.unref()
76+
}
77+
78+
stopPeriodicEmission(): void {
79+
if (this.intervalId) {
80+
clearInterval(this.intervalId)
81+
this.intervalId = null
82+
}
83+
}
84+
}
85+
86+
export const cloudWatchMetrics = new CloudWatchMetricsService()

0 commit comments

Comments
 (0)