Skip to content

Commit 7960d77

Browse files
committed
Add ListGroups to admin client
1 parent a347f7e commit 7960d77

File tree

15 files changed

+618
-46
lines changed

15 files changed

+618
-46
lines changed

index.d.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,22 @@ export interface NewTopic {
333333
} | { [cfg: string]: string; };
334334
}
335335

336+
export enum ConsumerGroupStates {
337+
UNKNOWN = 0,
338+
PREPARING_REBALANCE = 1,
339+
COMPLETING_REBALANCE = 2,
340+
STABLE = 3,
341+
DEAD = 4,
342+
EMPTY = 5,
343+
}
344+
345+
export interface GroupOverview {
346+
groupId: string;
347+
protocolType: string;
348+
isSimpleConsumerGroup: boolean;
349+
state: ConsumerGroupStates;
350+
}
351+
336352
export interface IAdminClient {
337353
createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
338354
createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
@@ -343,6 +359,10 @@ export interface IAdminClient {
343359
createPartitions(topic: string, desiredPartitions: number, cb?: (err: LibrdKafkaError) => void): void;
344360
createPartitions(topic: string, desiredPartitions: number, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
345361

362+
listGroups(cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;
363+
listGroups(options: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[] },
364+
cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;
365+
346366
disconnect(): void;
347367
}
348368

lib/admin.js

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,30 @@
22
* confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
33
*
44
* Copyright (c) 2016-2023 Blizzard Entertainment
5+
* 2024 Confluent, Inc
56
*
67
* This software may be modified and distributed under the terms
78
* of the MIT license. See the LICENSE.txt file for details.
89
*/
910
'use strict';
1011

12+
/* TODO: Think of a way to fetch these from within librdkafka instead of this
13+
* hardcoded list.
14+
* New additions won't be automatically added to this list.
15+
*/
16+
const ConsumerGroupStates = Object.seal({
17+
UNKNOWN: 0,
18+
PREPARING_REBALANCE: 1,
19+
COMPLETING_REBALANCE: 2,
20+
STABLE: 3,
21+
DEAD: 4,
22+
EMPTY: 5,
23+
})
24+
25+
1126
module.exports = {
1227
create: createAdminClient,
28+
ConsumerGroupStates,
1329
};
1430

1531
var Client = require('./client');
@@ -89,7 +105,7 @@ function AdminClient(conf) {
89105
*
90106
* Unlike the other connect methods, this one is synchronous.
91107
*/
92-
AdminClient.prototype.connect = function() {
108+
AdminClient.prototype.connect = function () {
93109
LibrdKafkaError.wrap(this._client.connect(), true);
94110
this._isConnected = true;
95111
};
@@ -100,7 +116,7 @@ AdminClient.prototype.connect = function() {
100116
* This is a synchronous method, but all it does is clean up
101117
* some memory and shut some threads down
102118
*/
103-
AdminClient.prototype.disconnect = function() {
119+
AdminClient.prototype.disconnect = function () {
104120
LibrdKafkaError.wrap(this._client.disconnect(), true);
105121
this._isConnected = false;
106122
};
@@ -112,7 +128,7 @@ AdminClient.prototype.disconnect = function() {
112128
* @param {number} timeout - Number of milliseconds to wait while trying to create the topic.
113129
* @param {function} cb - The callback to be executed when finished
114130
*/
115-
AdminClient.prototype.createTopic = function(topic, timeout, cb) {
131+
AdminClient.prototype.createTopic = function (topic, timeout, cb) {
116132
if (!this._isConnected) {
117133
throw new Error('Client is disconnected');
118134
}
@@ -126,7 +142,7 @@ AdminClient.prototype.createTopic = function(topic, timeout, cb) {
126142
timeout = 5000;
127143
}
128144

129-
this._client.createTopic(topic, timeout, function(err) {
145+
this._client.createTopic(topic, timeout, function (err) {
130146
if (err) {
131147
if (cb) {
132148
cb(LibrdKafkaError.create(err));
@@ -147,7 +163,7 @@ AdminClient.prototype.createTopic = function(topic, timeout, cb) {
147163
* @param {number} timeout - Number of milliseconds to wait while trying to delete the topic.
148164
* @param {function} cb - The callback to be executed when finished
149165
*/
150-
AdminClient.prototype.deleteTopic = function(topic, timeout, cb) {
166+
AdminClient.prototype.deleteTopic = function (topic, timeout, cb) {
151167
if (!this._isConnected) {
152168
throw new Error('Client is disconnected');
153169
}
@@ -161,7 +177,7 @@ AdminClient.prototype.deleteTopic = function(topic, timeout, cb) {
161177
timeout = 5000;
162178
}
163179

164-
this._client.deleteTopic(topic, timeout, function(err) {
180+
this._client.deleteTopic(topic, timeout, function (err) {
165181
if (err) {
166182
if (cb) {
167183
cb(LibrdKafkaError.create(err));
@@ -184,7 +200,7 @@ AdminClient.prototype.deleteTopic = function(topic, timeout, cb) {
184200
* @param {number} timeout - Number of milliseconds to wait while trying to create the partitions.
185201
* @param {function} cb - The callback to be executed when finished
186202
*/
187-
AdminClient.prototype.createPartitions = function(topic, totalPartitions, timeout, cb) {
203+
AdminClient.prototype.createPartitions = function (topic, totalPartitions, timeout, cb) {
188204
if (!this._isConnected) {
189205
throw new Error('Client is disconnected');
190206
}
@@ -198,7 +214,7 @@ AdminClient.prototype.createPartitions = function(topic, totalPartitions, timeou
198214
timeout = 5000;
199215
}
200216

201-
this._client.createPartitions(topic, totalPartitions, timeout, function(err) {
217+
this._client.createPartitions(topic, totalPartitions, timeout, function (err) {
202218
if (err) {
203219
if (cb) {
204220
cb(LibrdKafkaError.create(err));
@@ -211,3 +227,44 @@ AdminClient.prototype.createPartitions = function(topic, totalPartitions, timeou
211227
}
212228
});
213229
};
230+
231+
/**
232+
* List consumer groups.
233+
* @param {any} options
234+
* @param {number?} options.timeout - The request timeout in milliseconds.
235+
* May be unset (default: 5000)
236+
* @param {import("../").ConsumerGroupStates[]?} options.matchConsumerGroupStates -
237+
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
238+
* @param {function} cb - The callback to be executed when finished.
239+
*
240+
* Valid ways to call this function:
241+
* listGroups(cb)
242+
* listGroups(options, cb)
243+
*/
244+
AdminClient.prototype.listGroups = function (options, cb) {
245+
if (!this._isConnected) {
246+
throw new Error('Client is disconnected');
247+
}
248+
249+
if (typeof options === 'function') {
250+
cb = options;
251+
options = {};
252+
}
253+
254+
if (!options) {
255+
options = {};
256+
}
257+
258+
this._client.listGroups(options, function (err, groups) {
259+
if (err) {
260+
if (cb) {
261+
cb(LibrdKafkaError.create(err));
262+
}
263+
return;
264+
}
265+
266+
if (cb) {
267+
cb(null, groups);
268+
}
269+
});
270+
}

lib/kafkajs/_admin.js

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,33 @@ class Admin {
231231
);
232232
}
233233

234+
/**
235+
* List consumer groups.
236+
*
237+
* @param {object?} options
238+
* @param {number?} options.timeout - The request timeout in milliseconds.
239+
* May be unset (default: 5000)
240+
* @param {import("../../types/kafkajs").ConsumerGroupStates[]?} options.matchConsumerGroupStates -
241+
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
242+
* @returns {Promise<{ groups: import("../../types/kafkajs").GroupOverview[], errors: import("../../types/kafkajs").LibrdKafkaError[] }>}
243+
* Resolves with the list of consumer groups, rejects on error.
244+
*/
245+
async listGroups(options = {}) {
246+
if (this.#state !== AdminState.CONNECTED) {
247+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
248+
}
249+
250+
return new Promise((resolve, reject) => {
251+
this.#internalClient.listGroups(options, (err, groups) => {
252+
if (err) {
253+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
254+
} else {
255+
resolve(groups);
256+
}
257+
});
258+
});
259+
}
260+
234261
}
235262

236-
module.exports = { Admin }
263+
module.exports = { Admin, ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates }

lib/kafkajs/_kafka.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const { Producer, CompressionTypes } = require('./_producer');
22
const { Consumer, PartitionAssigners } = require('./_consumer');
3-
const { Admin } = require('./_admin');
3+
const { Admin, ConsumerGroupStates } = require('./_admin');
44
const error = require('./_error');
55
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
66

@@ -80,4 +80,10 @@ class Kafka {
8080
}
8181
}
8282

83-
module.exports = { Kafka, ...error, logLevel, PartitionAssigners, PartitionAssignors: PartitionAssigners, CompressionTypes };
83+
module.exports = {
84+
Kafka,
85+
...error, logLevel,
86+
PartitionAssigners,
87+
PartitionAssignors: PartitionAssigners,
88+
CompressionTypes,
89+
ConsumerGroupStates };

src/admin.cc

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
8787
Nan::SetPrototypeMethod(tpl, "deleteTopic", NodeDeleteTopic);
8888
Nan::SetPrototypeMethod(tpl, "createPartitions", NodeCreatePartitions);
8989

90+
// Consumer group related operations
91+
Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups);
92+
9093
Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
9194
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
9295
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
@@ -179,6 +182,9 @@ rd_kafka_event_t* PollForEvent(
179182
rd_kafka_event_type(event_response) != event_type &&
180183
attempts > 0);
181184

185+
// TODO: change this function so a type mismatch leads to an INVALID_TYPE
186+
// error rather than a null event. A null event is treated as a timeout, which
187+
// isn't true all the time.
182188
// If this isn't the type of response we want, or if we do not have a response
183189
// type, bail out with a null
184190
if (event_response == NULL ||
@@ -421,6 +427,69 @@ Baton AdminClient::CreatePartitions(
421427
}
422428
}
423429

430+
Baton AdminClient::ListGroups(
431+
bool is_match_states_set,
432+
std::vector<rd_kafka_consumer_group_state_t> &match_states, int timeout_ms,
433+
/* out */ rd_kafka_event_t **event_response) {
434+
if (!IsConnected()) {
435+
return Baton(RdKafka::ERR__STATE);
436+
}
437+
438+
{
439+
scoped_shared_write_lock lock(m_connection_lock);
440+
if (!IsConnected()) {
441+
return Baton(RdKafka::ERR__STATE);
442+
}
443+
444+
// Make admin options to establish that we are listing groups
445+
rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
446+
m_client->c_ptr(), RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS);
447+
448+
if (is_match_states_set) {
449+
rd_kafka_error_t *error =
450+
rd_kafka_AdminOptions_set_match_consumer_group_states(
451+
options, &match_states[0], match_states.size());
452+
if (error) {
453+
return Baton::BatonFromErrorAndDestroy(error);
454+
}
455+
}
456+
457+
// Create queue just for this operation.
458+
rd_kafka_queue_t *topic_rkqu = rd_kafka_queue_new(m_client->c_ptr());
459+
460+
rd_kafka_ListConsumerGroups(m_client->c_ptr(), options, topic_rkqu);
461+
462+
// Poll for an event by type in that queue
463+
// DON'T destroy the event. It is the out parameter, and ownership is
464+
// the caller's.
465+
*event_response = PollForEvent(
466+
topic_rkqu, RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT, timeout_ms);
467+
468+
// Destroy the queue since we are done with it.
469+
rd_kafka_queue_destroy(topic_rkqu);
470+
471+
// Destroy the options we just made because we polled already
472+
rd_kafka_AdminOptions_destroy(options);
473+
474+
// If we got no response from that operation, this is a failure
475+
// likely due to time out
476+
if (*event_response == NULL) {
477+
return Baton(RdKafka::ERR__TIMED_OUT);
478+
}
479+
480+
// Now we can get the error code from the event
481+
if (rd_kafka_event_error(*event_response)) {
482+
// If we had a special error code, get out of here with it
483+
const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response);
484+
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
485+
}
486+
487+
// At this point, event_response contains the result, which needs
488+
// to be parsed/converted by the caller.
489+
return Baton(RdKafka::ERR_NO_ERROR);
490+
}
491+
}
492+
424493
void AdminClient::ActivateDispatchers() {
425494
// Listen to global config
426495
m_gconfig->listen();
@@ -600,4 +669,48 @@ NAN_METHOD(AdminClient::NodeCreatePartitions) {
600669
return info.GetReturnValue().Set(Nan::Null());
601670
}
602671

672+
/**
673+
* List Consumer Groups.
674+
*/
675+
NAN_METHOD(AdminClient::NodeListGroups) {
676+
Nan::HandleScope scope;
677+
678+
if (info.Length() < 2 || !info[1]->IsFunction()) {
679+
// Just throw an exception
680+
return Nan::ThrowError("Need to specify a callback");
681+
}
682+
683+
if (!info[0]->IsObject()) {
684+
return Nan::ThrowError("Must provide options object");
685+
}
686+
687+
v8::Local<v8::Object> config = info[0].As<v8::Object>();
688+
689+
// Create the final callback object
690+
v8::Local<v8::Function> cb = info[1].As<v8::Function>();
691+
Nan::Callback *callback = new Nan::Callback(cb);
692+
AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This());
693+
694+
// Get the timeout - default 5000.
695+
int timeout_ms = GetParameter<int64_t>(config, "timeout", 5000);
696+
697+
// Get the match states, or not if they are unset.
698+
std::vector<rd_kafka_consumer_group_state_t> match_states;
699+
v8::Local<v8::String> matchConsumerGroupStatesKey =
700+
Nan::New("matchConsumerGroupStates").ToLocalChecked();
701+
bool is_match_states_set =
702+
Nan::Has(config, matchConsumerGroupStatesKey).FromMaybe(false);
703+
v8::Local<v8::Array> match_states_array;
704+
705+
if (is_match_states_set) {
706+
match_states_array = GetParameter<v8::Local<v8::Array>>(
707+
config, "matchConsumerGroupStates", match_states_array);
708+
match_states = Conversion::Admin::FromV8GroupStateArray(match_states_array);
709+
}
710+
711+
// Queue the work.
712+
Nan::AsyncQueueWorker(new Workers::AdminClientListGroups(
713+
callback, client, is_match_states_set, match_states, timeout_ms));
714+
}
715+
603716
} // namespace NodeKafka

src/admin.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ class AdminClient : public Connection {
5151
Baton CreatePartitions(rd_kafka_NewPartitions_t* topic, int timeout_ms);
5252
// Baton AlterConfig(rd_kafka_NewTopic_t* topic, int timeout_ms);
5353
// Baton DescribeConfig(rd_kafka_NewTopic_t* topic, int timeout_ms);
54+
Baton ListGroups(bool is_match_states_set,
55+
std::vector<rd_kafka_consumer_group_state_t>& match_states,
56+
int timeout_ms,
57+
rd_kafka_event_t** event_response);
5458

5559
protected:
5660
static Nan::Persistent<v8::Function> constructor;
@@ -68,6 +72,9 @@ class AdminClient : public Connection {
6872
static NAN_METHOD(NodeDeleteTopic);
6973
static NAN_METHOD(NodeCreatePartitions);
7074

75+
// Consumer group operations
76+
static NAN_METHOD(NodeListGroups);
77+
7178
static NAN_METHOD(NodeConnect);
7279
static NAN_METHOD(NodeDisconnect);
7380
};

0 commit comments

Comments
 (0)