From 51c104dd4840ace1f90f8bbc33ec395aae25f4f9 Mon Sep 17 00:00:00 2001
From: Branislav Katreniak
Date: Mon, 6 Nov 2023 13:31:05 +0100
Subject: [PATCH 1/3] e2e/both: fix default consume timeout in reaching eof test

One round of polling the broker is needed to read the message, and a
second round is needed to detect EOF. The test asserts that we receive
the message and the EOF event within 2s, but the consumer is configured
with 'fetch.wait.max.ms': 1000, so two timed-out polls do not fit into
2s. Increase the consume timeout to 3s.
---
 e2e/both.spec.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index a8289ec3..9898b659 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -229,7 +229,7 @@ describe('Consumer/Producer', function() {
       setTimeout(function() {
         producer.produce(topic, null, buffer, null);
       }, 500)
-      consumer.setDefaultConsumeTimeout(2000);
+      consumer.setDefaultConsumeTimeout(3000);
       consumer.consume(1000, function(err, messages) {
         t.ifError(err);
         t.equal(messages.length, 1);

From 095352acf4710064d42ff3991c26a8021dcf4316 Mon Sep 17 00:00:00 2001
From: Branislav Katreniak
Date: Mon, 6 Nov 2023 13:37:57 +0100
Subject: [PATCH 2/3] e2e/both: fix default consume timeout in already at eof test

One round of polling the broker is needed to read the message, and a
second round is needed to detect EOF. The test asserts that we receive
EOF, then the data, then EOF again when the data is produced at the 2nd
second. But the consumer is configured with 'fetch.wait.max.ms': 1000,
so we read the data in a broker poll slightly after the 3rd second and
detect the second EOF slightly after the 4th second. Increase the
consume timeout to 5s.
---
 e2e/both.spec.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 9898b659..e54f4e29 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -262,7 +262,7 @@ describe('Consumer/Producer', function() {
       setTimeout(function() {
         producer.produce(topic, null, buffer, null);
       }, 2000)
-      consumer.setDefaultConsumeTimeout(3000);
+      consumer.setDefaultConsumeTimeout(5000);
       consumer.consume(1000, function(err, messages) {
         t.ifError(err);
         t.equal(messages.length, 1);

From 1a6877a70eb5ca7e444450e8b23d560426d5cad2 Mon Sep 17 00:00:00 2001
From: Branislav Katreniak
Date: Mon, 6 Nov 2023 14:17:03 +0100
Subject: [PATCH 3/3] consumer: fix consumeNum to respect the consume timeout

Steps to reproduce:
* configure the consumer with setDefaultConsumeTimeout(1000)
* produce 1 message every 500ms
* call consumer.consume(128, cb)

Actual outcome:
* consume returns 128 messages after 64 seconds

Expected outcome:
* consume returns ~2 messages after 1 second

KafkaConsumerConsumeNum calls the underlying C++ m_consumer->Consume in
a loop until
* either the accumulated batch is full,
* or a single m_consumer->Consume call times out, with each call being
  given the full configured timeout.

KafkaConsumerConsumeNum must instead enforce its timeout across all
m_consumer->Consume invocations taken together.
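
A simplified sketch of the intended loop (names follow workers.cc; EOF
bookkeeping and error propagation are omitted): compute one deadline up
front and give every Consume call only the time that remains.

    // Sketch only -- the real loop also tracks partition EOF events and errors.
    const auto end = std::chrono::steady_clock::now() +
                     std::chrono::milliseconds(m_timeout_ms);
    int timeout_ms = m_timeout_ms;

    while (m_messages.size() < max && timeout_ms > 0) {
      Baton b = m_consumer->Consume(timeout_ms);  // waits at most timeout_ms
      if (b.err() == RdKafka::ERR__TIMED_OUT) {
        break;  // the overall consume timeout has been used up
      }
      if (b.err() == RdKafka::ERR_NO_ERROR) {
        m_messages.push_back(b.data<RdKafka::Message*>());
      }
      // Shrink the budget so that all Consume calls together stay within
      // m_timeout_ms.
      timeout_ms = std::max(0, static_cast<int>(
          std::chrono::duration_cast<std::chrono::milliseconds>(
              end - std::chrono::steady_clock::now()).count()));
    }
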
---
 e2e/both.spec.js | 43 +++++++++++++++++++++++++++++++++++++++++++
 src/workers.cc   |  5 +++++
 2 files changed, 48 insertions(+)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index e54f4e29..452227d5 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -272,6 +272,49 @@ describe('Consumer/Producer', function() {
     });
   });
 
+  it('should stop consuming batch after consume timeout', function(done) {
+    crypto.randomBytes(4096, function(ex, buffer) {
+      producer.setPollInterval(10);
+
+      producer.once('delivery-report', function(err, report) {
+        t.ifError(err);
+      });
+
+      consumer.subscribe([topic]);
+
+      var events = [];
+
+      consumer.once('data', function(msg) {
+        events.push("data");
+      });
+
+      consumer.on('partition.eof', function(eof) {
+        events.push("partition.eof");
+      });
+
+      let timeoutId;
+      let toProduce = 10;
+      const produceLoop = () => {
+        producer.produce(topic, null, buffer, null);
+        if (--toProduce > 0) {
+          timeoutId = setTimeout(produceLoop, 500);
+        }
+      };
+      produceLoop();
+
+      consumer.setDefaultConsumeTimeout(2000);
+      const startedAt = Date.now();
+      consumer.consume(100, function(err, messages) {
+        t.ifError(err);
+        t(Date.now() - startedAt < 3000, 'Consume took longer than consume timeout');
+        t(messages.length > 2, 'Too few messages consumed within batch');
+        t(messages.length < 8, 'Too many messages consumed within batch');
+        clearTimeout(timeoutId);
+        done();
+      });
+    });
+  });
+
   it('should be able to produce and consume messages: consumeLoop', function(done) {
     var key = 'key';
diff --git a/src/workers.cc b/src/workers.cc
index 55d3dd50..b4ac77a1 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -7,6 +7,8 @@
  * of the MIT license. See the LICENSE.txt file for details.
  */
 
+#include <algorithm>
+#include <chrono>
 #include <string>
 #include <vector>
 
@@ -810,6 +812,7 @@ void KafkaConsumerConsumeNum::Execute() {
   bool looping = true;
   int timeout_ms = m_timeout_ms;
   std::size_t eof_event_count = 0;
+  const auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
 
   while (m_messages.size() - eof_event_count < max && looping) {
     // Get a message
@@ -838,6 +841,8 @@ void KafkaConsumerConsumeNum::Execute() {
        break;
      case RdKafka::ERR_NO_ERROR:
        m_messages.push_back(b.data<RdKafka::Message*>());
+       timeout_ms = std::max(0, static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(
+         end - std::chrono::steady_clock::now()).count()));
        break;
      default:
        // Set the error for any other errors and break
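
Not part of the patch: a self-contained sketch (illustrative names and
timings only) of the same deadline-clamping idea, simulating the
reproduction above with a message arriving roughly every 500 ms against
a 1000 ms consume timeout.

    #include <algorithm>
    #include <chrono>
    #include <cstdio>
    #include <thread>

    int main() {
      using namespace std::chrono;

      const int total_timeout_ms = 1000;  // like setDefaultConsumeTimeout(1000)
      const auto deadline = steady_clock::now() + milliseconds(total_timeout_ms);
      int timeout_ms = total_timeout_ms;
      int consumed = 0;

      while (timeout_ms > 0 && consumed < 128) {
        // Stand-in for m_consumer->Consume(timeout_ms): a message "arrives"
        // after ~500 ms, as in the reproduction steps above.
        std::this_thread::sleep_for(milliseconds(std::min(timeout_ms, 500)));
        ++consumed;

        // Clamp the next wait to whatever is left of the total budget.
        timeout_ms = static_cast<int>(std::max<long long>(0,
            duration_cast<milliseconds>(deadline - steady_clock::now()).count()));
      }

      // Prints roughly 2: the loop is bounded by the 1 s budget rather than
      // by 128 waits of up to 1 s each (which is how the old code took ~64 s).
      std::printf("consumed %d message(s)\n", consumed);
      return 0;
    }

Using steady_clock for the deadline keeps the remaining-time computation
monotonic, so wall-clock adjustments cannot stretch or shrink the budget.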