@@ -90,6 +90,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
9090 // Consumer group related operations
9191 Nan::SetPrototypeMethod (tpl, " listGroups" , NodeListGroups);
9292 Nan::SetPrototypeMethod (tpl, " describeGroups" , NodeDescribeGroups);
93+ Nan::SetPrototypeMethod (tpl, " deleteGroups" , NodeDeleteGroups);
9394
9495 Nan::SetPrototypeMethod (tpl, " connect" , NodeConnect);
9596 Nan::SetPrototypeMethod (tpl, " disconnect" , NodeDisconnect);
@@ -446,6 +447,13 @@ Baton AdminClient::ListGroups(
446447 rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new (
447448 m_client->c_ptr (), RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS);
448449
450+ char errstr[512 ];
451+ rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout (
452+ options, timeout_ms, errstr, sizeof (errstr));
453+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
454+ return Baton (static_cast <RdKafka::ErrorCode>(err), errstr);
455+ }
456+
449457 if (is_match_states_set) {
450458 rd_kafka_error_t *error =
451459 rd_kafka_AdminOptions_set_match_consumer_group_states (
@@ -509,6 +517,13 @@ Baton AdminClient::DescribeGroups(std::vector<std::string> &groups,
509517 rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new (
510518 m_client->c_ptr (), RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);
511519
520+ char errstr[512 ];
521+ rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout (
522+ options, timeout_ms, errstr, sizeof (errstr));
523+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
524+ return Baton (static_cast <RdKafka::ErrorCode>(err), errstr);
525+ }
526+
512527 if (include_authorized_operations) {
513528 rd_kafka_error_t *error =
514529 rd_kafka_AdminOptions_set_include_authorized_operations (
@@ -561,6 +576,67 @@ Baton AdminClient::DescribeGroups(std::vector<std::string> &groups,
561576 }
562577}
563578
579+ Baton AdminClient::DeleteGroups (rd_kafka_DeleteGroup_t **group_list,
580+ size_t group_cnt, int timeout_ms,
581+ /* out */ rd_kafka_event_t **event_response) {
582+ if (!IsConnected ()) {
583+ return Baton (RdKafka::ERR__STATE);
584+ }
585+
586+ {
587+ scoped_shared_write_lock lock (m_connection_lock);
588+ if (!IsConnected ()) {
589+ return Baton (RdKafka::ERR__STATE);
590+ }
591+
592+ // Make admin options to establish that we are deleting groups
593+ rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new (
594+ m_client->c_ptr (), RD_KAFKA_ADMIN_OP_DELETEGROUPS);
595+
596+ char errstr[512 ];
597+ rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout (
598+ options, timeout_ms, errstr, sizeof (errstr));
599+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
600+ return Baton (static_cast <RdKafka::ErrorCode>(err), errstr);
601+ }
602+
603+ // Create queue just for this operation.
604+ rd_kafka_queue_t *rkqu = rd_kafka_queue_new (m_client->c_ptr ());
605+
606+ rd_kafka_DeleteGroups (m_client->c_ptr (), group_list, group_cnt, options,
607+ rkqu);
608+
609+ // Poll for an event by type in that queue
610+ // DON'T destroy the event. It is the out parameter, and ownership is
611+ // the caller's.
612+ *event_response =
613+ PollForEvent (rkqu, RD_KAFKA_EVENT_DELETEGROUPS_RESULT, timeout_ms);
614+
615+ // Destroy the queue since we are done with it.
616+ rd_kafka_queue_destroy (rkqu);
617+
618+ // Destroy the options we just made because we polled already
619+ rd_kafka_AdminOptions_destroy (options);
620+
621+ // If we got no response from that operation, this is a failure
622+ // likely due to time out
623+ if (*event_response == NULL ) {
624+ return Baton (RdKafka::ERR__TIMED_OUT);
625+ }
626+
627+ // Now we can get the error code from the event
628+ if (rd_kafka_event_error (*event_response)) {
629+ // If we had a special error code, get out of here with it
630+ const rd_kafka_resp_err_t errcode = rd_kafka_event_error (*event_response);
631+ return Baton (static_cast <RdKafka::ErrorCode>(errcode));
632+ }
633+
634+ // At this point, event_response contains the result, which needs
635+ // to be parsed/converted by the caller.
636+ return Baton (RdKafka::ERR_NO_ERROR);
637+ }
638+ }
639+
564640void AdminClient::ActivateDispatchers () {
565641 // Listen to global config
566642 m_gconfig->listen ();
@@ -831,4 +907,54 @@ NAN_METHOD(AdminClient::NodeDescribeGroups) {
831907 timeout_ms));
832908}
833909
910+ /* *
911+ * Delete Consumer Groups.
912+ */
913+ NAN_METHOD (AdminClient::NodeDeleteGroups) {
914+ Nan::HandleScope scope;
915+
916+ if (info.Length () < 3 || !info[2 ]->IsFunction ()) {
917+ // Just throw an exception
918+ return Nan::ThrowError (" Need to specify a callback" );
919+ }
920+
921+ if (!info[0 ]->IsArray ()) {
922+ return Nan::ThrowError (" Must provide group name array" );
923+ }
924+
925+ if (!info[1 ]->IsObject ()) {
926+ return Nan::ThrowError (" Must provide options object" );
927+ }
928+
929+ // Get list of group names to delete, and convert it into an
930+ // rd_kafka_DeleteGroup_t array.
931+ v8::Local<v8::Array> group_names = info[0 ].As <v8::Array>();
932+ if (group_names->Length () == 0 ) {
933+ return Nan::ThrowError (" Must provide at least one group name" );
934+ }
935+ std::vector<std::string> group_names_vector =
936+ v8ArrayToStringVector (group_names);
937+
938+ // The ownership of this array is transferred to the worker.
939+ rd_kafka_DeleteGroup_t **group_list = static_cast <rd_kafka_DeleteGroup_t **>(
940+ malloc (sizeof (rd_kafka_DeleteGroup_t *) * group_names_vector.size ()));
941+ for (size_t i = 0 ; i < group_names_vector.size (); i++) {
942+ group_list[i] = rd_kafka_DeleteGroup_new (group_names_vector[i].c_str ());
943+ }
944+
945+ v8::Local<v8::Object> config = info[1 ].As <v8::Object>();
946+
947+ // Get the timeout - default 5000.
948+ int timeout_ms = GetParameter<int64_t >(config, " timeout" , 5000 );
949+
950+ // Create the final callback object
951+ v8::Local<v8::Function> cb = info[2 ].As <v8::Function>();
952+ Nan::Callback *callback = new Nan::Callback (cb);
953+ AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This ());
954+
955+ // Queue the work.
956+ Nan::AsyncQueueWorker (new Workers::AdminClientDeleteGroups (
957+ callback, client, group_list, group_names_vector.size (), timeout_ms));
958+ }
959+
834960} // namespace NodeKafka
0 commit comments