Skip to content

Commit 22990aa

Browse files
committed
Statistics callback is available when using the promisified API with all client types
1 parent 44616ab commit 22990aa

File tree

9 files changed

+115
-30
lines changed

9 files changed

+115
-30
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ v1.7.0 is a feature release. It is supported for all usage.
77
1. Configurable batch size through the `js.consumer.max.batch.size` property
88
and cache size through the `js.consumer.max.cache.size.per.worker.ms`
99
property (#393).
10-
2. Fix for at-least-once guarantee not ensured in case a seek happens on one
10+
2. Statistics callback now available when using the promisified API with all
11+
client types (#399).
12+
3. Fix for at-least-once guarantee not ensured in case a seek happens on one
1113
partition and there are messages being fetched about other partitions (#393).
1214

1315

src/admin.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ Baton AdminClient::Connect() {
7575
DeactivateDispatchers();
7676
}
7777

78+
std::string stats_interval_ms;
79+
if (this->m_gconfig->get("statistics.interval.ms", stats_interval_ms) ==
80+
RdKafka::Conf::CONF_OK) {
81+
Connection::SetPollInBackground(true);
82+
}
83+
7884
return baton;
7985
}
8086

src/connection.cc

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ namespace NodeKafka {
4242
Connection::Connection(Conf* gconfig, Conf* tconfig):
4343
m_event_cb(),
4444
m_gconfig(gconfig),
45-
m_tconfig(tconfig) {
45+
m_tconfig(tconfig),
46+
m_is_background_polling(false) {
4647
std::string errstr;
4748

4849
m_client = NULL;
@@ -131,6 +132,30 @@ Baton Connection::setupSaslOAuthBearerBackgroundQueue() {
131132
return rdkafkaErrorToBaton(error);
132133
}
133134

135+
Baton Connection::SetPollInBackground(bool set) {
136+
scoped_shared_read_lock lock(m_connection_lock);
137+
rd_kafka_t* rk = this->m_client->c_ptr();
138+
if (!IsConnected()) {
139+
return Baton(RdKafka::ERR__STATE, "Client is disconnected");
140+
}
141+
142+
if (set && !m_is_background_polling) {
143+
m_is_background_polling = true;
144+
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
145+
rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk);
146+
rd_kafka_queue_forward(main_q, background_q);
147+
rd_kafka_queue_destroy(main_q);
148+
rd_kafka_queue_destroy(background_q);
149+
} else if (!set && m_is_background_polling) {
150+
m_is_background_polling = false;
151+
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
152+
rd_kafka_queue_forward(main_q, NULL);
153+
rd_kafka_queue_destroy(main_q);
154+
}
155+
156+
return Baton(RdKafka::ERR_NO_ERROR);
157+
}
158+
134159
RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
135160
return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
136161
}

src/connection.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class Connection : public Nan::ObjectWrap {
8888

8989
Baton setupSaslOAuthBearerConfig();
9090
Baton setupSaslOAuthBearerBackgroundQueue();
91+
Baton SetPollInBackground(bool set);
9192

9293
bool m_is_closing;
9394

@@ -97,6 +98,7 @@ class Connection : public Nan::ObjectWrap {
9798

9899
uv_rwlock_t m_connection_lock;
99100
bool m_has_underlying = false;
101+
bool m_is_background_polling;
100102

101103
RdKafka::Handle* m_client;
102104

src/producer.cc

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ namespace NodeKafka {
3333
Producer::Producer(Conf* gconfig, Conf* tconfig):
3434
Connection(gconfig, tconfig),
3535
m_dr_cb(),
36-
m_partitioner_cb(),
37-
m_is_background_polling(false) {
36+
m_partitioner_cb() {
3837
std::string errstr;
3938

4039
if (m_tconfig)
@@ -345,30 +344,6 @@ void Producer::Poll() {
345344
m_client->poll(0);
346345
}
347346

348-
Baton Producer::SetPollInBackground(bool set) {
349-
scoped_shared_read_lock lock(m_connection_lock);
350-
rd_kafka_t* rk = this->m_client->c_ptr();
351-
if (!IsConnected()) {
352-
return Baton(RdKafka::ERR__STATE, "Producer is disconnected");
353-
}
354-
355-
if (set && !m_is_background_polling) {
356-
m_is_background_polling = true;
357-
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
358-
rd_kafka_queue_t* background_q = rd_kafka_queue_get_background(rk);
359-
rd_kafka_queue_forward(main_q, background_q);
360-
rd_kafka_queue_destroy(main_q);
361-
rd_kafka_queue_destroy(background_q);
362-
} else if (!set && m_is_background_polling) {
363-
m_is_background_polling = false;
364-
rd_kafka_queue_t* main_q = rd_kafka_queue_get_main(rk);
365-
rd_kafka_queue_forward(main_q, NULL);
366-
rd_kafka_queue_destroy(main_q);
367-
}
368-
369-
return Baton(RdKafka::ERR_NO_ERROR);
370-
}
371-
372347
void Producer::ConfigureCallback(const std::string& string_key,
373348
const v8::Local<v8::Function>& cb, bool add) {
374349
if (string_key.compare("delivery_cb") == 0) {

src/producer.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class Producer : public Connection {
5555
Baton Connect();
5656
void Disconnect();
5757
void Poll();
58-
Baton SetPollInBackground(bool);
58+
using Connection::SetPollInBackground;
5959
#if RD_KAFKA_VERSION > 0x00090200
6060
Baton Flush(int timeout_ms);
6161
#endif
@@ -117,7 +117,6 @@ class Producer : public Connection {
117117

118118
Callbacks::Delivery m_dr_cb;
119119
Callbacks::Partitioner m_partitioner_cb;
120-
bool m_is_background_polling;
121120
};
122121

123122
} // namespace NodeKafka
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
jest.setTimeout(10000);
2+
3+
const {
4+
createAdmin,
5+
} = require('../testhelpers');
6+
7+
describe('Admin', () => {
8+
it('can receive stats callbacks',
9+
async () => {
10+
let numCalls = 0;
11+
let admin = createAdmin({}, {
12+
'statistics.interval.ms': '100',
13+
'stats_cb': function (event) {
14+
expect(event).toHaveProperty('message');
15+
expect(event.message).toContain('"type":');
16+
numCalls++;
17+
}
18+
});
19+
await admin.connect();
20+
await new Promise((resolve) => setTimeout(resolve, 400));
21+
await admin.disconnect();
22+
expect(numCalls).toBeGreaterThanOrEqual(3);
23+
}
24+
);
25+
});
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
jest.setTimeout(10000);
2+
3+
const {
4+
createConsumer,
5+
} = require('../testhelpers');
6+
7+
describe('Consumer', () => {
8+
it('can receive stats callbacks',
9+
async () => {
10+
let numCalls = 0;
11+
let consumer = createConsumer({}, {
12+
'group.id': `group-${Date.now()}`,
13+
'statistics.interval.ms': '100',
14+
'stats_cb': function (event) {
15+
expect(event).toHaveProperty('message');
16+
expect(event.message).toContain('"type":');
17+
numCalls++;
18+
}
19+
});
20+
await consumer.connect();
21+
await new Promise((resolve) => setTimeout(resolve, 400));
22+
await consumer.disconnect();
23+
expect(numCalls).toBeGreaterThanOrEqual(3);
24+
}
25+
);
26+
});
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
jest.setTimeout(10000);
2+
3+
const {
4+
createProducer,
5+
} = require('../testhelpers');
6+
7+
describe('Producer', () => {
8+
it('can receive stats callbacks',
9+
async () => {
10+
let numCalls = 0;
11+
let producer = createProducer({}, {
12+
'statistics.interval.ms': '100',
13+
'stats_cb': function (event) {
14+
expect(event).toHaveProperty('message');
15+
expect(event.message).toContain('"type":');
16+
numCalls++;
17+
}
18+
});
19+
await producer.connect();
20+
await new Promise((resolve) => setTimeout(resolve, 400));
21+
await producer.disconnect();
22+
expect(numCalls).toBeGreaterThanOrEqual(3);
23+
}
24+
);
25+
});

0 commit comments

Comments
 (0)