Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ v1.7.0 is a feature release. It is supported for all usage.
1. Configurable batch size through the `js.consumer.max.batch.size` property
and cache size through the `js.consumer.max.cache.size.per.worker.ms`
property (#393).
2. Fix for at-least-once guarantee not ensured in case a seek happens on one
2. Statistics callback now available when using the promisified API with all
client types (#399).
3. Fix for at-least-once guarantee not ensured in case a seek happens on one
partition and there are messages being fetched about other partitions (#393).


Expand Down
15 changes: 15 additions & 0 deletions lib/kafkajs/_admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ class Admin {
/* Delete properties which are already processed, or cannot be passed to node-rdkafka */
delete rdKafkaConfig.kafkaJS;

if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
if (typeof rdKafkaConfig['stats_cb'] === 'function')
this.#statsCb = rdKafkaConfig['stats_cb'];
delete rdKafkaConfig['stats_cb'];
}

return rdKafkaConfig;
}

Expand Down Expand Up @@ -203,6 +209,12 @@ class Admin {
}
}

/**
* Callback for the event.stats event, if defined.
* @private
*/
#statsCb = null;

/**
* Set up the client and connect to the bootstrap brokers.
* @returns {Promise<void>} Resolves when connection is complete, rejects on error.
Expand All @@ -225,6 +237,9 @@ class Admin {
'ready': this.#readyCb.bind(this),
'event.log': (msg) => loggerTrampoline(msg, this.#logger),
});
if (this.#statsCb) {
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
}
} else {
const underlyingClient = this.#existingClient._getInternalClient();
if (!underlyingClient) {
Expand Down
16 changes: 16 additions & 0 deletions lib/kafkajs/_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,12 @@ class Consumer {
delete rdKafkaConfig['js.consumer.max.cache.size.per.worker.ms'];
}

if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
if (typeof rdKafkaConfig['stats_cb'] === 'function')
this.#statsCb = rdKafkaConfig['stats_cb'];
delete rdKafkaConfig['stats_cb'];
}

return rdKafkaConfig;
}

Expand Down Expand Up @@ -770,6 +776,13 @@ class Consumer {
}
}


/**
* Callback for the event.stats event, if defined.
* @private
*/
#statsCb = null;

/**
* Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback.
* @param {import("../..").MessageHeader[] | undefined} messageHeaders
Expand Down Expand Up @@ -1167,6 +1180,9 @@ class Consumer {
this.#internalClient.on('error', this.#errorCb.bind(this));
this.#internalClient.on('event.error', this.#errorCb.bind(this));
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
if (this.#statsCb) {
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
}

return new Promise((resolve, reject) => {
this.#connectPromiseFunc = { resolve, reject };
Expand Down
15 changes: 15 additions & 0 deletions lib/kafkajs/_producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@ class Producer {
/* TODO: Add a warning if dr_cb is set? Or else, create a trampoline for it. */
rdKafkaConfig.dr_cb = true;

if (Object.hasOwn(rdKafkaConfig, 'stats_cb')) {
if (typeof rdKafkaConfig['stats_cb'] === 'function')
this.#statsCb = rdKafkaConfig['stats_cb'];
delete rdKafkaConfig['stats_cb'];
}

return rdKafkaConfig;
}

Expand Down Expand Up @@ -373,6 +379,12 @@ class Producer {
this.#logger.error(err, this.#createProducerBindingMessageMetadata());
}

/**
* Callback for the event.stats event, if defined.
* @private
*/
#statsCb = null;

/**
* Set up the client and connect to the bootstrap brokers.
*
Expand All @@ -394,6 +406,9 @@ class Producer {
this.#internalClient.on('event.error', this.#errorCb.bind(this));
this.#internalClient.on('error', this.#errorCb.bind(this));
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
if (this.#statsCb) {
this.#internalClient.on('event.stats', this.#statsCb.bind(this));
}

return new Promise((resolve, reject) => {
this.#connectPromiseFunc = { resolve, reject };
Expand Down
6 changes: 6 additions & 0 deletions src/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ Baton AdminClient::Connect() {
DeactivateDispatchers();
}

std::string stats_interval_ms;
if (this->m_gconfig->get("statistics.interval.ms", stats_interval_ms) ==
RdKafka::Conf::CONF_OK) {
Connection::SetPollInBackground(true);
}

return baton;
}

Expand Down
27 changes: 26 additions & 1 deletion src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ namespace NodeKafka {
Connection::Connection(Conf* gconfig, Conf* tconfig):
m_event_cb(),
m_gconfig(gconfig),
m_tconfig(tconfig) {
m_tconfig(tconfig),
m_is_background_polling(false) {
std::string errstr;

m_client = NULL;
Expand Down Expand Up @@ -131,6 +132,30 @@ Baton Connection::setupSaslOAuthBearerBackgroundQueue() {
return rdkafkaErrorToBaton(error);
}

Baton Connection::SetPollInBackground(bool set) {
scoped_shared_read_lock lock(m_connection_lock);
rd_kafka_t* rk = this->m_client->c_ptr();
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE, "Client is disconnected");
}

if (set && !m_is_background_polling) {
m_is_background_polling = true;
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk);
rd_kafka_queue_forward(main_q, background_q);
rd_kafka_queue_destroy(main_q);
rd_kafka_queue_destroy(background_q);
} else if (!set && m_is_background_polling) {
m_is_background_polling = false;
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
rd_kafka_queue_forward(main_q, NULL);
rd_kafka_queue_destroy(main_q);
}

return Baton(RdKafka::ERR_NO_ERROR);
}

RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
}
Expand Down
2 changes: 2 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class Connection : public Nan::ObjectWrap {

Baton setupSaslOAuthBearerConfig();
Baton setupSaslOAuthBearerBackgroundQueue();
Baton SetPollInBackground(bool set);

bool m_is_closing;

Expand All @@ -97,6 +98,7 @@ class Connection : public Nan::ObjectWrap {

uv_rwlock_t m_connection_lock;
bool m_has_underlying = false;
bool m_is_background_polling;

RdKafka::Handle* m_client;

Expand Down
27 changes: 1 addition & 26 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ namespace NodeKafka {
Producer::Producer(Conf* gconfig, Conf* tconfig):
Connection(gconfig, tconfig),
m_dr_cb(),
m_partitioner_cb(),
m_is_background_polling(false) {
m_partitioner_cb() {
std::string errstr;

if (m_tconfig)
Expand Down Expand Up @@ -345,30 +344,6 @@ void Producer::Poll() {
m_client->poll(0);
}

Baton Producer::SetPollInBackground(bool set) {
scoped_shared_read_lock lock(m_connection_lock);
rd_kafka_t* rk = this->m_client->c_ptr();
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE, "Producer is disconnected");
}

if (set && !m_is_background_polling) {
m_is_background_polling = true;
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk);
rd_kafka_queue_forward(main_q, background_q);
rd_kafka_queue_destroy(main_q);
rd_kafka_queue_destroy(background_q);
} else if (!set && m_is_background_polling) {
m_is_background_polling = false;
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
rd_kafka_queue_forward(main_q, NULL);
rd_kafka_queue_destroy(main_q);
}

return Baton(RdKafka::ERR_NO_ERROR);
}

void Producer::ConfigureCallback(const std::string& string_key,
const v8::Local<v8::Function>& cb, bool add) {
if (string_key.compare("delivery_cb") == 0) {
Expand Down
3 changes: 1 addition & 2 deletions src/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Producer : public Connection {
Baton Connect();
void Disconnect();
void Poll();
Baton SetPollInBackground(bool);
using Connection::SetPollInBackground;
#if RD_KAFKA_VERSION > 0x00090200
Baton Flush(int timeout_ms);
#endif
Expand Down Expand Up @@ -117,7 +117,6 @@ class Producer : public Connection {

Callbacks::Delivery m_dr_cb;
Callbacks::Partitioner m_partitioner_cb;
bool m_is_background_polling;
};

} // namespace NodeKafka
Expand Down
25 changes: 25 additions & 0 deletions test/promisified/admin/statsCallback.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
jest.setTimeout(10000);

const {
createAdmin,
} = require('../testhelpers');

describe('Admin', () => {
it('can receive stats callbacks',
async () => {
let numCalls = 0;
let admin = createAdmin({}, {
'statistics.interval.ms': '100',
'stats_cb': function (event) {
expect(event).toHaveProperty('message');
expect(event.message).toContain('"type":');
numCalls++;
}
});
await admin.connect();
await new Promise((resolve) => setTimeout(resolve, 400));
await admin.disconnect();
expect(numCalls).toBeGreaterThanOrEqual(3);
}
);
});
26 changes: 26 additions & 0 deletions test/promisified/consumer/statsCallback.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
jest.setTimeout(10000);

const {
createConsumer,
} = require('../testhelpers');

describe('Consumer', () => {
it('can receive stats callbacks',
async () => {
let numCalls = 0;
let consumer = createConsumer({}, {
'group.id': `group-${Date.now()}`,
'statistics.interval.ms': '100',
'stats_cb': function (event) {
expect(event).toHaveProperty('message');
expect(event.message).toContain('"type":');
numCalls++;
}
});
await consumer.connect();
await new Promise((resolve) => setTimeout(resolve, 400));
await consumer.disconnect();
expect(numCalls).toBeGreaterThanOrEqual(3);
}
);
});
25 changes: 25 additions & 0 deletions test/promisified/producer/statsCallback.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
jest.setTimeout(10000);

const {
createProducer,
} = require('../testhelpers');

describe('Producer', () => {
it('can receive stats callbacks',
async () => {
let numCalls = 0;
let producer = createProducer({}, {
'statistics.interval.ms': '100',
'stats_cb': function (event) {
expect(event).toHaveProperty('message');
expect(event.message).toContain('"type":');
numCalls++;
}
});
await producer.connect();
await new Promise((resolve) => setTimeout(resolve, 400));
await producer.disconnect();
expect(numCalls).toBeGreaterThanOrEqual(3);
}
);
});