Skip to content

Commit 00187f4

Browse files
committed
Fix use warmup messages with headers in case messages from other partitions are consumed first
1 parent bd14669 commit 00187f4

File tree

3 files changed

+14
-6
lines changed

3 files changed

+14
-6
lines changed

ci/tests/run_perf_test.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ async function main() {
9797
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS + 2000;
9898
const TERMINATE_TIMEOUT_MS_LAG_MONITORING = TERMINATE_TIMEOUT_MS + 1000;
9999

100-
await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
100+
const createTopicResult = await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
101+
let ret = [createTopicResult];
101102

102103
console.log(`Waiting 10s after topic creation before starting producer and consumers...`);
103104
await new Promise(resolve => setTimeout(resolve, 10000));
@@ -118,7 +119,7 @@ async function main() {
118119
allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_LAG_MONITORING} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
119120
}
120121
const results = await Promise.allSettled(allPromises);
121-
return results.map(r => r.status === 'fulfilled' ? r.value : '').join('\n');
122+
return ret.concat(results.map(r => r.status === 'fulfilled' ? r.value : '')).join('\n');
122123
} else {
123124
console.log(`Running ${modeLabel} Producer/Consumer test...`);
124125
return runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} GROUPID_MESSAGE=${groupIdEachMessage} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --create-topics ${consumerParam} ${produceToSecondTopicParam} --producer`);

examples/performance/performance-consolidated.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5;
2828
const initialDelayMs = process.env.INITIAL_DELAY_MS ? +process.env.INITIAL_DELAY_MS : 0;
2929
const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3;
3030
const partitionsConsumedConcurrently = process.env.PARTITIONS_CONSUMED_CONCURRENTLY ? +process.env.PARTITIONS_CONSUMED_CONCURRENTLY : 1;
31-
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
31+
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : ((batchSize * 10 > 1024) ? 1024 : (batchSize * 10));
3232
const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5;
3333
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
3434
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;

examples/performance/performance-primitives-common.js

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
172172
const skippedMessages = warmupMessages;
173173

174174
const updateLatency = (receivedAt, numMessages, message, isT0T2) => {
175-
if (!stats)
175+
if (!stats || numMessages < 1)
176176
return;
177177

178178
if (!message.headers || !message.headers['timestamp']) {
@@ -402,11 +402,18 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess
402402

403403
console.log('Sending ' + warmupMessages + ' warmup messages.');
404404
while (warmupMessages > 0) {
405+
const toProduce = Math.min(batchSize, warmupMessages);
405406
await producer.send({
406407
topic,
407-
messages: messages.slice(0, batchSize)
408+
messages: messages.slice(0, toProduce).map((msg) => ({
409+
key: msg.key,
410+
value: msg.value,
411+
headers: {
412+
'timestamp': Date.now().toString(),
413+
}
414+
}))
408415
}, compression);
409-
warmupMessages -= batchSize;
416+
warmupMessages -= toProduce;
410417
}
411418
console.log('Sent warmup messages');
412419

0 commit comments

Comments
 (0)