Skip to content

Commit 2270af0

Browse files
authored
feat(metrics): add basic kafka metrics (#11)
* feat(metrics): add basic kafka metrics Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> * update Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> * chore(test): add Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> * chore(test): call directly prom client Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> * chore(metric): remove duplicated Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> * fix Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> * chore(test): remove metrics mock Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> * feat(consumer): enable metrics Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com> --------- Signed-off-by: Roberto Bianchi <roberto.bianchi@spendesk.com>
1 parent aa3c6c3 commit 2270af0

File tree

6 files changed

+401
-5
lines changed

6 files changed

+401
-5
lines changed

lib/metrics.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
export function initMetrics (prometheus) {
2+
if (!prometheus?.registry || !prometheus?.client) return null
3+
const { client, registry } = prometheus
4+
5+
return {
6+
messagesInFlight: new client.Gauge({
7+
name: 'kafka_hooks_messages_in_flight',
8+
help: 'Number of messages currently being processed',
9+
labelNames: ['topic'],
10+
registers: [registry]
11+
}),
12+
13+
httpRequestDuration: new client.Histogram({
14+
name: 'kafka_hooks_http_request_duration_seconds',
15+
help: 'HTTP request duration for webhook deliveries',
16+
labelNames: ['topic', 'method', 'status_code'],
17+
buckets: [0.1, 0.5, 1, 2, 5, 10],
18+
registers: [registry]
19+
}),
20+
21+
dlqMessages: new client.Counter({
22+
name: 'kafka_hooks_dlq_messages_total',
23+
help: 'Total number of messages sent to the DLQ (Dead Letter Queue)',
24+
labelNames: ['topic', 'reason'],
25+
registers: [registry]
26+
})
27+
}
28+
}

lib/plugin.js

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@ import {
1717
pathParamsHeader,
1818
queryStringHeader
1919
} from './definitions.js'
20+
import { initMetrics } from './metrics.js'
2021

21-
export async function processMessage (logger, dlqProducer, mappings, message) {
22+
export async function processMessage (logger, dlqProducer, mappings, message, metrics) {
2223
const topic = message.topic
2324
const value = message.value
2425
const { url, dlq, method, retryDelay, headers: protoHeaders, retries, includeAttemptInRequests } = mappings[topic]
26+
27+
if (metrics) {
28+
metrics.messagesInFlight.inc({ topic })
29+
}
30+
2531
const headers = {
2632
...protoHeaders,
2733
...Object.fromEntries(message.headers),
@@ -30,6 +36,8 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
3036

3137
// Perform the delivery
3238
const errors = []
39+
let lastStatusCode = null
40+
3341
for (let attempt = 0; attempt < retries; attempt++) {
3442
try {
3543
if (includeAttemptInRequests) {
@@ -38,7 +46,7 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
3846

3947
headers['content-length'] = Buffer.byteLength(value)
4048

41-
const requestedAt = new Date()
49+
const requestedAt = Date.now()
4250
const {
4351
statusCode,
4452
headers: responseHeaders,
@@ -49,14 +57,29 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
4957
body: value
5058
})
5159

60+
lastStatusCode = statusCode
61+
62+
if (metrics) {
63+
const duration = (Date.now() - requestedAt) / 1000
64+
metrics.httpRequestDuration.observe(
65+
{ topic, method, status_code: statusCode },
66+
duration
67+
)
68+
}
69+
5270
// Success, nothing else to do
5371
if (statusCode < BAD_REQUEST) {
5472
await body.dump()
73+
74+
if (metrics) {
75+
metrics.messagesInFlight.dec({ topic })
76+
}
77+
5578
return
5679
}
5780

5881
const error = new HttpError(statusCode, 'Webhook replied with an error', {
59-
requestedAt: requestedAt.toISOString(),
82+
requestedAt: new Date(requestedAt).toISOString(),
6083
attempt,
6184
headers: responseHeaders,
6285
/* c8 ignore next */
@@ -90,6 +113,17 @@ export async function processMessage (logger, dlqProducer, mappings, message) {
90113
}
91114
}
92115

116+
if (metrics) {
117+
metrics.messagesInFlight.dec({ topic })
118+
119+
if (dlq) {
120+
metrics.dlqMessages.inc({
121+
topic,
122+
reason: lastStatusCode ? `http_${lastStatusCode}` : 'network_error'
123+
})
124+
}
125+
}
126+
93127
if (!dlq) {
94128
logger.error({ dlqMessage }, 'Error while processing message')
95129
return
@@ -215,14 +249,15 @@ export async function setupKafka (server, configuration) {
215249
server.log.info(`Kafka consumer started with concurrency ${configuration.kafka.concurrency} ...`)
216250

217251
const responseProcessor = createResponseProcessor(server.log, pendingRequests)
252+
const metrics = initMetrics(globalThis.platformatic?.prometheus)
218253

219254
forEach(
220255
stream,
221256
message => {
222257
if (responseMappings[message.topic]) {
223258
return responseProcessor(message)
224259
} else {
225-
return processMessage(server.log, dlqProducer, topicsMappings, message)
260+
return processMessage(server.log, dlqProducer, topicsMappings, message, metrics)
226261
}
227262
},
228263
/* c8 ignore next 8 */

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"neostandard": "^0.12.1",
5757
"pino": "^9.6.0",
5858
"pino-pretty": "^13.0.0",
59+
"prom-client": "^15.1.3",
5960
"prettier": "^3.5.3",
6061
"wait-on": "^8.0.3"
6162
},

test/fixtures/kafka-monitor.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ export async function createMonitor (valueDeserializer = safeJsonDeserializer) {
1818
maxWaitTime: 500,
1919
deserializers: {
2020
value: valueDeserializer
21-
}
21+
},
22+
/* c8 ignore next */
23+
metrics: globalThis.platformatic?.prometheus
2224
})
2325

2426
const topics = ['plt-kafka-hooks-success', 'plt-kafka-hooks-fail', 'plt-kafka-hooks-retry', defaultDlqTopic]

test/metrics.test.js

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import { test } from 'node:test'
2+
import { strictEqual, deepStrictEqual, ok } from 'node:assert'
3+
import { initMetrics } from '../lib/metrics.js'
4+
import promClient from 'prom-client'
5+
6+
test('initMetrics should return null when prometheus is not provided', async () => {
7+
const result = initMetrics()
8+
strictEqual(result, null)
9+
})
10+
11+
test('initMetrics should return null when prometheus is null', async () => {
12+
const result = initMetrics(null)
13+
strictEqual(result, null)
14+
})
15+
16+
test('initMetrics should return null when prometheus.registry is missing', async () => {
17+
const prometheus = { client: promClient }
18+
const result = initMetrics(prometheus)
19+
strictEqual(result, null)
20+
})
21+
22+
test('initMetrics should return null when prometheus.client is missing', async () => {
23+
const prometheus = { registry: new promClient.Registry() }
24+
const result = initMetrics(prometheus)
25+
strictEqual(result, null)
26+
})
27+
28+
test('initMetrics should return metrics object when prometheus is properly configured', async () => {
29+
const registry = new promClient.Registry()
30+
const prometheus = { registry, client: promClient }
31+
const metrics = initMetrics(prometheus)
32+
33+
ok(metrics, 'Metrics object should be created')
34+
ok(metrics.messagesInFlight, 'messagesInFlight metric should exist')
35+
ok(metrics.httpRequestDuration, 'httpRequestDuration metric should exist')
36+
ok(metrics.dlqMessages, 'dlqMessages metric should exist')
37+
})
38+
39+
test('messagesInFlight should be configured as Gauge with correct properties', async () => {
40+
const registry = new promClient.Registry()
41+
const prometheus = { registry, client: promClient }
42+
const metrics = initMetrics(prometheus)
43+
44+
const gauge = metrics.messagesInFlight
45+
strictEqual(gauge.name, 'kafka_hooks_messages_in_flight')
46+
strictEqual(gauge.help, 'Number of messages currently being processed')
47+
deepStrictEqual(gauge.labelNames, ['topic'])
48+
})
49+
50+
test('httpRequestDuration should be configured as Histogram with correct properties', async () => {
51+
const registry = new promClient.Registry()
52+
const prometheus = { registry, client: promClient }
53+
const metrics = initMetrics(prometheus)
54+
55+
const histogram = metrics.httpRequestDuration
56+
strictEqual(histogram.name, 'kafka_hooks_http_request_duration_seconds')
57+
strictEqual(histogram.help, 'HTTP request duration for webhook deliveries')
58+
deepStrictEqual(histogram.labelNames, ['topic', 'method', 'status_code'])
59+
deepStrictEqual(histogram.buckets, [0.1, 0.5, 1, 2, 5, 10])
60+
})
61+
62+
test('dlqMessages should be configured as Counter with correct properties', async () => {
63+
const registry = new promClient.Registry()
64+
const prometheus = { registry, client: promClient }
65+
const metrics = initMetrics(prometheus)
66+
67+
const counter = metrics.dlqMessages
68+
strictEqual(counter.name, 'kafka_hooks_dlq_messages_total')
69+
strictEqual(counter.help, 'Total number of messages sent to the DLQ (Dead Letter Queue)')
70+
deepStrictEqual(counter.labelNames, ['topic', 'reason'])
71+
})
72+
73+
test('metrics should be functional - Counter operations', async () => {
74+
const registry = new promClient.Registry()
75+
const prometheus = { registry, client: promClient }
76+
const metrics = initMetrics(prometheus)
77+
78+
metrics.dlqMessages.inc({ topic: 'test-topic', reason: 'http_500' })
79+
metrics.dlqMessages.inc({ topic: 'test-topic', reason: 'network_error' }, 3)
80+
81+
const updatedMetrics = await registry.getMetricsAsJSON()
82+
const dlqMetric = updatedMetrics.find(m => m.name === 'kafka_hooks_dlq_messages_total')
83+
84+
const http500Value = dlqMetric.values.find(v =>
85+
v.labels.topic === 'test-topic' && v.labels.reason === 'http_500'
86+
).value
87+
const networkErrorValue = dlqMetric.values.find(v =>
88+
v.labels.topic === 'test-topic' && v.labels.reason === 'network_error'
89+
).value
90+
91+
strictEqual(http500Value, 1)
92+
strictEqual(networkErrorValue, 3)
93+
})
94+
95+
test('metrics should be functional - Gauge operations', async () => {
96+
const registry = new promClient.Registry()
97+
const prometheus = { registry, client: promClient }
98+
const metrics = initMetrics(prometheus)
99+
100+
metrics.messagesInFlight.inc({ topic: 'test-topic' })
101+
metrics.messagesInFlight.inc({ topic: 'test-topic' }, 2)
102+
103+
let registryMetrics = await registry.getMetricsAsJSON()
104+
let gaugeMetric = registryMetrics.find(m => m.name === 'kafka_hooks_messages_in_flight')
105+
let value = gaugeMetric.values.find(v => v.labels.topic === 'test-topic').value
106+
strictEqual(value, 3)
107+
108+
metrics.messagesInFlight.dec({ topic: 'test-topic' })
109+
110+
registryMetrics = await registry.getMetricsAsJSON()
111+
gaugeMetric = registryMetrics.find(m => m.name === 'kafka_hooks_messages_in_flight')
112+
value = gaugeMetric.values.find(v => v.labels.topic === 'test-topic').value
113+
strictEqual(value, 2)
114+
115+
metrics.messagesInFlight.set({ topic: 'test-topic' }, 5)
116+
117+
registryMetrics = await registry.getMetricsAsJSON()
118+
gaugeMetric = registryMetrics.find(m => m.name === 'kafka_hooks_messages_in_flight')
119+
value = gaugeMetric.values.find(v => v.labels.topic === 'test-topic').value
120+
strictEqual(value, 5)
121+
})
122+
123+
test('metrics should be functional - Histogram operations', async () => {
124+
const registry = new promClient.Registry()
125+
const prometheus = { registry, client: promClient }
126+
const metrics = initMetrics(prometheus)
127+
128+
metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '200' }, 0.5)
129+
metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '200' }, 1.2)
130+
metrics.httpRequestDuration.observe({ topic: 'test-topic', method: 'POST', status_code: '500' }, 2.1)
131+
132+
const registryMetrics = await registry.getMetricsAsJSON()
133+
const histogramMetric = registryMetrics.find(m => m.name === 'kafka_hooks_http_request_duration_seconds')
134+
135+
const countValue = histogramMetric.values.find(v =>
136+
v.labels.topic === 'test-topic' &&
137+
v.labels.method === 'POST' &&
138+
v.labels.status_code === '200' &&
139+
v.metricName === 'kafka_hooks_http_request_duration_seconds_count'
140+
).value
141+
142+
const sumValue = histogramMetric.values.find(v =>
143+
v.labels.topic === 'test-topic' &&
144+
v.labels.method === 'POST' &&
145+
v.labels.status_code === '200' &&
146+
v.metricName === 'kafka_hooks_http_request_duration_seconds_sum'
147+
).value
148+
149+
strictEqual(countValue, 2) // Two observations for 200 status code
150+
strictEqual(sumValue, 1.7) // 0.5 + 1.2 = 1.7
151+
})

0 commit comments

Comments
 (0)