@@ -89,6 +89,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
8989
9090 // Consumer group related operations
9191 Nan::SetPrototypeMethod (tpl, " listGroups" , NodeListGroups);
92+ Nan::SetPrototypeMethod (tpl, " describeGroups" , NodeDescribeGroups);
9293
9394 Nan::SetPrototypeMethod (tpl, " connect" , NodeConnect);
9495 Nan::SetPrototypeMethod (tpl, " disconnect" , NodeDisconnect);
@@ -455,18 +456,88 @@ Baton AdminClient::ListGroups(
455456 }
456457
457458 // Create queue just for this operation.
458- rd_kafka_queue_t *topic_rkqu = rd_kafka_queue_new (m_client->c_ptr ());
459+ rd_kafka_queue_t *rkqu = rd_kafka_queue_new (m_client->c_ptr ());
459460
460- rd_kafka_ListConsumerGroups (m_client->c_ptr (), options, topic_rkqu );
461+ rd_kafka_ListConsumerGroups (m_client->c_ptr (), options, rkqu );
461462
462463 // Poll for an event by type in that queue
463464 // DON'T destroy the event. It is the out parameter, and ownership is
464465 // the caller's.
465466 *event_response = PollForEvent (
466- topic_rkqu , RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT, timeout_ms);
467+ rkqu , RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT, timeout_ms);
467468
468469 // Destroy the queue since we are done with it.
469- rd_kafka_queue_destroy (topic_rkqu);
470+ rd_kafka_queue_destroy (rkqu);
471+
472+ // Destroy the options we just made because we polled already
473+ rd_kafka_AdminOptions_destroy (options);
474+
475+ // If we got no response from that operation, this is a failure
476+ // likely due to time out
477+ if (*event_response == NULL ) {
478+ return Baton (RdKafka::ERR__TIMED_OUT);
479+ }
480+
481+ // Now we can get the error code from the event
482+ if (rd_kafka_event_error (*event_response)) {
483+ // If we had a special error code, get out of here with it
484+ const rd_kafka_resp_err_t errcode = rd_kafka_event_error (*event_response);
485+ return Baton (static_cast <RdKafka::ErrorCode>(errcode));
486+ }
487+
488+ // At this point, event_response contains the result, which needs
489+ // to be parsed/converted by the caller.
490+ return Baton (RdKafka::ERR_NO_ERROR);
491+ }
492+ }
493+
494+ Baton AdminClient::DescribeGroups (std::vector<std::string> &groups,
495+ bool include_authorized_operations,
496+ int timeout_ms,
497+ /* out */ rd_kafka_event_t **event_response) {
498+ if (!IsConnected ()) {
499+ return Baton (RdKafka::ERR__STATE);
500+ }
501+
502+ {
503+ scoped_shared_write_lock lock (m_connection_lock);
504+ if (!IsConnected ()) {
505+ return Baton (RdKafka::ERR__STATE);
506+ }
507+
508+ // Make admin options to establish that we are describing groups
509+ rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new (
510+ m_client->c_ptr (), RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);
511+
512+ if (include_authorized_operations) {
513+ rd_kafka_error_t *error =
514+ rd_kafka_AdminOptions_set_include_authorized_operations (
515+ options, include_authorized_operations);
516+ if (error) {
517+ return Baton::BatonFromErrorAndDestroy (error);
518+ }
519+ }
520+
521+ // Create queue just for this operation.
522+ rd_kafka_queue_t *rkqu = rd_kafka_queue_new (m_client->c_ptr ());
523+
524+ // Construct a char** to pass to librdkafka. Avoid too many allocations.
525+ std::vector<const char *> c_groups (groups.size ());
526+ for (size_t i = 0 ; i < groups.size (); i++) {
527+ c_groups[i] = groups[i].c_str ();
528+ }
529+
530+ rd_kafka_DescribeConsumerGroups (m_client->c_ptr (), &c_groups[0 ],
531+ groups.size (), options, rkqu);
532+
533+ // Poll for an event by type in that queue
534+ // DON'T destroy the event. It is the out parameter, and ownership is
535+ // the caller's.
536+ *event_response = PollForEvent (
537+ rkqu, RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT, timeout_ms);
538+
539+ // Destroy the queue since we are done with it.
540+ rd_kafka_queue_destroy (rkqu);
470541
471542 // Destroy the options we just made because we polled already
472543 rd_kafka_AdminOptions_destroy (options);
@@ -696,10 +767,10 @@ NAN_METHOD(AdminClient::NodeListGroups) {
696767
697768 // Get the match states, or not if they are unset.
698769 std::vector<rd_kafka_consumer_group_state_t > match_states;
699- v8::Local<v8::String> matchConsumerGroupStatesKey =
770+ v8::Local<v8::String> match_consumer_group_states_key =
700771 Nan::New (" matchConsumerGroupStates" ).ToLocalChecked ();
701772 bool is_match_states_set =
702- Nan::Has (config, matchConsumerGroupStatesKey ).FromMaybe (false );
773+ Nan::Has (config, match_consumer_group_states_key ).FromMaybe (false );
703774 v8::Local<v8::Array> match_states_array;
704775
705776 if (is_match_states_set) {
@@ -713,4 +784,51 @@ NAN_METHOD(AdminClient::NodeListGroups) {
713784 callback, client, is_match_states_set, match_states, timeout_ms));
714785}
715786
787+ /* *
788+ * Describe Consumer Groups.
789+ */
790+ NAN_METHOD (AdminClient::NodeDescribeGroups) {
791+ Nan::HandleScope scope;
792+
793+ if (info.Length () < 3 || !info[2 ]->IsFunction ()) {
794+ // Just throw an exception
795+ return Nan::ThrowError (" Need to specify a callback" );
796+ }
797+
798+ if (!info[0 ]->IsArray ()) {
799+ return Nan::ThrowError (" Must provide group name array" );
800+ }
801+
802+ if (!info[1 ]->IsObject ()) {
803+ return Nan::ThrowError (" Must provide options object" );
804+ }
805+
806+ // Get list of group names to describe.
807+ v8::Local<v8::Array> group_names = info[0 ].As <v8::Array>();
808+ if (group_names->Length () == 0 ) {
809+ return Nan::ThrowError (" Must provide at least one group name" );
810+ }
811+ std::vector<std::string> group_names_vector =
812+ v8ArrayToStringVector (group_names);
813+
814+ v8::Local<v8::Object> config = info[1 ].As <v8::Object>();
815+
816+ // Get the timeout - default 5000.
817+ int timeout_ms = GetParameter<int64_t >(config, " timeout" , 5000 );
818+
819+ // Get whether to include authorized operations - default false.
820+ bool include_authorized_operations =
821+ GetParameter<bool >(config, " includeAuthorizedOperations" , false );
822+
823+ // Create the final callback object
824+ v8::Local<v8::Function> cb = info[2 ].As <v8::Function>();
825+ Nan::Callback *callback = new Nan::Callback (cb);
826+ AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This ());
827+
828+ // Queue the work.
829+ Nan::AsyncQueueWorker (new Workers::AdminClientDescribeGroups (
830+ callback, client, group_names_vector, include_authorized_operations,
831+ timeout_ms));
832+ }
833+
716834} // namespace NodeKafka
0 commit comments