Skip to content

Commit 744588a

Browse files
authored
Stats callback in promisified api (#399)
Statistics callback is available when using the promisified API with all client types
1 parent 728c609 commit 744588a

File tree

12 files changed

+161
-30
lines changed

12 files changed

+161
-30
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ v1.7.0 is a feature release. It is supported for all usage.
77
1. Configurable batch size through the `js.consumer.max.batch.size` property
88
and cache size through the `js.consumer.max.cache.size.per.worker.ms`
99
property (#393).
10-
2. Fix for at-least-once guarantee not ensured in case a seek happens on one
10+
2. Statistics callback now available when using the promisified API with all
11+
client types (#399).
12+
3. Fix for at-least-once guarantee not ensured in case a seek happens on one
1113
partition and there are messages being fetched about other partitions (#393).
1214

1315

lib/kafkajs/_admin.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@ class Admin {
174174
/* Delete properties which are already processed, or cannot be passed to node-rdkafka */
175175
delete rdKafkaConfig.kafkaJS;
176176

177+
if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
178+
if (typeof rdKafkaConfig['stats_cb'] === 'function')
179+
this.#statsCb = rdKafkaConfig['stats_cb'];
180+
delete rdKafkaConfig['stats_cb'];
181+
}
182+
177183
return rdKafkaConfig;
178184
}
179185

@@ -203,6 +209,12 @@ class Admin {
203209
}
204210
}
205211

212+
/**
213+
* Callback for the event.stats event, if defined.
214+
* @private
215+
*/
216+
#statsCb = null;
217+
206218
/**
207219
* Set up the client and connect to the bootstrap brokers.
208220
* @returns {Promise<void>} Resolves when connection is complete, rejects on error.
@@ -225,6 +237,9 @@ class Admin {
225237
'ready': this.#readyCb.bind(this),
226238
'event.log': (msg) => loggerTrampoline(msg, this.#logger),
227239
});
240+
if (this.#statsCb) {
241+
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
242+
}
228243
} else {
229244
const underlyingClient = this.#existingClient._getInternalClient();
230245
if (!underlyingClient) {

lib/kafkajs/_consumer.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,12 @@ class Consumer {
731731
delete rdKafkaConfig['js.consumer.max.cache.size.per.worker.ms'];
732732
}
733733

734+
if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
735+
if (typeof rdKafkaConfig['stats_cb'] === 'function')
736+
this.#statsCb = rdKafkaConfig['stats_cb'];
737+
delete rdKafkaConfig['stats_cb'];
738+
}
739+
734740
return rdKafkaConfig;
735741
}
736742

@@ -770,6 +776,13 @@ class Consumer {
770776
}
771777
}
772778

779+
780+
/**
781+
* Callback for the event.stats event, if defined.
782+
* @private
783+
*/
784+
#statsCb = null;
785+
773786
/**
774787
* Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback.
775788
* @param {import("../..").MessageHeader[] | undefined} messageHeaders
@@ -1167,6 +1180,9 @@ class Consumer {
11671180
this.#internalClient.on('error', this.#errorCb.bind(this));
11681181
this.#internalClient.on('event.error', this.#errorCb.bind(this));
11691182
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
1183+
if (this.#statsCb) {
1184+
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
1185+
}
11701186

11711187
return new Promise((resolve, reject) => {
11721188
this.#connectPromiseFunc = { resolve, reject };

lib/kafkajs/_producer.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,12 @@ class Producer {
269269
/* TODO: Add a warning if dr_cb is set? Or else, create a trampoline for it. */
270270
rdKafkaConfig.dr_cb = true;
271271

272+
if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
273+
if (typeof rdKafkaConfig['stats_cb'] === 'function')
274+
this.#statsCb = rdKafkaConfig['stats_cb'];
275+
delete rdKafkaConfig['stats_cb'];
276+
}
277+
272278
return rdKafkaConfig;
273279
}
274280

@@ -373,6 +379,12 @@ class Producer {
373379
this.#logger.error(err, this.#createProducerBindingMessageMetadata());
374380
}
375381

382+
/**
383+
* Callback for the event.stats event, if defined.
384+
* @private
385+
*/
386+
#statsCb = null;
387+
376388
/**
377389
* Set up the client and connect to the bootstrap brokers.
378390
*
@@ -394,6 +406,9 @@ class Producer {
394406
this.#internalClient.on('event.error', this.#errorCb.bind(this));
395407
this.#internalClient.on('error', this.#errorCb.bind(this));
396408
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
409+
if (this.#statsCb) {
410+
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
411+
}
397412

398413
return new Promise((resolve, reject) => {
399414
this.#connectPromiseFunc = { resolve, reject };

src/admin.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ Baton AdminClient::Connect() {
7575
DeactivateDispatchers();
7676
}
7777

78+
std::string stats_interval_ms;
79+
if (this->m_gconfig->get("statistics.interval.ms", stats_interval_ms) ==
80+
RdKafka::Conf::CONF_OK) {
81+
Connection::SetPollInBackground(true);
82+
}
83+
7884
return baton;
7985
}
8086

src/connection.cc

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ namespace NodeKafka {
4242
Connection::Connection(Conf* gconfig, Conf* tconfig):
4343
m_event_cb(),
4444
m_gconfig(gconfig),
45-
m_tconfig(tconfig) {
45+
m_tconfig(tconfig),
46+
m_is_background_polling(false) {
4647
std::string errstr;
4748

4849
m_client = NULL;
@@ -131,6 +132,30 @@ Baton Connection::setupSaslOAuthBearerBackgroundQueue() {
131132
return rdkafkaErrorToBaton(error);
132133
}
133134

135+
Baton Connection::SetPollInBackground(bool set) {
136+
scoped_shared_read_lock lock(m_connection_lock);
137+
rd_kafka_t* rk = this->m_client->c_ptr();
138+
if (!IsConnected()) {
139+
return Baton(RdKafka::ERR__STATE, "Client is disconnected");
140+
}
141+
142+
if (set && !m_is_background_polling) {
143+
m_is_background_polling = true;
144+
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
145+
rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk);
146+
rd_kafka_queue_forward(main_q, background_q);
147+
rd_kafka_queue_destroy(main_q);
148+
rd_kafka_queue_destroy(background_q);
149+
} else if (!set && m_is_background_polling) {
150+
m_is_background_polling = false;
151+
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
152+
rd_kafka_queue_forward(main_q, NULL);
153+
rd_kafka_queue_destroy(main_q);
154+
}
155+
156+
return Baton(RdKafka::ERR_NO_ERROR);
157+
}
158+
134159
RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
135160
return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
136161
}

src/connection.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class Connection : public Nan::ObjectWrap {
8888

8989
Baton setupSaslOAuthBearerConfig();
9090
Baton setupSaslOAuthBearerBackgroundQueue();
91+
Baton SetPollInBackground(bool set);
9192

9293
bool m_is_closing;
9394

@@ -97,6 +98,7 @@ class Connection : public Nan::ObjectWrap {
9798

9899
uv_rwlock_t m_connection_lock;
99100
bool m_has_underlying = false;
101+
bool m_is_background_polling;
100102

101103
RdKafka::Handle* m_client;
102104

src/producer.cc

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ namespace NodeKafka {
3333
Producer::Producer(Conf* gconfig, Conf* tconfig):
3434
Connection(gconfig, tconfig),
3535
m_dr_cb(),
36-
m_partitioner_cb(),
37-
m_is_background_polling(false) {
36+
m_partitioner_cb() {
3837
std::string errstr;
3938

4039
if (m_tconfig)
@@ -345,30 +344,6 @@ void Producer::Poll() {
345344
m_client->poll(0);
346345
}
347346

348-
Baton Producer::SetPollInBackground(bool set) {
349-
scoped_shared_read_lock lock(m_connection_lock);
350-
rd_kafka_t* rk = this->m_client->c_ptr();
351-
if (!IsConnected()) {
352-
return Baton(RdKafka::ERR__STATE, "Producer is disconnected");
353-
}
354-
355-
if (set && !m_is_background_polling) {
356-
m_is_background_polling = true;
357-
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
358-
rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk);
359-
rd_kafka_queue_forward(main_q, background_q);
360-
rd_kafka_queue_destroy(main_q);
361-
rd_kafka_queue_destroy(background_q);
362-
} else if (!set && m_is_background_polling) {
363-
m_is_background_polling = false;
364-
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
365-
rd_kafka_queue_forward(main_q, NULL);
366-
rd_kafka_queue_destroy(main_q);
367-
}
368-
369-
return Baton(RdKafka::ERR_NO_ERROR);
370-
}
371-
372347
void Producer::ConfigureCallback(const std::string& string_key,
373348
const v8::Local<v8::Function>& cb, bool add) {
374349
if (string_key.compare("delivery_cb") == 0) {

src/producer.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class Producer : public Connection {
5555
Baton Connect();
5656
void Disconnect();
5757
void Poll();
58-
Baton SetPollInBackground(bool);
58+
using Connection::SetPollInBackground;
5959
#if RD_KAFKA_VERSION > 0x00090200
6060
Baton Flush(int timeout_ms);
6161
#endif
@@ -117,7 +117,6 @@ class Producer : public Connection {
117117

118118
Callbacks::Delivery m_dr_cb;
119119
Callbacks::Partitioner m_partitioner_cb;
120-
bool m_is_background_polling;
121120
};
122121

123122
} // namespace NodeKafka
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
jest.setTimeout(10000);
2+
3+
const {
4+
createAdmin,
5+
} = require('../testhelpers');
6+
7+
describe('Admin', () => {
8+
it('can receive stats callbacks',
9+
async () => {
10+
let numCalls = 0;
11+
let admin = createAdmin({}, {
12+
'statistics.interval.ms': '100',
13+
'stats_cb': function (event) {
14+
expect(event).toHaveProperty('message');
15+
expect(event.message).toContain('"type":');
16+
numCalls++;
17+
}
18+
});
19+
await admin.connect();
20+
await new Promise((resolve) => setTimeout(resolve, 400));
21+
await admin.disconnect();
22+
expect(numCalls).toBeGreaterThanOrEqual(3);
23+
}
24+
);
25+
});

0 commit comments

Comments
 (0)