Skip to content

Commit 998c0b8

Browse files
authored
Add highwatermark offset to eachBatch callback (#317)
* Add highwatermark offsets to eachBatch callback
* Update changelog
* Add additional error handling
* Change error to warning
* Change MIGRATION.md
1 parent c37401c commit 998c0b8

File tree

5 files changed

+93
-4
lines changed

5 files changed

+93
-4
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -2,10 +2,16 @@

 v1.5.0 is a feature release. It is supported for all usage.

+
+### Enhancements
+
+1. Adds support for `highWatermark`, `offsetLag()`, and `offsetLagLow()` in `eachBatch` callback (#317).
+
 ## Fixes

 1. Fix issue of delay of up to 5s in receiving messages after pause and resume, or seek (#285, #363).
+
 # confluent-kafka-javascript v1.4.1

 v1.4.1 is a maintenance release. It is supported for all usage.
```

MIGRATION.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -308,7 +308,7 @@ producerRun().then(consumerRun).catch(console.error);
   to change.
   The property `eachBatchAutoResolve` is supported.
   Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported,
-  and within the returned batch, `offsetLag` and `offsetLagLow` are unsupported.
+  and within the returned batch, `offsetLag` and `offsetLagLow` are supported.
 * `commitOffsets`:
   - Does not yet support sending metadata for topic partitions being committed.
   - If called with no arguments, it commits all offsets passed to the user (or the stored offsets, if manually handling offset storage using `consumer.storeOffsets`).
```

lib/kafkajs/_consumer.js

Lines changed: 22 additions & 3 deletions
```diff
@@ -880,6 +880,25 @@ class Consumer {
   #createBatchPayload(messages) {
     const topic = messages[0].topic;
     const partition = messages[0].partition;
+    let watermarkOffsets = {};
+    let highWatermark = '-1001';
+    let offsetLag_ = -1;
+    let offsetLagLow_ = -1;
+
+    try {
+      watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition);
+    } catch (e) {
+      /* Only warn. The batch as a whole remains valid but for the fact that the highwatermark won't be there. */
+      this.#logger.warn(`Could not get watermark offsets for batch: ${e}`, this.#createConsumerBindingMessageMetadata());
+    }
+
+    if (Number.isInteger(watermarkOffsets.highOffset)) {
+      highWatermark = watermarkOffsets.highOffset.toString();
+      /* While calculating lag, we subtract 1 from the high offset
+       * for compatibility reasons with KafkaJS's API */
+      offsetLag_ = (watermarkOffsets.highOffset - 1) - messages[messages.length - 1].offset;
+      offsetLagLow_ = (watermarkOffsets.highOffset - 1) - messages[0].offset;
+    }

     const messagesConverted = [];
     for (let i = 0; i < messages.length; i++) {
@@ -909,13 +928,13 @@
     const batch = {
       topic,
       partition,
-      highWatermark: '-1001', /* We don't fetch it yet. We can call committed() to fetch it but that might incur network calls. */
+      highWatermark,
       messages: messagesConverted,
       isEmpty: () => false,
       firstOffset: () => (messagesConverted[0].offset).toString(),
       lastOffset: () => (messagesConverted[messagesConverted.length - 1].offset).toString(),
-      offsetLag: () => notImplemented(),
-      offsetLagLow: () => notImplemented(),
+      offsetLag: () => offsetLag_.toString(),
+      offsetLagLow: () => offsetLagLow_.toString(),
     };

     const returnPayload = {
```

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 62 additions & 0 deletions
```diff
@@ -845,4 +845,66 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partitionsConsumedConcurrently) => {
     await waitFor(() => (messagesConsumed === messages.length || batchesCountExceeds1), () => { }, 100);
     expect(batchesCountExceeds1).toBe(false);
   });
+
+  it('shows the correct high watermark and lag for partition', async () => {
+    await producer.connect();
+
+    let messages0 = Array(10).fill().map(() => {
+      const value = secureRandom();
+      return { value: `value-${value}`, partition: 0 };
+    });
+    let partition0ProducedMessages = messages0.length;
+
+    const messages1 = Array(5).fill().map(() => {
+      const value = secureRandom();
+      return { value: `value-${value}`, partition: 1 };
+    });
+
+    const messages2 = Array(2).fill().map(() => {
+      const value = secureRandom();
+      return { value: `value-${value}`, partition: 2 };
+    });
+
+    for (const messages of [messages0, messages1, messages2]) {
+      await producer.send({
+        topic: topicName,
+        messages: messages,
+      });
+    }
+
+    await consumer.connect();
+    await consumer.subscribe({ topic: topicName });
+
+    let messagesConsumed = 0;
+    consumer.run({
+      partitionsConsumedConcurrently,
+      eachBatch: async ({ batch }) => {
+        if (batch.partition === 0) {
+          expect(batch.highWatermark).toEqual(String(partition0ProducedMessages));
+        } else if (batch.partition === 1) {
+          expect(batch.highWatermark).toEqual(String(messages1.length));
+        } else if (batch.partition === 2) {
+          expect(batch.highWatermark).toEqual(String(messages2.length));
+        }
+        expect(batch.offsetLag()).toEqual(String(+batch.highWatermark - 1 - +batch.lastOffset()));
+        expect(batch.offsetLagLow()).toEqual(String(+batch.highWatermark - 1 - +batch.firstOffset()));
+        messagesConsumed += batch.messages.length;
+      }
+    });
+    await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100);
+
+    /* Add some more messages to partition 0 to make sure high watermark is updated. */
+    messages0 = Array(15).fill().map(() => {
+      const value = secureRandom();
+      return { value: `value-${value}`, partition: 0 };
+    });
+    partition0ProducedMessages += messages0.length;
+    await producer.send({
+      topic: topicName,
+      messages: messages0,
+    });
+
+    await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100);
+  });
 });
```

types/kafkajs.d.ts

Lines changed: 2 additions & 0 deletions
```diff
@@ -280,6 +280,8 @@ export type Batch = {
   isEmpty(): boolean
   firstOffset(): string | null
   lastOffset(): string
+  offsetLag(): string
+  offsetLagLow(): string
 }

 export type KafkaMessage = MessageSetEntry | RecordBatchEntry
```

0 commit comments

Comments (0)