Skip to content

Commit c2ec363

Browse files
author
Daniel Batica
committed
feat: added header support for send, tested using our tools
1 parent bd2fc1d commit c2ec363

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

src/lib/client/NativeKafkaClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,9 @@ export class NativeKafkaClient extends KafkaClient {
185185

186186
if (!this.producer) {
187187
return Promise.reject("producer is not yet setup.");
188-
}
188+
}
189189

190-
return this.producer.send(topicName, message, partition, key, partitionKey, opaqueKey, headers);
190+
return this.producer.send(topicName, message, partition, key, partitionKey, headers);
191191
}
192192

193193
/**

src/lib/messageProduceHandle.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ const hasKVStructure = message => {
3636
* @param version - optional
3737
* @returns {Promise<void>}
3838
*/
39-
const produceTypeSelection = (produceType, kafka, compressionType, topic, partition, key, value, partitionKey = null, opaqueKey = null, version = 1) => {
39+
const produceTypeSelection = (produceType, kafka, compressionType, topic, partition, key, value, headers, partitionKey = null, opaqueKey = null, version = 1) => {
4040

4141
debug("producing", produceType, topic, partition, key, partitionKey, opaqueKey, version, value);
4242

@@ -46,7 +46,7 @@ const produceTypeSelection = (produceType, kafka, compressionType, topic, partit
4646
switch (produceType) {
4747

4848
case PRODUCE_TYPES.SEND:
49-
return kafka.send(topic, value, partition, key, partitionKey, opaqueKey);
49+
return kafka.send(topic, value, partition, key, partitionKey, opaqueKey, headers);
5050

5151
case PRODUCE_TYPES.BUFFER:
5252
return kafka.buffer(topic, key, value, compressionType, partition, version, partitionKey);
@@ -80,6 +80,7 @@ export const messageProduceHandle = (kafka, message, outputTopicName, produceTyp
8080
let _partitionKey = null;
8181
let _opaqueKey = null;
8282
let _value = message;
83+
let _headers = [];
8384

8485
//overwrite default values or configured values
8586
//with on demand message settings
@@ -108,6 +109,10 @@ export const messageProduceHandle = (kafka, message, outputTopicName, produceTyp
108109
_opaqueKey = message.opaqueKey;
109110
}
110111

112+
if (typeof message.headers !== "undefined" && Array.isArray(message.headers)) {
113+
_headers = message.headers;
114+
}
115+
111116
_value = message.value;
112117
}
113118

@@ -118,6 +123,7 @@ export const messageProduceHandle = (kafka, message, outputTopicName, produceTyp
118123
partition: _partition,
119124
key: _key,
120125
value: _value,
126+
headers: _headers,
121127
partitionKey: _partitionKey,
122128
opaqueKey: _opaqueKey
123129
};
@@ -137,6 +143,7 @@ export const messageProduceHandle = (kafka, message, outputTopicName, produceTyp
137143
_partition,
138144
_key,
139145
_value,
146+
_headers,
140147
_partitionKey,
141148
_opaqueKey,
142149
_version

0 commit comments

Comments
 (0)