From 44616abb611837bf5f4aa304699ebcb8aab381a4 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 4 Nov 2025 21:39:42 +0100 Subject: [PATCH 1/2] Stats callback --- lib/kafkajs/_admin.js | 15 +++++++++++++++ lib/kafkajs/_consumer.js | 16 ++++++++++++++++ lib/kafkajs/_producer.js | 15 +++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js index ed713546..a4ce1bac 100644 --- a/lib/kafkajs/_admin.js +++ b/lib/kafkajs/_admin.js @@ -174,6 +174,12 @@ class Admin { /* Delete properties which are already processed, or cannot be passed to node-rdkafka */ delete rdKafkaConfig.kafkaJS; + if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) { + if (typeof rdKafkaConfig['stats_cb'] === 'function') + this.#statsCb = rdKafkaConfig['stats_cb']; + delete rdKafkaConfig['stats_cb']; + } + return rdKafkaConfig; } @@ -203,6 +209,12 @@ class Admin { } } + /** + * Callback for the event.stats event, if defined. + * @private + */ + #statsCb = null; + /** * Set up the client and connect to the bootstrap brokers. * @returns {Promise} Resolves when connection is complete, rejects on error. @@ -225,6 +237,9 @@ class Admin { 'ready': this.#readyCb.bind(this), 'event.log': (msg) => loggerTrampoline(msg, this.#logger), }); + if (this.#statsCb) { + this.#internalClient.on('event.stats', this.#statsCb.bind(this)); + } } else { const underlyingClient = this.#existingClient._getInternalClient(); if (!underlyingClient) { diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 92efb5c3..37b045de 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -731,6 +731,12 @@ class Consumer { delete rdKafkaConfig['js.consumer.max.cache.size.per.worker.ms']; } + if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) { + if (typeof rdKafkaConfig['stats_cb'] === 'function') + this.#statsCb = rdKafkaConfig['stats_cb']; + delete rdKafkaConfig['stats_cb']; + } + return rdKafkaConfig; } @@ -770,6 +776,13 @@ class Consumer { } } + + /** + * Callback for the event.stats event, if defined. + * @private + */ + #statsCb = null; + /** * Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback. * @param {import("../..").MessageHeader[] | undefined} messageHeaders @@ -1167,6 +1180,9 @@ class Consumer { this.#internalClient.on('error', this.#errorCb.bind(this)); this.#internalClient.on('event.error', this.#errorCb.bind(this)); this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger)); + if (this.#statsCb) { + this.#internalClient.on('event.stats', this.#statsCb.bind(this)); + } return new Promise((resolve, reject) => { this.#connectPromiseFunc = { resolve, reject }; diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js index bf1e058d..2d780afe 100644 --- a/lib/kafkajs/_producer.js +++ b/lib/kafkajs/_producer.js @@ -269,6 +269,12 @@ class Producer { /* TODO: Add a warning if dr_cb is set? Or else, create a trampoline for it. */ rdKafkaConfig.dr_cb = true; + if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) { + if (typeof rdKafkaConfig['stats_cb'] === 'function') + this.#statsCb = rdKafkaConfig['stats_cb']; + delete rdKafkaConfig['stats_cb']; + } + return rdKafkaConfig; } @@ -373,6 +379,12 @@ class Producer { this.#logger.error(err, this.#createProducerBindingMessageMetadata()); } + /** + * Callback for the event.stats event, if defined. + * @private + */ + #statsCb = null; + /** * Set up the client and connect to the bootstrap brokers. * @@ -394,6 +406,9 @@ class Producer { this.#internalClient.on('event.error', this.#errorCb.bind(this)); this.#internalClient.on('error', this.#errorCb.bind(this)); this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger)); + if (this.#statsCb) { + this.#internalClient.on('event.stats', this.#statsCb.bind(this)); + } return new Promise((resolve, reject) => { this.#connectPromiseFunc = { resolve, reject }; From fc56b7e0d235cd363a159f218d59e501c0ad9d4d Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 7 Nov 2025 15:54:26 +0100 Subject: [PATCH 2/2] Statistics callback is available when using the promisified API with all client types --- CHANGELOG.md | 4 ++- lib/kafkajs/_admin.js | 2 +- src/admin.cc | 6 +++++ src/connection.cc | 27 ++++++++++++++++++- src/connection.h | 2 ++ src/producer.cc | 27 +------------------ src/producer.h | 3 +-- test/promisified/admin/statsCallback.spec.js | 25 +++++++++++++++++ .../consumer/statsCallback.spec.js | 26 ++++++++++++++++++ .../producer/statsCallback.spec.js | 25 +++++++++++++++++ 10 files changed, 116 insertions(+), 31 deletions(-) create mode 100644 test/promisified/admin/statsCallback.spec.js create mode 100644 test/promisified/consumer/statsCallback.spec.js create mode 100644 test/promisified/producer/statsCallback.spec.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c05b809..c4656fcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,9 @@ v1.7.0 is a feature release. It is supported for all usage. 1. Configurable batch size through the `js.consumer.max.batch.size` property and cache size through the `js.consumer.max.cache.size.per.worker.ms` property (#393). -2. Fix for at-least-once guarantee not ensured in case a seek happens on one +2. Statistics callback now available when using the promisified API with all + client types (#399). +3. Fix for at-least-once guarantee not ensured in case a seek happens on one partition and there are messages being fetched about other partitions (#393). diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js index a4ce1bac..e51f7d30 100644 --- a/lib/kafkajs/_admin.js +++ b/lib/kafkajs/_admin.js @@ -238,7 +238,7 @@ class Admin { 'event.log': (msg) => loggerTrampoline(msg, this.#logger), }); if (this.#statsCb) { - this.#internalClient.on('event.stats', this.#statsCb.bind(this)); + this.#internalClient.on('event.stats', this.#statsCb.bind(this)); } } else { const underlyingClient = this.#existingClient._getInternalClient(); diff --git a/src/admin.cc b/src/admin.cc index 5bccccac..84603c27 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -75,6 +75,12 @@ Baton AdminClient::Connect() { DeactivateDispatchers(); } + std::string stats_interval_ms; + if (this->m_gconfig->get("statistics.interval.ms", stats_interval_ms) == + RdKafka::Conf::CONF_OK) { + Connection::SetPollInBackground(true); + } + return baton; } diff --git a/src/connection.cc b/src/connection.cc index 189c10f1..637feac3 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -42,7 +42,8 @@ namespace NodeKafka { Connection::Connection(Conf* gconfig, Conf* tconfig): m_event_cb(), m_gconfig(gconfig), - m_tconfig(tconfig) { + m_tconfig(tconfig), + m_is_background_polling(false) { std::string errstr; m_client = NULL; @@ -131,6 +132,30 @@ Baton Connection::setupSaslOAuthBearerBackgroundQueue() { return rdkafkaErrorToBaton(error); } +Baton Connection::SetPollInBackground(bool set) { + scoped_shared_read_lock lock(m_connection_lock); + rd_kafka_t* rk = this->m_client->c_ptr(); + if (!IsConnected()) { + return Baton(RdKafka::ERR__STATE, "Client is disconnected"); + } + + if (set && !m_is_background_polling) { + m_is_background_polling = true; + rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk); + rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk); + rd_kafka_queue_forward(main_q, background_q); + rd_kafka_queue_destroy(main_q); + rd_kafka_queue_destroy(background_q); + } else if (!set && m_is_background_polling) { + m_is_background_polling = false; + rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk); + rd_kafka_queue_forward(main_q, NULL); + rd_kafka_queue_destroy(main_q); + } + + return Baton(RdKafka::ERR_NO_ERROR); +} + RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) { return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA); } diff --git a/src/connection.h b/src/connection.h index 532468fe..e7371825 100644 --- a/src/connection.h +++ b/src/connection.h @@ -88,6 +88,7 @@ class Connection : public Nan::ObjectWrap { Baton setupSaslOAuthBearerConfig(); Baton setupSaslOAuthBearerBackgroundQueue(); + Baton SetPollInBackground(bool set); bool m_is_closing; @@ -97,6 +98,7 @@ class Connection : public Nan::ObjectWrap { uv_rwlock_t m_connection_lock; bool m_has_underlying = false; + bool m_is_background_polling; RdKafka::Handle* m_client; diff --git a/src/producer.cc b/src/producer.cc index 68d8ad75..92751f4a 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -33,8 +33,7 @@ namespace NodeKafka { Producer::Producer(Conf* gconfig, Conf* tconfig): Connection(gconfig, tconfig), m_dr_cb(), - m_partitioner_cb(), - m_is_background_polling(false) { + m_partitioner_cb() { std::string errstr; if (m_tconfig) @@ -345,30 +344,6 @@ void Producer::Poll() { m_client->poll(0); } -Baton Producer::SetPollInBackground(bool set) { - scoped_shared_read_lock lock(m_connection_lock); - rd_kafka_t* rk = this->m_client->c_ptr(); - if (!IsConnected()) { - return Baton(RdKafka::ERR__STATE, "Producer is disconnected"); - } - - if (set && !m_is_background_polling) { - m_is_background_polling = true; - rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk); - rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk); - rd_kafka_queue_forward(main_q, background_q); - rd_kafka_queue_destroy(main_q); - rd_kafka_queue_destroy(background_q); - } else if (!set && m_is_background_polling) { - m_is_background_polling = false; - rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk); - rd_kafka_queue_forward(main_q, NULL); - rd_kafka_queue_destroy(main_q); - } - - return Baton(RdKafka::ERR_NO_ERROR); -} - void Producer::ConfigureCallback(const std::string& string_key, const v8::Local& cb, bool add) { if (string_key.compare("delivery_cb") == 0) { diff --git a/src/producer.h b/src/producer.h index 8df138e8..97a37b52 100644 --- a/src/producer.h +++ b/src/producer.h @@ -55,7 +55,7 @@ class Producer : public Connection { Baton Connect(); void Disconnect(); void Poll(); - Baton SetPollInBackground(bool); + using Connection::SetPollInBackground; #if RD_KAFKA_VERSION > 0x00090200 Baton Flush(int timeout_ms); #endif @@ -117,7 +117,6 @@ class Producer : public Connection { Callbacks::Delivery m_dr_cb; Callbacks::Partitioner m_partitioner_cb; - bool m_is_background_polling; }; } // namespace NodeKafka diff --git a/test/promisified/admin/statsCallback.spec.js b/test/promisified/admin/statsCallback.spec.js new file mode 100644 index 00000000..d1a35a8a --- /dev/null +++ b/test/promisified/admin/statsCallback.spec.js @@ -0,0 +1,25 @@ +jest.setTimeout(10000); + +const { + createAdmin, +} = require('../testhelpers'); + +describe('Admin', () => { + it('can receive stats callbacks', + async () => { + let numCalls = 0; + let admin = createAdmin({}, { + 'statistics.interval.ms': '100', + 'stats_cb': function (event) { + expect(event).toHaveProperty('message'); + expect(event.message).toContain('"type":'); + numCalls++; + } + }); + await admin.connect(); + await new Promise((resolve) => setTimeout(resolve, 400)); + await admin.disconnect(); + expect(numCalls).toBeGreaterThanOrEqual(3); + } + ); +}); diff --git a/test/promisified/consumer/statsCallback.spec.js b/test/promisified/consumer/statsCallback.spec.js new file mode 100644 index 00000000..a8a1bff6 --- /dev/null +++ b/test/promisified/consumer/statsCallback.spec.js @@ -0,0 +1,26 @@ +jest.setTimeout(10000); + +const { + createConsumer, +} = require('../testhelpers'); + +describe('Consumer', () => { + it('can receive stats callbacks', + async () => { + let numCalls = 0; + let consumer = createConsumer({}, { + 'group.id': `group-${Date.now()}`, + 'statistics.interval.ms': '100', + 'stats_cb': function (event) { + expect(event).toHaveProperty('message'); + expect(event.message).toContain('"type":'); + numCalls++; + } + }); + await consumer.connect(); + await new Promise((resolve) => setTimeout(resolve, 400)); + await consumer.disconnect(); + expect(numCalls).toBeGreaterThanOrEqual(3); + } + ); +}); diff --git a/test/promisified/producer/statsCallback.spec.js b/test/promisified/producer/statsCallback.spec.js new file mode 100644 index 00000000..1b5acc18 --- /dev/null +++ b/test/promisified/producer/statsCallback.spec.js @@ -0,0 +1,25 @@ +jest.setTimeout(10000); + +const { + createProducer, +} = require('../testhelpers'); + +describe('Producer', () => { + it('can receive stats callbacks', + async () => { + let numCalls = 0; + let producer = createProducer({}, { + 'statistics.interval.ms': '100', + 'stats_cb': function (event) { + expect(event).toHaveProperty('message'); + expect(event.message).toContain('"type":'); + numCalls++; + } + }); + await producer.connect(); + await new Promise((resolve) => setTimeout(resolve, 400)); + await producer.disconnect(); + expect(numCalls).toBeGreaterThanOrEqual(3); + } + ); +});