Skip to content

Commit 17c5e3b

Browse files
authored
fix: calculate kafka topic uncosummed messages correctly (#3430)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 139604e commit 17c5e3b

File tree

3 files changed

+54
-9
lines changed

3 files changed

+54
-9
lines changed

services/apps/cron_service/src/jobs/queueMonitoring.job.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,19 @@ async function getMessageCounts(
204204
if (topicOffset) {
205205
// Total messages is the latest offset
206206
totalMessages += Number(topicOffset.offset)
207-
// Consumed messages is the consumer group's offset
208-
consumedMessages += Number(offset.offset)
209-
// Unconsumed is the difference
210-
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
207+
208+
// Handle -1 offsets (no committed offset)
209+
if (offset.offset === '-1') {
210+
// No committed offset means no messages consumed from this partition
211+
consumedMessages += 0
212+
// Unconsumed is the total messages in the partition
213+
totalLeft += Number(topicOffset.offset) - Number(topicOffset.low)
214+
} else {
215+
// Consumed messages is the consumer group's offset
216+
consumedMessages += Number(offset.offset)
217+
// Unconsumed is the difference
218+
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
219+
}
211220
}
212221
}
213222

services/apps/integration_run_worker/src/queue/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
4444

4545
override async processMessage(message: IQueueMessage): Promise<void> {
4646
try {
47-
this.log.trace({ messageType: message.type }, 'Processing message!')
47+
this.log.info({ messageType: message.type }, 'Processing message!')
4848

4949
const service = new IntegrationRunService(
5050
this.redisClient,

services/libs/queue/src/vendors/kafka/client.ts

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,32 +55,68 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
5555
}
5656

5757
async getQueueMessageCount(conf: IKafkaChannelConfig): Promise<number> {
58-
const groupId = conf.name
5958
const topic = conf.name
59+
// The consumer group ID is the same as the topic name
60+
const groupId = topic
6061

6162
const admin = this.client.admin()
6263
await admin.connect()
6364

6465
try {
66+
this.log.debug({ topic, groupId }, 'Fetching message count for topic and consumer group')
67+
6568
const topicOffsets = await admin.fetchTopicOffsets(topic)
69+
this.log.debug({ topic, groupId, topicOffsets }, 'Topic offsets fetched')
70+
6671
const offsetsResponse = await admin.fetchOffsets({
6772
groupId: groupId,
6873
topics: [topic],
6974
})
75+
this.log.debug({ topic, groupId, offsetsResponse }, 'Consumer group offsets fetched')
7076

7177
const offsets = offsetsResponse[0].partitions
78+
this.log.debug({ topic, groupId, offsets }, 'Consumer group offsets')
7279

7380
let totalLeft = 0
7481
for (const offset of offsets) {
7582
const topicOffset = topicOffsets.find((p) => p.partition === offset.partition)
76-
if (topicOffset.offset !== offset.offset) {
77-
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
83+
if (topicOffset) {
84+
// If consumer offset is -1, it means no committed offset, so lag is the total messages in partition
85+
if (offset.offset === '-1') {
86+
const lag = Number(topicOffset.offset) - Number(topicOffset.low)
87+
totalLeft += lag
88+
this.log.debug(
89+
{
90+
partition: offset.partition,
91+
topicOffset: topicOffset.offset,
92+
topicLow: topicOffset.low,
93+
consumerOffset: offset.offset,
94+
lag,
95+
},
96+
'Partition lag calculated (no committed offset)',
97+
)
98+
} else {
99+
const lag = Number(topicOffset.offset) - Number(offset.offset)
100+
totalLeft += lag
101+
this.log.debug(
102+
{
103+
partition: offset.partition,
104+
topicOffset: topicOffset.offset,
105+
consumerOffset: offset.offset,
106+
lag,
107+
},
108+
'Partition lag calculated',
109+
)
110+
}
111+
} else {
112+
this.log.debug({ partition: offset.partition }, 'No topic offset found for partition')
78113
}
79114
}
80115

116+
this.log.debug({ topic, groupId, totalLeft }, 'Total messages left calculated')
81117
return totalLeft
82118
} catch (err) {
83-
this.log.error(err, 'Failed to get message count!')
119+
this.log.error({ topic, groupId, err }, 'Failed to get message count!')
84120
throw err
85121
} finally {
86122
await admin.disconnect()

0 commit comments

Comments
 (0)