Skip to content

Commit c8d5c77

Browse files
committed
Stats callback
1 parent 00187f4 commit c8d5c77

File tree

3 files changed

+46
-0
lines changed

3 files changed

+46
-0
lines changed

lib/kafkajs/_admin.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@ class Admin {
174174
/* Delete properties which are already processed, or cannot be passed to node-rdkafka */
175175
delete rdKafkaConfig.kafkaJS;
176176

177+
if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
178+
if (typeof rdKafkaConfig['stats_cb'] === 'function')
179+
this.#statsCb = rdKafkaConfig['stats_cb'];
180+
delete rdKafkaConfig['stats_cb'];
181+
}
182+
177183
return rdKafkaConfig;
178184
}
179185

@@ -203,6 +209,12 @@ class Admin {
203209
}
204210
}
205211

212+
/**
213+
* Callback for the event.stats event, if defined.
214+
* @private
215+
*/
216+
#statsCb = null;
217+
206218
/**
207219
* Set up the client and connect to the bootstrap brokers.
208220
* @returns {Promise<void>} Resolves when connection is complete, rejects on error.
@@ -225,6 +237,9 @@ class Admin {
225237
'ready': this.#readyCb.bind(this),
226238
'event.log': (msg) => loggerTrampoline(msg, this.#logger),
227239
});
240+
if (this.#statsCb) {
241+
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
242+
}
228243
} else {
229244
const underlyingClient = this.#existingClient._getInternalClient();
230245
if (!underlyingClient) {

lib/kafkajs/_consumer.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,12 @@ class Consumer {
731731
delete rdKafkaConfig['js.consumer.max.cache.size.per.worker.ms'];
732732
}
733733

734+
if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
735+
if (typeof rdKafkaConfig['stats_cb'] === 'function')
736+
this.#statsCb = rdKafkaConfig['stats_cb'];
737+
delete rdKafkaConfig['stats_cb'];
738+
}
739+
734740
return rdKafkaConfig;
735741
}
736742

@@ -770,6 +776,13 @@ class Consumer {
770776
}
771777
}
772778

779+
780+
/**
781+
* Callback for the event.stats event, if defined.
782+
* @private
783+
*/
784+
#statsCb = null;
785+
773786
/**
774787
* Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback.
775788
* @param {import("../..").MessageHeader[] | undefined} messageHeaders
@@ -1167,6 +1180,9 @@ class Consumer {
11671180
this.#internalClient.on('error', this.#errorCb.bind(this));
11681181
this.#internalClient.on('event.error', this.#errorCb.bind(this));
11691182
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
1183+
if (this.#statsCb) {
1184+
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
1185+
}
11701186

11711187
return new Promise((resolve, reject) => {
11721188
this.#connectPromiseFunc = { resolve, reject };

lib/kafkajs/_producer.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,12 @@ class Producer {
269269
/* TODO: Add a warning if dr_cb is set? Or else, create a trampoline for it. */
270270
rdKafkaConfig.dr_cb = true;
271271

272+
if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
273+
if (typeof rdKafkaConfig['stats_cb'] === 'function')
274+
this.#statsCb = rdKafkaConfig['stats_cb'];
275+
delete rdKafkaConfig['stats_cb'];
276+
}
277+
272278
return rdKafkaConfig;
273279
}
274280

@@ -373,6 +379,12 @@ class Producer {
373379
this.#logger.error(err, this.#createProducerBindingMessageMetadata());
374380
}
375381

382+
/**
383+
* Callback for the event.stats event, if defined.
384+
* @private
385+
*/
386+
#statsCb = null;
387+
376388
/**
377389
* Set up the client and connect to the bootstrap brokers.
378390
*
@@ -394,6 +406,9 @@ class Producer {
394406
this.#internalClient.on('event.error', this.#errorCb.bind(this));
395407
this.#internalClient.on('error', this.#errorCb.bind(this));
396408
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
409+
if (this.#statsCb) {
410+
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
411+
}
397412

398413
return new Promise((resolve, reject) => {
399414
this.#connectPromiseFunc = { resolve, reject };

0 commit comments

Comments
 (0)