@@ -115,6 +115,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
115115 Nan::SetPrototypeMethod (tpl, " createTopic" , NodeCreateTopic);
116116 Nan::SetPrototypeMethod (tpl, " deleteTopic" , NodeDeleteTopic);
117117 Nan::SetPrototypeMethod (tpl, " createPartitions" , NodeCreatePartitions);
118+ Nan::SetPrototypeMethod (tpl, " deleteRecords" , NodeDeleteRecords);
118119
119120 // Consumer group related operations
120121 Nan::SetPrototypeMethod (tpl, " listGroups" , NodeListGroups);
@@ -743,6 +744,74 @@ Baton AdminClient::ListConsumerGroupOffsets(
743744 }
744745}
745746
747+ Baton AdminClient::DeleteRecords (rd_kafka_DeleteRecords_t **del_records,
748+ size_t del_records_cnt,
749+ int operation_timeout_ms, int timeout_ms,
750+ rd_kafka_event_t **event_response) {
751+ if (!IsConnected ()) {
752+ return Baton (RdKafka::ERR__STATE);
753+ }
754+
755+ {
756+ scoped_shared_write_lock lock (m_connection_lock);
757+ if (!IsConnected ()) {
758+ return Baton (RdKafka::ERR__STATE);
759+ }
760+
761+ // Make admin options to establish that we are deleting records
762+ rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new (
763+ m_client->c_ptr (), RD_KAFKA_ADMIN_OP_DELETERECORDS);
764+
765+ char errstr[512 ];
766+ rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout (
767+ options, timeout_ms, errstr, sizeof (errstr));
768+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
769+ return Baton (static_cast <RdKafka::ErrorCode>(err), errstr);
770+ }
771+
772+ err = rd_kafka_AdminOptions_set_operation_timeout (
773+ options, operation_timeout_ms, errstr, sizeof (errstr));
774+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
775+ return Baton (static_cast <RdKafka::ErrorCode>(err), errstr);
776+ }
777+
778+ // Create queue just for this operation.
779+ rd_kafka_queue_t *rkqu = rd_kafka_queue_new (m_client->c_ptr ());
780+
781+ rd_kafka_DeleteRecords (m_client->c_ptr (), del_records,
782+ del_records_cnt, options, rkqu);
783+
784+ // Poll for an event by type in that queue
785+ // DON'T destroy the event. It is the out parameter, and ownership is
786+ // the caller's.
787+ *event_response =
788+ PollForEvent (rkqu, RD_KAFKA_EVENT_DELETERECORDS_RESULT, timeout_ms);
789+
790+ // Destroy the queue since we are done with it.
791+ rd_kafka_queue_destroy (rkqu);
792+
793+ // Destroy the options we just made because we polled already
794+ rd_kafka_AdminOptions_destroy (options);
795+
796+ // If we got no response from that operation, this is a failure
797+ // likely due to time out
798+ if (*event_response == NULL ) {
799+ return Baton (RdKafka::ERR__TIMED_OUT);
800+ }
801+
802+ // Now we can get the error code from the event
803+ if (rd_kafka_event_error (*event_response)) {
804+ // If we had a special error code, get out of here with it
805+ const rd_kafka_resp_err_t errcode = rd_kafka_event_error (*event_response);
806+ return Baton (static_cast <RdKafka::ErrorCode>(errcode));
807+ }
808+
809+ // At this point, event_response contains the result, which needs
810+ // to be parsed/converted by the caller.
811+ return Baton (RdKafka::ERR_NO_ERROR);
812+ }
813+ }
814+
746815void AdminClient::ActivateDispatchers () {
747816 // Listen to global config
748817 m_gconfig->listen ();
@@ -1147,10 +1216,7 @@ NAN_METHOD(AdminClient::NodeListConsumerGroupOffsets) {
11471216 }
11481217
11491218 // Now process the second argument: options (timeout and requireStableOffsets)
1150- v8::Local<v8::Object> options = Nan::New<v8::Object>();
1151- if (info.Length () > 2 && info[1 ]->IsObject ()) {
1152- options = info[1 ].As <v8::Object>();
1153- }
1219+ v8::Local<v8::Object> options = info[1 ].As <v8::Object>();
11541220
11551221 bool require_stable_offsets =
11561222 GetParameter<bool >(options, " requireStableOffsets" , false );
@@ -1167,4 +1233,69 @@ NAN_METHOD(AdminClient::NodeListConsumerGroupOffsets) {
11671233 require_stable_offsets, timeout_ms));
11681234}
11691235
1236+ /* *
1237+ * Delete Records.
1238+ */
1239+ NAN_METHOD (AdminClient::NodeDeleteRecords) {
1240+ Nan::HandleScope scope;
1241+
1242+ if (info.Length () < 3 || !info[2 ]->IsFunction ()) {
1243+ return Nan::ThrowError (" Need to specify a callback" );
1244+ }
1245+
1246+ if (!info[0 ]->IsArray ()) {
1247+ return Nan::ThrowError (
1248+ " Must provide array containg 'TopicPartitionOffset' objects" );
1249+ }
1250+
1251+ if (!info[1 ]->IsObject ()) {
1252+ return Nan::ThrowError (" Must provide 'options' object" );
1253+ }
1254+
1255+ // Get list of TopicPartitions to delete records from
1256+ // and convert it into rd_kafka_DeleteRecords_t array
1257+ v8::Local<v8::Array> delete_records_list = info[0 ].As <v8::Array>();
1258+
1259+ if (delete_records_list->Length () == 0 ) {
1260+ return Nan::ThrowError (" Must provide at least one TopicPartitionOffset" );
1261+ }
1262+
1263+ /* *
1264+ * The ownership of this is taken by
1265+ * Workers::AdminClientDeleteRecords and freeing it is also handled
1266+ * by that class.
1267+ */
1268+ rd_kafka_DeleteRecords_t **delete_records =
1269+ static_cast <rd_kafka_DeleteRecords_t **>(
1270+ malloc (sizeof (rd_kafka_DeleteRecords_t *) * 1 ));
1271+
1272+ rd_kafka_topic_partition_list_t *partitions =
1273+ Conversion::TopicPartition::TopicPartitionv8ArrayToTopicPartitionList (
1274+ delete_records_list, true );
1275+ if (partitions == NULL ) {
1276+ return Nan::ThrowError (
1277+ " Failed to convert objects in delete records list, provide proper "
1278+ " TopicPartitionOffset objects" );
1279+ }
1280+ delete_records[0 ] = rd_kafka_DeleteRecords_new (partitions);
1281+
1282+ rd_kafka_topic_partition_list_destroy (partitions);
1283+
1284+ // Now process the second argument: options (timeout and operation_timeout)
1285+ v8::Local<v8::Object> options = info[1 ].As <v8::Object>();
1286+
1287+ int operation_timeout_ms =
1288+ GetParameter<int64_t >(options, " operation_timeout" , 60000 );
1289+ int timeout_ms = GetParameter<int64_t >(options, " timeout" , 5000 );
1290+
1291+ // Create the final callback object
1292+ v8::Local<v8::Function> cb = info[2 ].As <v8::Function>();
1293+ Nan::Callback *callback = new Nan::Callback (cb);
1294+ AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This ());
1295+
1296+ // Queue the worker to process the offset fetch request asynchronously
1297+ Nan::AsyncQueueWorker (new Workers::AdminClientDeleteRecords (
1298+ callback, client, delete_records, 1 , operation_timeout_ms, timeout_ms));
1299+ }
1300+
11701301} // namespace NodeKafka
0 commit comments