From 42d3ab19aa7b1ed33834f1663d9dd64eabf07b1b Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 7 Nov 2025 21:04:21 +0530 Subject: [PATCH 1/6] Make consumer - consumer and poll wakeable --- CHANGELOG.md | 6 + src/confluent_kafka/src/Consumer.c | 211 +++++++- tests/test_Consumer.py | 760 ++++++++++++++++++++++++++++- 3 files changed, 960 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce2b6a63b..4005b52e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Confluent Python Client for Apache Kafka - CHANGELOG + +### Fixes + +- Fixed `Consumer.poll()` and `Consumer.consume()` blocking indefinitely and not responding to Ctrl+C (KeyboardInterrupt) signals. The implementation now uses a "wakeable poll" pattern that breaks long blocking calls into smaller chunks (200ms) and periodically re-acquires the Python GIL to check for pending signals. This allows Ctrl+C to properly interrupt blocking consumer operations. Fixes Issues [#209](https://github.com/confluentinc/confluent-kafka-python/issues/209) and [#807](https://github.com/confluentinc/confluent-kafka-python/issues/807). + + ## v2.12.1 - 2025-10-21 v2.12.1 is a maintenance release with the following fixes: diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 9a747862d..9a2fcb7ca 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -106,9 +106,81 @@ static int Consumer_traverse (Handle *self, } +/**************************************************************************** + * + * Helper functions for implementing interruptible poll/consume operations + * that allow Ctrl+C to terminate blocking calls. See Issues #209 and #807. + * + * + ****************************************************************************/ + +/** + * @brief Calculate the timeout for the current chunk in wakeable poll pattern. + * + * @param total_timeout_ms Total timeout in milliseconds (-1 for infinite) + * @param chunk_count Current chunk iteration count (0-based) + * @param chunk_timeout_ms Chunk size in milliseconds (typically 200ms) + * @return int Chunk timeout in milliseconds, or 0 if total timeout expired + */ +static int calculate_chunk_timeout(int total_timeout_ms, int chunk_count, + int chunk_timeout_ms) { + if (total_timeout_ms < 0) { + /* Infinite timeout - use chunk size */ + return chunk_timeout_ms; + } else { + /* Finite timeout - calculate remaining */ + int remaining_ms = total_timeout_ms - (chunk_count * chunk_timeout_ms); + if (remaining_ms <= 0) { + /* Timeout expired */ + return 0; + } + return (remaining_ms < chunk_timeout_ms) ? remaining_ms : chunk_timeout_ms; + } +} + +/** + * @brief Check for pending signals between poll chunks. + * + * Re-acquires GIL, checks for signals, and handles cleanup if signal detected. + * This allows Ctrl+C to interrupt blocking poll/consume operations. + * + * @param self Consumer handle + * @param cs CallState structure (thread state will be updated) + * @return int 0 if no signal detected (continue), 1 if signal detected (should return NULL) + */ +static int check_signals_between_chunks(Handle *self, CallState *cs) { + /* Re-acquire GIL to check for signals */ + PyEval_RestoreThread(cs->thread_state); + + /* Check for pending signals (KeyboardInterrupt, etc.) */ + /* PyErr_CheckSignals() already set the exception */ + if (PyErr_CheckSignals() == -1) { + /* Note: GIL is already held, but CallState_end expects to restore it */ + /* Save thread state again so CallState_end can restore it properly */ + cs->thread_state = PyEval_SaveThread(); + if (!CallState_end(self, cs)) { + /* CallState_end detected signal and cleaned up */ + return 1; /* Signal detected */ + } + return 1; + } + /* Re-release GIL for next iteration */ + cs->thread_state = PyEval_SaveThread(); + return 0; /* No signal, continue */ +} +/**************************************************************************** + * + * + * Consumer Methods + * + * + * + * + ****************************************************************************/ + static PyObject *Consumer_subscribe (Handle *self, PyObject *args, PyObject *kwargs) { @@ -984,14 +1056,37 @@ static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args, #endif } - +/** + * @brief Poll for a single message from the subscribed topics. + * + * Instead of a single blocking call to rd_kafka_consumer_poll() with the + * full timeout, this function: + * 1. Splits the timeout into 200ms chunks + * 2. Calls rd_kafka_consumer_poll() with chunk timeout + * 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals() + * 4. If signal detected, returns NULL (raises KeyboardInterrupt) + * 5. Continues until message received, timeout expired, or signal detected + * + * + * @param self Consumer handle + * @param args Positional arguments (unused) + * @param kwargs Keyword arguments: + * - timeout (float, optional): Timeout in seconds. + * Default: -1.0 (infinite timeout) + * @return PyObject* Message object, None if timeout, or NULL on error + * (raises KeyboardInterrupt if signal detected) + */ static PyObject *Consumer_poll (Handle *self, PyObject *args, PyObject *kwargs) { double tmout = -1.0f; static char *kws[] = { "timeout", NULL }; - rd_kafka_message_t *rkm; + rd_kafka_message_t *rkm = NULL; PyObject *msgobj; CallState cs; + const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */ + int total_timeout_ms; + int chunk_timeout_ms; + int chunk_count = 0; if (!self->rk) { PyErr_SetString(PyExc_RuntimeError, @@ -1002,16 +1097,43 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout)) return NULL; + total_timeout_ms = cfl_timeout_ms(tmout); + CallState_begin(self, &cs); - rkm = rd_kafka_consumer_poll(self->rk, cfl_timeout_ms(tmout)); + while (1) { + /* Calculate timeout for this chunk */ + chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, + CHUNK_TIMEOUT_MS); + if (chunk_timeout_ms == 0) { + /* Timeout expired */ + break; + } + + /* Poll with chunk timeout */ + rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms); + + /* If we got a message, exit the loop */ + if (rkm) { + break; + } + + chunk_count++; + + /* Check for signals between chunks */ + if (check_signals_between_chunks(self, &cs)) { + return NULL; + } + } + /* Final GIL restore and signal check */ if (!CallState_end(self, &cs)) { if (rkm) rd_kafka_message_destroy(rkm); return NULL; } + /* Handle the message */ if (!rkm) Py_RETURN_NONE; @@ -1053,7 +1175,27 @@ static PyObject *Consumer_memberid (Handle *self, PyObject *args, return memberidobj; } - +/** + * @brief Consume a batch of messages from the subscribed topics. + * + * Instead of a single blocking call to rd_kafka_consume_batch_queue() with the + * full timeout, this function: + * 1. Splits the timeout into 200ms chunks + * 2. Calls rd_kafka_consume_batch_queue() with chunk timeout + * 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals() + * 4. If signal detected, returns NULL (raises KeyboardInterrupt) + * 5. Continues until messages received, timeout expired, or signal detected. + * + * @param self Consumer handle + * @param args Positional arguments (unused) + * @param kwargs Keyword arguments: + * - num_messages (int, optional): Maximum number of messages to + * consume per call. Default: 1. Maximum: 1000000. + * - timeout (float, optional): Timeout in seconds. + * Default: -1.0 (infinite timeout) + * @return PyObject* List of Message objects, empty list if timeout, or NULL on error + * (raises KeyboardInterrupt if signal detected) + */ static PyObject *Consumer_consume (Handle *self, PyObject *args, PyObject *kwargs) { unsigned int num_messages = 1; @@ -1063,7 +1205,11 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, PyObject *msglist; rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu; CallState cs; - Py_ssize_t i, n; + Py_ssize_t i, n = 0; + const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */ + int total_timeout_ms; + int chunk_timeout_ms; + int chunk_count = 0; if (!self->rk) { PyErr_SetString(PyExc_RuntimeError, @@ -1081,14 +1227,53 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, return NULL; } - CallState_begin(self, &cs); + total_timeout_ms = cfl_timeout_ms(tmout); rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *)); + if (!rkmessages) { + PyErr_NoMemory(); + return NULL; + } + + CallState_begin(self, &cs); + + while (1) { + /* Calculate timeout for this chunk */ + chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, + CHUNK_TIMEOUT_MS); + if (chunk_timeout_ms == 0) { + /* Timeout expired */ + break; + } + + /* Consume with chunk timeout */ + n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms, + rkmessages, num_messages); + + if (n < 0) { + /* Error - need to restore GIL before setting error */ + PyEval_RestoreThread(cs.thread_state); + free(rkmessages); + cfl_PyErr_Format(rd_kafka_last_error(), + "%s", rd_kafka_err2str(rd_kafka_last_error())); + return NULL; + } + + /* If we got messages, exit the loop */ + if (n > 0) { + break; + } - n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, - cfl_timeout_ms(tmout), - rkmessages, num_messages); + chunk_count++; + /* Check for signals between chunks */ + if (check_signals_between_chunks(self, &cs)) { + free(rkmessages); + return NULL; + } + } + + /* Final GIL restore and signal check */ if (!CallState_end(self, &cs)) { for (i = 0; i < n; i++) { rd_kafka_message_destroy(rkmessages[i]); @@ -1097,13 +1282,7 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, return NULL; } - if (n < 0) { - free(rkmessages); - cfl_PyErr_Format(rd_kafka_last_error(), - "%s", rd_kafka_err2str(rd_kafka_last_error())); - return NULL; - } - + /* Create Python list from messages */ msglist = PyList_New(n); for (i = 0; i < n; i++) { diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 0b212d497..639470418 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -1,14 +1,35 @@ #!/usr/bin/env python +import os +import signal +import threading +import time + import pytest -from confluent_kafka import (Consumer, TopicPartition, KafkaError, +from confluent_kafka import (Consumer, Producer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE, OFFSET_INVALID) from tests.common import TestConsumer +def send_sigint_after_delay(delay_seconds): + """Send SIGINT to current process after delay. + + Utility function for testing interruptible poll/consume operations. + Used to simulate Ctrl+C in automated tests. + + Args: + delay_seconds: Delay in seconds before sending SIGINT + """ + time.sleep(delay_seconds) + try: + os.kill(os.getpid(), signal.SIGINT) + except Exception: + pass + + def test_basic_api(): """ Basic API tests, these wont really do anything since there is no broker configured. """ @@ -676,3 +697,740 @@ def __init__(self, config): with pytest.raises(RuntimeError, match="Consumer closed"): consumer.consumer_group_metadata() + + +def test_calculate_chunk_timeout_utility_function(): + """Test calculate_chunk_timeout() utility function through poll() API. + """ + # Assertion 1: Infinite timeout chunks forever with 200ms intervals + consumer1 = TestConsumer({ + 'group.id': 'test-chunk-infinite', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll() # Infinite timeout - should chunk every 200ms + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within ~0.5s (200ms chunk + overhead) + assert elapsed < 1.0, f"Assertion 1 failed: Infinite timeout chunking took {elapsed:.2f}s" + consumer1.close() + + # Assertion 2: Finite timeout exact multiple (1.0s = 5 chunks of 200ms) + consumer2 = TestConsumer({ + 'group.id': 'test-chunk-exact-multiple', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe(['test-topic']) + + start = time.time() + msg = consumer2.poll(timeout=1.0) # Exactly 1000ms (5 chunks) + elapsed = time.time() - start + + assert msg is None, "Assertion 2 failed: Expected None (timeout)" + assert 0.8 <= elapsed <= 1.2, f"Assertion 2 failed: Timeout took {elapsed:.2f}s, expected ~1.0s" + consumer2.close() + + # Assertion 3: Finite timeout not multiple (0.35s = 1 chunk + 150ms partial) + consumer3 = TestConsumer({ + 'group.id': 'test-chunk-not-multiple', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe(['test-topic']) + + start = time.time() + msg = consumer3.poll(timeout=0.35) # 350ms (1 full chunk + 150ms partial) + elapsed = time.time() - start + + assert msg is None, "Assertion 3 failed: Expected None (timeout)" + assert 0.25 <= elapsed <= 0.45, f"Assertion 3 failed: Timeout took {elapsed:.2f}s, expected ~0.35s" + consumer3.close() + + # Assertion 4: Very short timeout (< 200ms chunk size) + consumer4 = TestConsumer({ + 'group.id': 'test-chunk-very-short', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe(['test-topic']) + + start = time.time() + msg = consumer4.poll(timeout=0.05) # 50ms (less than 200ms chunk) + elapsed = time.time() - start + + assert msg is None, "Assertion 4 failed: Expected None (timeout)" + assert 0.03 <= elapsed <= 0.15, f"Assertion 4 failed: Timeout took {elapsed:.2f}s, expected ~0.05s (not 0.2s)" + consumer4.close() + + # Assertion 5: Zero timeout (non-blocking) + consumer5 = TestConsumer({ + 'group.id': 'test-chunk-zero', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer5.subscribe(['test-topic']) + + start = time.time() + msg = consumer5.poll(timeout=0.0) # Non-blocking + elapsed = time.time() - start + + assert elapsed < 0.1, f"Assertion 5 failed: Zero timeout took {elapsed:.2f}s, expected immediate return" + consumer5.close() + + # Assertion 6: Large finite timeout (10s = 50 chunks) + consumer6 = TestConsumer({ + 'group.id': 'test-chunk-large', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer6.subscribe(['test-topic']) + + start = time.time() + msg = consumer6.poll(timeout=10.0) # 10 seconds (50 chunks) + elapsed = time.time() - start + + assert msg is None, "Assertion 6 failed: Expected None (timeout)" + assert 9.5 <= elapsed <= 10.5, f"Assertion 6 failed: Timeout took {elapsed:.2f}s, expected ~10.0s" + consumer6.close() + + # Assertion 7: Finite timeout with interruption (chunk calculation continues correctly) + consumer7 = TestConsumer({ + 'group.id': 'test-chunk-finite-interrupt', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer7.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.4)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer7.poll(timeout=1.0) # 1 second, but interrupt after 0.4s + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 1 second + assert elapsed < 1.0, f"Assertion 7 failed: Interruption took {elapsed:.2f}s, expected < 1.0s" + consumer7.close() + + +def test_check_signals_between_chunks_utility_function(): + """Test check_signals_between_chunks() utility function through poll() API. + """ + # Assertion 1: Signal detected on first chunk check + consumer1 = TestConsumer({ + 'group.id': 'test-signal-first-chunk', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.05)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within ~200ms (first chunk check) + assert elapsed < 0.5, f"Assertion 1 failed: First chunk signal check took {elapsed:.2f}s, expected < 0.5s" + consumer1.close() + + # Assertion 2: Signal detected on later chunk check + consumer2 = TestConsumer({ + 'group.id': 'test-signal-later-chunk', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.5)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within ~200ms of signal being sent (0.5s + 0.2s = 0.7s max) + assert elapsed < 0.8, f"Assertion 2 failed: Later chunk signal check took {elapsed:.2f}s, expected < 0.8s" + consumer2.close() + + # Assertion 3: No signal - continues polling (returns 0) + consumer3 = TestConsumer({ + 'group.id': 'test-signal-no-signal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe(['test-topic']) + + start = time.time() + msg = consumer3.poll(timeout=0.5) # 500ms, no signal + elapsed = time.time() - start + + assert msg is None, "Assertion 3 failed: Expected None (timeout), no signal should not interrupt" + assert 0.4 <= elapsed <= 0.6, f"Assertion 3 failed: No signal timeout took {elapsed:.2f}s, expected ~0.5s" + consumer3.close() + + # Assertion 4: Signal checked every chunk (not just once) + consumer4 = TestConsumer({ + 'group.id': 'test-signal-every-chunk', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe(['test-topic']) + + # Send signal after 0.6 seconds (3 chunks should have passed) + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer4.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly after signal (within one chunk period) + # Signal sent at 0.6s, should interrupt by ~0.8s (0.6 + 0.2) + assert 0.6 <= elapsed <= 0.9, f"Assertion 4 failed: Every chunk check took {elapsed:.2f}s, expected 0.6-0.9s" + consumer4.close() + + # Assertion 5: Signal check works during finite timeout + consumer5 = TestConsumer({ + 'group.id': 'test-signal-finite-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer5.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer5.poll(timeout=2.0) # 2 seconds, but interrupt after 0.3s + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 2 seconds + assert elapsed < 1.0, f"Assertion 5 failed: Signal during finite timeout took {elapsed:.2f}s, expected < 1.0s" + consumer5.close() + + +def test_wakeable_poll_utility_functions_interaction(): + """Test interaction between calculate_chunk_timeout() and check_signals_between_chunks(). + """ + # Assertion 1: Both functions work together - chunk calculation + signal check + consumer1 = TestConsumer({ + 'group.id': 'test-interaction-chunk-signal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.4)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll(timeout=1.0) # 1 second timeout, interrupt after 0.4s + except KeyboardInterrupt: + elapsed = time.time() - start + # Chunk calculation should work (200ms chunks), signal check should detect signal + # Should interrupt within ~0.6s (0.4s signal + 0.2s chunk) + assert elapsed < 0.8, f"Assertion 1 failed: Interaction test took {elapsed:.2f}s, expected < 0.8s" + # Verify it didn't wait for full 1 second timeout + assert elapsed < 1.0, f"Assertion 1 failed: Should interrupt before timeout, took {elapsed:.2f}s" + consumer1.close() + + # Assertion 2: Multiple chunks before signal - both functions work over multiple iterations + consumer2 = TestConsumer({ + 'group.id': 'test-interaction-multiple-chunks', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe(['test-topic']) + + # Send signal after 0.6 seconds (3 chunks should have passed: 0.2s, 0.4s, 0.6s) + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Chunk calculation should continue correctly (200ms each) + # Signal check should happen every chunk + # Should interrupt within ~0.8s (0.6s signal + 0.2s chunk) + assert 0.6 <= elapsed <= 0.9, f"Assertion 2 failed: Multiple chunks interaction took {elapsed:.2f}s, expected 0.6-0.9s" + # Verify chunking was happening (elapsed should be close to signal time + one chunk) + assert elapsed >= 0.6, f"Assertion 2 failed: Should wait for signal at 0.6s, but interrupted at {elapsed:.2f}s" + consumer2.close() + + +def test_poll_interruptibility_and_messages(): + """Test poll() interruptibility (main fix) and message handling. + """ + topic = 'test-poll-interrupt-topic' + + # Assertion 1: Infinite timeout can be interrupted immediately + consumer1 = TestConsumer({ + 'group.id': 'test-poll-infinite-immediate', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.1)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll() # Infinite timeout + assert False, "Assertion 1 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within first chunk (~200ms) + assert elapsed < 0.5, f"Assertion 1 failed: Immediate interrupt took {elapsed:.2f}s, expected < 0.5s" + consumer1.close() + + # Assertion 2: Finite timeout can be interrupted before timeout expires + consumer2 = TestConsumer({ + 'group.id': 'test-poll-finite-interrupt', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.poll(timeout=2.0) # 2 seconds, but interrupt after 0.3s + assert False, "Assertion 2 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 2 seconds + assert elapsed < 1.0, f"Assertion 2 failed: Finite timeout interrupt took {elapsed:.2f}s, expected < 1.0s" + assert elapsed < 2.0, f"Assertion 2 failed: Should interrupt before timeout, took {elapsed:.2f}s" + consumer2.close() + + # Assertion 3: Signal sent after multiple chunks still interrupts quickly + consumer3 = TestConsumer({ + 'group.id': 'test-poll-multiple-chunks', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer3.poll() # Infinite timeout + assert False, "Assertion 3 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within one chunk period after signal (0.6s + 0.2s = 0.8s max) + assert 0.6 <= elapsed <= 0.9, f"Assertion 3 failed: Multiple chunks interrupt took {elapsed:.2f}s, expected 0.6-0.9s" + consumer3.close() + + # Assertion 4: No signal - timeout works normally + consumer4 = TestConsumer({ + 'group.id': 'test-poll-timeout-normal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe([topic]) + + start = time.time() + msg = consumer4.poll(timeout=0.5) # 500ms, no signal + elapsed = time.time() - start + + assert msg is None, "Assertion 4 failed: Expected None (timeout), no signal should not interrupt" + assert 0.4 <= elapsed <= 0.6, f"Assertion 4 failed: Normal timeout took {elapsed:.2f}s, expected ~0.5s" + consumer4.close() + + # Assertion 5: Message available - returns immediately + producer = Producer({'bootstrap.servers': 'localhost:9092'}) + producer.produce(topic, value=b'test-message') + producer.flush(timeout=1.0) + producer = None + + consumer5 = TestConsumer({ + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'test-poll-message-available', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 6000, + 'auto.offset.reset': 'earliest' + }) + consumer5.subscribe([topic]) + + # Wait for subscription and message availability + time.sleep(2.0) + + start = time.time() + msg = consumer5.poll(timeout=2.0) + elapsed = time.time() - start + + # Message should be available and return quickly (after consumer is ready) + assert msg is not None, "Assertion 5 failed: Expected message, got None" + assert not msg.error(), f"Assertion 5 failed: Message has error: {msg.error()}" + # Allow more time for initial consumer setup, but once ready, should return quickly + assert elapsed < 2.5, f"Assertion 5 failed: Message available but took {elapsed:.2f}s, expected < 2.5s" + assert msg.value() == b'test-message', "Assertion 5 failed: Message value mismatch" + consumer5.close() + + +def test_poll_edge_cases(): + """Test poll() edge cases. + """ + topic = 'test-poll-edge-topic' + + # Assertion 1: Zero timeout returns immediately (non-blocking) + consumer1 = TestConsumer({ + 'group.id': 'test-poll-zero-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe([topic]) + + start = time.time() + msg = consumer1.poll(timeout=0.0) # Zero timeout + elapsed = time.time() - start + + assert elapsed < 0.1, f"Assertion 1 failed: Zero timeout took {elapsed:.2f}s, expected < 0.1s" + assert msg is None, "Assertion 1 failed: Zero timeout with no messages should return None" + consumer1.close() + + # Assertion 2: Closed consumer raises RuntimeError + consumer2 = TestConsumer({ + 'group.id': 'test-poll-closed', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000 + }) + consumer2.close() + + with pytest.raises(RuntimeError) as exc_info: + consumer2.poll(timeout=0.1) + assert 'Consumer closed' in str(exc_info.value), f"Assertion 2 failed: Expected 'Consumer closed' error, got: {exc_info.value}" + + # Assertion 3: Short timeout works correctly (no signal) + consumer3 = TestConsumer({ + 'group.id': 'test-poll-short-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe([topic]) + + start = time.time() + msg = consumer3.poll(timeout=0.1) # 100ms timeout + elapsed = time.time() - start + + assert msg is None, "Assertion 3 failed: Short timeout with no messages should return None" + assert 0.05 <= elapsed <= 0.2, f"Assertion 3 failed: Short timeout took {elapsed:.2f}s, expected ~0.1s" + consumer3.close() + + # Assertion 4: Very short timeout (less than chunk size) works + consumer4 = TestConsumer({ + 'group.id': 'test-poll-very-short', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe([topic]) + + start = time.time() + msg = consumer4.poll(timeout=0.05) # 50ms timeout (less than 200ms chunk) + elapsed = time.time() - start + + assert msg is None, "Assertion 4 failed: Very short timeout should return None" + assert elapsed < 0.2, f"Assertion 4 failed: Very short timeout took {elapsed:.2f}s, expected < 0.2s" + consumer4.close() + + +def test_consume_interruptibility_and_messages(): + """Test consume() interruptibility (main fix) and message handling. + """ + topic = 'test-consume-interrupt-topic' + + # Assertion 1: Infinite timeout can be interrupted immediately + consumer1 = TestConsumer({ + 'group.id': 'test-consume-infinite-immediate', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.1)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.consume() # Infinite timeout, default num_messages=1 + assert False, "Assertion 1 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within first chunk (~200ms) + assert elapsed < 0.5, f"Assertion 1 failed: Immediate interrupt took {elapsed:.2f}s, expected < 0.5s" + consumer1.close() + + # Assertion 2: Finite timeout can be interrupted before timeout expires + consumer2 = TestConsumer({ + 'group.id': 'test-consume-finite-interrupt', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.consume(num_messages=10, timeout=2.0) # 2 seconds, but interrupt after 0.3s + assert False, "Assertion 2 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 2 seconds + assert elapsed < 1.0, f"Assertion 2 failed: Finite timeout interrupt took {elapsed:.2f}s, expected < 1.0s" + assert elapsed < 2.0, f"Assertion 2 failed: Should interrupt before timeout, took {elapsed:.2f}s" + consumer2.close() + + # Assertion 3: Signal sent after multiple chunks still interrupts quickly + consumer3 = TestConsumer({ + 'group.id': 'test-consume-multiple-chunks', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer3.consume(num_messages=5) # Infinite timeout + assert False, "Assertion 3 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within one chunk period after signal (0.6s + 0.2s = 0.8s max) + assert 0.6 <= elapsed <= 0.9, f"Assertion 3 failed: Multiple chunks interrupt took {elapsed:.2f}s, expected 0.6-0.9s" + consumer3.close() + + # Assertion 4: No signal - timeout works normally, returns empty list + consumer4 = TestConsumer({ + 'group.id': 'test-consume-timeout-normal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe([topic]) + + start = time.time() + msglist = consumer4.consume(num_messages=10, timeout=0.5) # 500ms, no signal + elapsed = time.time() - start + + assert isinstance(msglist, list), "Assertion 4 failed: consume() should return a list" + assert len(msglist) == 0, f"Assertion 4 failed: Expected empty list (timeout), got {len(msglist)} messages" + assert 0.4 <= elapsed <= 0.6, f"Assertion 4 failed: Normal timeout took {elapsed:.2f}s, expected ~0.5s" + consumer4.close() + + # Assertion 5: num_messages=0 returns empty list immediately + consumer5 = TestConsumer({ + 'group.id': 'test-consume-zero-messages', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer5.subscribe([topic]) + + start = time.time() + msglist = consumer5.consume(num_messages=0, timeout=1.0) + elapsed = time.time() - start + + assert isinstance(msglist, list), "Assertion 5 failed: consume() should return a list" + assert len(msglist) == 0, "Assertion 5 failed: num_messages=0 should return empty list" + assert elapsed < 0.1, f"Assertion 5 failed: num_messages=0 took {elapsed:.2f}s, expected < 0.1s" + consumer5.close() + + # Assertion 6: Message available - returns messages + producer = Producer({'bootstrap.servers': 'localhost:9092'}) + for i in range(3): + producer.produce(topic, value=f'test-message-{i}'.encode()) + producer.flush(timeout=1.0) + producer = None + + consumer6 = TestConsumer({ + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'test-consume-messages-available', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 6000, + 'auto.offset.reset': 'earliest' + }) + consumer6.subscribe([topic]) + + # Wait for subscription and message availability + time.sleep(2.0) + + start = time.time() + msglist = consumer6.consume(num_messages=5, timeout=2.0) + elapsed = time.time() - start + + # Messages should be available and return quickly (after consumer is ready) + assert len(msglist) > 0, f"Assertion 6 failed: Expected messages, got empty list" + assert len(msglist) <= 5, f"Assertion 6 failed: Should return at most 5 messages, got {len(msglist)}" + # Allow more time for initial consumer setup, but once ready, should return quickly + assert elapsed < 2.5, f"Assertion 6 failed: Messages available but took {elapsed:.2f}s, expected < 2.5s" + # Verify message values + for i, msg in enumerate(msglist): + assert not msg.error(), f"Assertion 6 failed: Message {i} has error: {msg.error()}" + assert msg.value() is not None, f"Assertion 6 failed: Message {i} has no value" + consumer6.close() + + +def test_consume_edge_cases(): + """Test consume() edge cases. + """ + topic = 'test-consume-edge-topic' + + # Assertion 1: Zero timeout returns immediately (non-blocking) + consumer1 = TestConsumer({ + 'group.id': 'test-consume-zero-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe([topic]) + + start = time.time() + msglist = consumer1.consume(num_messages=10, timeout=0.0) # Zero timeout + elapsed = time.time() - start + + assert elapsed < 0.1, f"Assertion 1 failed: Zero timeout took {elapsed:.2f}s, expected < 0.1s" + assert isinstance(msglist, list), "Assertion 1 failed: consume() should return a list" + assert len(msglist) == 0, "Assertion 1 failed: Zero timeout with no messages should return empty list" + consumer1.close() + + # Assertion 2: Closed consumer raises RuntimeError + consumer2 = TestConsumer({ + 'group.id': 'test-consume-closed', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000 + }) + consumer2.close() + + with pytest.raises(RuntimeError) as exc_info: + consumer2.consume(num_messages=10, timeout=0.1) + assert 'Consumer closed' in str(exc_info.value), f"Assertion 2 failed: Expected 'Consumer closed' error, got: {exc_info.value}" + + # Assertion 3: Invalid num_messages (negative) raises ValueError + consumer3 = TestConsumer({ + 'group.id': 'test-consume-invalid-negative', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe([topic]) + + with pytest.raises(ValueError) as exc_info: + consumer3.consume(num_messages=-1, timeout=0.1) + assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), f"Assertion 3 failed: Expected num_messages range error, got: {exc_info.value}" + consumer3.close() + + # Assertion 4: Invalid num_messages (too large) raises ValueError + consumer4 = TestConsumer({ + 'group.id': 'test-consume-invalid-large', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe([topic]) + + with pytest.raises(ValueError) as exc_info: + consumer4.consume(num_messages=1000001, timeout=0.1) + assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), f"Assertion 4 failed: Expected num_messages range error, got: {exc_info.value}" + consumer4.close() + + # Assertion 5: Short timeout works correctly (no signal) + consumer5 = TestConsumer({ + 'group.id': 'test-consume-short-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer5.subscribe([topic]) + + start = time.time() + msglist = consumer5.consume(num_messages=10, timeout=0.1) # 100ms timeout + elapsed = time.time() - start + + assert isinstance(msglist, list), "Assertion 5 failed: consume() should return a list" + assert len(msglist) == 0, "Assertion 5 failed: Short timeout with no messages should return empty list" + assert 0.05 <= elapsed <= 0.2, f"Assertion 5 failed: Short timeout took {elapsed:.2f}s, expected ~0.1s" + consumer5.close() + + # Assertion 6: Very short timeout (less than chunk size) works + consumer6 = TestConsumer({ + 'group.id': 'test-consume-very-short', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer6.subscribe([topic]) + + start = time.time() + msglist = consumer6.consume(num_messages=5, timeout=0.05) # 50ms timeout (less than 200ms chunk) + elapsed = time.time() - start + + assert isinstance(msglist, list), "Assertion 6 failed: consume() should return a list" + assert len(msglist) == 0, "Assertion 6 failed: Very short timeout should return empty list" + assert elapsed < 0.2, f"Assertion 6 failed: Very short timeout took {elapsed:.2f}s, expected < 0.2s" + consumer6.close() From 56dc957c414f7ba3cd42ff87f97c88c1b2a81512 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Mon, 10 Nov 2025 21:04:47 +0530 Subject: [PATCH 2/6] Dont mask error in send signal function --- tests/test_Consumer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 639470418..1ed41e3a0 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -24,10 +24,7 @@ def send_sigint_after_delay(delay_seconds): delay_seconds: Delay in seconds before sending SIGINT """ time.sleep(delay_seconds) - try: - os.kill(os.getpid(), signal.SIGINT) - except Exception: - pass + os.kill(os.getpid(), signal.SIGINT) def test_basic_api(): From 1513f949e37fcbbffbbb4482a7cfa18e7cfa18b8 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Mon, 10 Nov 2025 21:22:51 +0530 Subject: [PATCH 3/6] Fix flake8 --- tests/test_Consumer.py | 242 ++++++++++++++++++++++------------------- 1 file changed, 128 insertions(+), 114 deletions(-) diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 1ed41e3a0..80af6c617 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -16,10 +16,10 @@ def send_sigint_after_delay(delay_seconds): """Send SIGINT to current process after delay. - + Utility function for testing interruptible poll/consume operations. Used to simulate Ctrl+C in automated tests. - + Args: delay_seconds: Delay in seconds before sending SIGINT """ @@ -707,11 +707,11 @@ def test_calculate_chunk_timeout_utility_function(): 'auto.offset.reset': 'latest' }) consumer1.subscribe(['test-topic']) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer1.poll() # Infinite timeout - should chunk every 200ms @@ -720,7 +720,7 @@ def test_calculate_chunk_timeout_utility_function(): # Should interrupt within ~0.5s (200ms chunk + overhead) assert elapsed < 1.0, f"Assertion 1 failed: Infinite timeout chunking took {elapsed:.2f}s" consumer1.close() - + # Assertion 2: Finite timeout exact multiple (1.0s = 5 chunks of 200ms) consumer2 = TestConsumer({ 'group.id': 'test-chunk-exact-multiple', @@ -729,15 +729,15 @@ def test_calculate_chunk_timeout_utility_function(): 'auto.offset.reset': 'latest' }) consumer2.subscribe(['test-topic']) - + start = time.time() msg = consumer2.poll(timeout=1.0) # Exactly 1000ms (5 chunks) elapsed = time.time() - start - + assert msg is None, "Assertion 2 failed: Expected None (timeout)" assert 0.8 <= elapsed <= 1.2, f"Assertion 2 failed: Timeout took {elapsed:.2f}s, expected ~1.0s" consumer2.close() - + # Assertion 3: Finite timeout not multiple (0.35s = 1 chunk + 150ms partial) consumer3 = TestConsumer({ 'group.id': 'test-chunk-not-multiple', @@ -746,15 +746,15 @@ def test_calculate_chunk_timeout_utility_function(): 'auto.offset.reset': 'latest' }) consumer3.subscribe(['test-topic']) - + start = time.time() msg = consumer3.poll(timeout=0.35) # 350ms (1 full chunk + 150ms partial) elapsed = time.time() - start - + assert msg is None, "Assertion 3 failed: Expected None (timeout)" assert 0.25 <= elapsed <= 0.45, f"Assertion 3 failed: Timeout took {elapsed:.2f}s, expected ~0.35s" consumer3.close() - + # Assertion 4: Very short timeout (< 200ms chunk size) consumer4 = TestConsumer({ 'group.id': 'test-chunk-very-short', @@ -763,15 +763,15 @@ def test_calculate_chunk_timeout_utility_function(): 'auto.offset.reset': 'latest' }) consumer4.subscribe(['test-topic']) - + start = time.time() msg = consumer4.poll(timeout=0.05) # 50ms (less than 200ms chunk) elapsed = time.time() - start - + assert msg is None, "Assertion 4 failed: Expected None (timeout)" assert 0.03 <= elapsed <= 0.15, f"Assertion 4 failed: Timeout took {elapsed:.2f}s, expected ~0.05s (not 0.2s)" consumer4.close() - + # Assertion 5: Zero timeout (non-blocking) consumer5 = TestConsumer({ 'group.id': 'test-chunk-zero', @@ -780,14 +780,14 @@ def test_calculate_chunk_timeout_utility_function(): 'auto.offset.reset': 'latest' }) consumer5.subscribe(['test-topic']) - + start = time.time() msg = consumer5.poll(timeout=0.0) # Non-blocking elapsed = time.time() - start - + assert elapsed < 0.1, f"Assertion 5 failed: Zero timeout took {elapsed:.2f}s, expected immediate return" consumer5.close() - + # Assertion 6: Large finite timeout (10s = 50 chunks) consumer6 = TestConsumer({ 'group.id': 'test-chunk-large', @@ -796,15 +796,15 @@ def test_calculate_chunk_timeout_utility_function(): 'auto.offset.reset': 'latest' }) consumer6.subscribe(['test-topic']) - + start = time.time() msg = consumer6.poll(timeout=10.0) # 10 seconds (50 chunks) elapsed = time.time() - start - + assert msg is None, "Assertion 6 failed: Expected None (timeout)" assert 9.5 <= elapsed <= 10.5, f"Assertion 6 failed: Timeout took {elapsed:.2f}s, expected ~10.0s" consumer6.close() - + # Assertion 7: Finite timeout with interruption (chunk calculation continues correctly) consumer7 = TestConsumer({ 'group.id': 'test-chunk-finite-interrupt', @@ -813,11 +813,11 @@ def test_calculate_chunk_timeout_utility_function(): 'auto.offset.reset': 'latest' }) consumer7.subscribe(['test-topic']) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.4)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer7.poll(timeout=1.0) # 1 second, but interrupt after 0.4s @@ -839,11 +839,11 @@ def test_check_signals_between_chunks_utility_function(): 'auto.offset.reset': 'latest' }) consumer1.subscribe(['test-topic']) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.05)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer1.poll() # Infinite timeout @@ -852,7 +852,7 @@ def test_check_signals_between_chunks_utility_function(): # Should interrupt within ~200ms (first chunk check) assert elapsed < 0.5, f"Assertion 1 failed: First chunk signal check took {elapsed:.2f}s, expected < 0.5s" consumer1.close() - + # Assertion 2: Signal detected on later chunk check consumer2 = TestConsumer({ 'group.id': 'test-signal-later-chunk', @@ -861,11 +861,11 @@ def test_check_signals_between_chunks_utility_function(): 'auto.offset.reset': 'latest' }) consumer2.subscribe(['test-topic']) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.5)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer2.poll() # Infinite timeout @@ -874,7 +874,7 @@ def test_check_signals_between_chunks_utility_function(): # Should interrupt within ~200ms of signal being sent (0.5s + 0.2s = 0.7s max) assert elapsed < 0.8, f"Assertion 2 failed: Later chunk signal check took {elapsed:.2f}s, expected < 0.8s" consumer2.close() - + # Assertion 3: No signal - continues polling (returns 0) consumer3 = TestConsumer({ 'group.id': 'test-signal-no-signal', @@ -883,15 +883,15 @@ def test_check_signals_between_chunks_utility_function(): 'auto.offset.reset': 'latest' }) consumer3.subscribe(['test-topic']) - + start = time.time() msg = consumer3.poll(timeout=0.5) # 500ms, no signal elapsed = time.time() - start - + assert msg is None, "Assertion 3 failed: Expected None (timeout), no signal should not interrupt" assert 0.4 <= elapsed <= 0.6, f"Assertion 3 failed: No signal timeout took {elapsed:.2f}s, expected ~0.5s" consumer3.close() - + # Assertion 4: Signal checked every chunk (not just once) consumer4 = TestConsumer({ 'group.id': 'test-signal-every-chunk', @@ -900,12 +900,12 @@ def test_check_signals_between_chunks_utility_function(): 'auto.offset.reset': 'latest' }) consumer4.subscribe(['test-topic']) - + # Send signal after 0.6 seconds (3 chunks should have passed) interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer4.poll() # Infinite timeout @@ -915,7 +915,7 @@ def test_check_signals_between_chunks_utility_function(): # Signal sent at 0.6s, should interrupt by ~0.8s (0.6 + 0.2) assert 0.6 <= elapsed <= 0.9, f"Assertion 4 failed: Every chunk check took {elapsed:.2f}s, expected 0.6-0.9s" consumer4.close() - + # Assertion 5: Signal check works during finite timeout consumer5 = TestConsumer({ 'group.id': 'test-signal-finite-timeout', @@ -924,11 +924,11 @@ def test_check_signals_between_chunks_utility_function(): 'auto.offset.reset': 'latest' }) consumer5.subscribe(['test-topic']) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer5.poll(timeout=2.0) # 2 seconds, but interrupt after 0.3s @@ -950,11 +950,11 @@ def test_wakeable_poll_utility_functions_interaction(): 'auto.offset.reset': 'latest' }) consumer1.subscribe(['test-topic']) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.4)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer1.poll(timeout=1.0) # 1 second timeout, interrupt after 0.4s @@ -966,7 +966,7 @@ def test_wakeable_poll_utility_functions_interaction(): # Verify it didn't wait for full 1 second timeout assert elapsed < 1.0, f"Assertion 1 failed: Should interrupt before timeout, took {elapsed:.2f}s" consumer1.close() - + # Assertion 2: Multiple chunks before signal - both functions work over multiple iterations consumer2 = TestConsumer({ 'group.id': 'test-interaction-multiple-chunks', @@ -975,12 +975,12 @@ def test_wakeable_poll_utility_functions_interaction(): 'auto.offset.reset': 'latest' }) consumer2.subscribe(['test-topic']) - + # Send signal after 0.6 seconds (3 chunks should have passed: 0.2s, 0.4s, 0.6s) interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer2.poll() # Infinite timeout @@ -989,17 +989,19 @@ def test_wakeable_poll_utility_functions_interaction(): # Chunk calculation should continue correctly (200ms each) # Signal check should happen every chunk # Should interrupt within ~0.8s (0.6s signal + 0.2s chunk) - assert 0.6 <= elapsed <= 0.9, f"Assertion 2 failed: Multiple chunks interaction took {elapsed:.2f}s, expected 0.6-0.9s" + msg = (f"Assertion 2 failed: Multiple chunks interaction took " + f"{elapsed:.2f}s, expected 0.6-0.9s") + assert 0.6 <= elapsed <= 0.9, msg # Verify chunking was happening (elapsed should be close to signal time + one chunk) assert elapsed >= 0.6, f"Assertion 2 failed: Should wait for signal at 0.6s, but interrupted at {elapsed:.2f}s" consumer2.close() def test_poll_interruptibility_and_messages(): - """Test poll() interruptibility (main fix) and message handling. + """Test poll() interruptibility (main fix) and message handling. """ topic = 'test-poll-interrupt-topic' - + # Assertion 1: Infinite timeout can be interrupted immediately consumer1 = TestConsumer({ 'group.id': 'test-poll-infinite-immediate', @@ -1008,11 +1010,11 @@ def test_poll_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer1.subscribe([topic]) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.1)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer1.poll() # Infinite timeout @@ -1022,7 +1024,7 @@ def test_poll_interruptibility_and_messages(): # Should interrupt within first chunk (~200ms) assert elapsed < 0.5, f"Assertion 1 failed: Immediate interrupt took {elapsed:.2f}s, expected < 0.5s" consumer1.close() - + # Assertion 2: Finite timeout can be interrupted before timeout expires consumer2 = TestConsumer({ 'group.id': 'test-poll-finite-interrupt', @@ -1031,11 +1033,11 @@ def test_poll_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer2.subscribe([topic]) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer2.poll(timeout=2.0) # 2 seconds, but interrupt after 0.3s @@ -1046,7 +1048,7 @@ def test_poll_interruptibility_and_messages(): assert elapsed < 1.0, f"Assertion 2 failed: Finite timeout interrupt took {elapsed:.2f}s, expected < 1.0s" assert elapsed < 2.0, f"Assertion 2 failed: Should interrupt before timeout, took {elapsed:.2f}s" consumer2.close() - + # Assertion 3: Signal sent after multiple chunks still interrupts quickly consumer3 = TestConsumer({ 'group.id': 'test-poll-multiple-chunks', @@ -1055,11 +1057,11 @@ def test_poll_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer3.subscribe([topic]) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer3.poll() # Infinite timeout @@ -1067,9 +1069,11 @@ def test_poll_interruptibility_and_messages(): except KeyboardInterrupt: elapsed = time.time() - start # Should interrupt within one chunk period after signal (0.6s + 0.2s = 0.8s max) - assert 0.6 <= elapsed <= 0.9, f"Assertion 3 failed: Multiple chunks interrupt took {elapsed:.2f}s, expected 0.6-0.9s" + msg = (f"Assertion 3 failed: Multiple chunks interrupt took " + f"{elapsed:.2f}s, expected 0.6-0.9s") + assert 0.6 <= elapsed <= 0.9, msg consumer3.close() - + # Assertion 4: No signal - timeout works normally consumer4 = TestConsumer({ 'group.id': 'test-poll-timeout-normal', @@ -1078,21 +1082,21 @@ def test_poll_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer4.subscribe([topic]) - + start = time.time() msg = consumer4.poll(timeout=0.5) # 500ms, no signal elapsed = time.time() - start - + assert msg is None, "Assertion 4 failed: Expected None (timeout), no signal should not interrupt" assert 0.4 <= elapsed <= 0.6, f"Assertion 4 failed: Normal timeout took {elapsed:.2f}s, expected ~0.5s" consumer4.close() - + # Assertion 5: Message available - returns immediately producer = Producer({'bootstrap.servers': 'localhost:9092'}) producer.produce(topic, value=b'test-message') producer.flush(timeout=1.0) producer = None - + consumer5 = TestConsumer({ 'bootstrap.servers': 'localhost:9092', 'group.id': 'test-poll-message-available', @@ -1101,14 +1105,14 @@ def test_poll_interruptibility_and_messages(): 'auto.offset.reset': 'earliest' }) consumer5.subscribe([topic]) - + # Wait for subscription and message availability time.sleep(2.0) - + start = time.time() msg = consumer5.poll(timeout=2.0) elapsed = time.time() - start - + # Message should be available and return quickly (after consumer is ready) assert msg is not None, "Assertion 5 failed: Expected message, got None" assert not msg.error(), f"Assertion 5 failed: Message has error: {msg.error()}" @@ -1122,7 +1126,7 @@ def test_poll_edge_cases(): """Test poll() edge cases. """ topic = 'test-poll-edge-topic' - + # Assertion 1: Zero timeout returns immediately (non-blocking) consumer1 = TestConsumer({ 'group.id': 'test-poll-zero-timeout', @@ -1131,15 +1135,15 @@ def test_poll_edge_cases(): 'auto.offset.reset': 'latest' }) consumer1.subscribe([topic]) - + start = time.time() msg = consumer1.poll(timeout=0.0) # Zero timeout elapsed = time.time() - start - + assert elapsed < 0.1, f"Assertion 1 failed: Zero timeout took {elapsed:.2f}s, expected < 0.1s" assert msg is None, "Assertion 1 failed: Zero timeout with no messages should return None" consumer1.close() - + # Assertion 2: Closed consumer raises RuntimeError consumer2 = TestConsumer({ 'group.id': 'test-poll-closed', @@ -1147,11 +1151,13 @@ def test_poll_edge_cases(): 'session.timeout.ms': 1000 }) consumer2.close() - + with pytest.raises(RuntimeError) as exc_info: consumer2.poll(timeout=0.1) - assert 'Consumer closed' in str(exc_info.value), f"Assertion 2 failed: Expected 'Consumer closed' error, got: {exc_info.value}" - + msg = (f"Assertion 2 failed: Expected 'Consumer closed' error, " + f"got: {exc_info.value}") + assert 'Consumer closed' in str(exc_info.value), msg + # Assertion 3: Short timeout works correctly (no signal) consumer3 = TestConsumer({ 'group.id': 'test-poll-short-timeout', @@ -1160,15 +1166,15 @@ def test_poll_edge_cases(): 'auto.offset.reset': 'latest' }) consumer3.subscribe([topic]) - + start = time.time() msg = consumer3.poll(timeout=0.1) # 100ms timeout elapsed = time.time() - start - + assert msg is None, "Assertion 3 failed: Short timeout with no messages should return None" assert 0.05 <= elapsed <= 0.2, f"Assertion 3 failed: Short timeout took {elapsed:.2f}s, expected ~0.1s" consumer3.close() - + # Assertion 4: Very short timeout (less than chunk size) works consumer4 = TestConsumer({ 'group.id': 'test-poll-very-short', @@ -1177,21 +1183,21 @@ def test_poll_edge_cases(): 'auto.offset.reset': 'latest' }) consumer4.subscribe([topic]) - + start = time.time() msg = consumer4.poll(timeout=0.05) # 50ms timeout (less than 200ms chunk) elapsed = time.time() - start - + assert msg is None, "Assertion 4 failed: Very short timeout should return None" assert elapsed < 0.2, f"Assertion 4 failed: Very short timeout took {elapsed:.2f}s, expected < 0.2s" consumer4.close() def test_consume_interruptibility_and_messages(): - """Test consume() interruptibility (main fix) and message handling. + """Test consume() interruptibility (main fix) and message handling. """ topic = 'test-consume-interrupt-topic' - + # Assertion 1: Infinite timeout can be interrupted immediately consumer1 = TestConsumer({ 'group.id': 'test-consume-infinite-immediate', @@ -1200,11 +1206,11 @@ def test_consume_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer1.subscribe([topic]) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.1)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer1.consume() # Infinite timeout, default num_messages=1 @@ -1214,7 +1220,7 @@ def test_consume_interruptibility_and_messages(): # Should interrupt within first chunk (~200ms) assert elapsed < 0.5, f"Assertion 1 failed: Immediate interrupt took {elapsed:.2f}s, expected < 0.5s" consumer1.close() - + # Assertion 2: Finite timeout can be interrupted before timeout expires consumer2 = TestConsumer({ 'group.id': 'test-consume-finite-interrupt', @@ -1223,11 +1229,11 @@ def test_consume_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer2.subscribe([topic]) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer2.consume(num_messages=10, timeout=2.0) # 2 seconds, but interrupt after 0.3s @@ -1238,7 +1244,7 @@ def test_consume_interruptibility_and_messages(): assert elapsed < 1.0, f"Assertion 2 failed: Finite timeout interrupt took {elapsed:.2f}s, expected < 1.0s" assert elapsed < 2.0, f"Assertion 2 failed: Should interrupt before timeout, took {elapsed:.2f}s" consumer2.close() - + # Assertion 3: Signal sent after multiple chunks still interrupts quickly consumer3 = TestConsumer({ 'group.id': 'test-consume-multiple-chunks', @@ -1247,11 +1253,11 @@ def test_consume_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer3.subscribe([topic]) - + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) interrupt_thread.daemon = True interrupt_thread.start() - + start = time.time() try: consumer3.consume(num_messages=5) # Infinite timeout @@ -1259,9 +1265,11 @@ def test_consume_interruptibility_and_messages(): except KeyboardInterrupt: elapsed = time.time() - start # Should interrupt within one chunk period after signal (0.6s + 0.2s = 0.8s max) - assert 0.6 <= elapsed <= 0.9, f"Assertion 3 failed: Multiple chunks interrupt took {elapsed:.2f}s, expected 0.6-0.9s" + msg = (f"Assertion 3 failed: Multiple chunks interrupt took " + f"{elapsed:.2f}s, expected 0.6-0.9s") + assert 0.6 <= elapsed <= 0.9, msg consumer3.close() - + # Assertion 4: No signal - timeout works normally, returns empty list consumer4 = TestConsumer({ 'group.id': 'test-consume-timeout-normal', @@ -1270,16 +1278,16 @@ def test_consume_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer4.subscribe([topic]) - + start = time.time() msglist = consumer4.consume(num_messages=10, timeout=0.5) # 500ms, no signal elapsed = time.time() - start - + assert isinstance(msglist, list), "Assertion 4 failed: consume() should return a list" assert len(msglist) == 0, f"Assertion 4 failed: Expected empty list (timeout), got {len(msglist)} messages" assert 0.4 <= elapsed <= 0.6, f"Assertion 4 failed: Normal timeout took {elapsed:.2f}s, expected ~0.5s" consumer4.close() - + # Assertion 5: num_messages=0 returns empty list immediately consumer5 = TestConsumer({ 'group.id': 'test-consume-zero-messages', @@ -1288,23 +1296,23 @@ def test_consume_interruptibility_and_messages(): 'auto.offset.reset': 'latest' }) consumer5.subscribe([topic]) - + start = time.time() msglist = consumer5.consume(num_messages=0, timeout=1.0) elapsed = time.time() - start - + assert isinstance(msglist, list), "Assertion 5 failed: consume() should return a list" assert len(msglist) == 0, "Assertion 5 failed: num_messages=0 should return empty list" assert elapsed < 0.1, f"Assertion 5 failed: num_messages=0 took {elapsed:.2f}s, expected < 0.1s" consumer5.close() - + # Assertion 6: Message available - returns messages producer = Producer({'bootstrap.servers': 'localhost:9092'}) for i in range(3): producer.produce(topic, value=f'test-message-{i}'.encode()) producer.flush(timeout=1.0) producer = None - + consumer6 = TestConsumer({ 'bootstrap.servers': 'localhost:9092', 'group.id': 'test-consume-messages-available', @@ -1313,16 +1321,16 @@ def test_consume_interruptibility_and_messages(): 'auto.offset.reset': 'earliest' }) consumer6.subscribe([topic]) - + # Wait for subscription and message availability time.sleep(2.0) - + start = time.time() msglist = consumer6.consume(num_messages=5, timeout=2.0) elapsed = time.time() - start - + # Messages should be available and return quickly (after consumer is ready) - assert len(msglist) > 0, f"Assertion 6 failed: Expected messages, got empty list" + assert len(msglist) > 0, "Assertion 6 failed: Expected messages, got empty list" assert len(msglist) <= 5, f"Assertion 6 failed: Should return at most 5 messages, got {len(msglist)}" # Allow more time for initial consumer setup, but once ready, should return quickly assert elapsed < 2.5, f"Assertion 6 failed: Messages available but took {elapsed:.2f}s, expected < 2.5s" @@ -1337,7 +1345,7 @@ def test_consume_edge_cases(): """Test consume() edge cases. """ topic = 'test-consume-edge-topic' - + # Assertion 1: Zero timeout returns immediately (non-blocking) consumer1 = TestConsumer({ 'group.id': 'test-consume-zero-timeout', @@ -1346,16 +1354,16 @@ def test_consume_edge_cases(): 'auto.offset.reset': 'latest' }) consumer1.subscribe([topic]) - + start = time.time() msglist = consumer1.consume(num_messages=10, timeout=0.0) # Zero timeout elapsed = time.time() - start - + assert elapsed < 0.1, f"Assertion 1 failed: Zero timeout took {elapsed:.2f}s, expected < 0.1s" assert isinstance(msglist, list), "Assertion 1 failed: consume() should return a list" assert len(msglist) == 0, "Assertion 1 failed: Zero timeout with no messages should return empty list" consumer1.close() - + # Assertion 2: Closed consumer raises RuntimeError consumer2 = TestConsumer({ 'group.id': 'test-consume-closed', @@ -1363,11 +1371,13 @@ def test_consume_edge_cases(): 'session.timeout.ms': 1000 }) consumer2.close() - + with pytest.raises(RuntimeError) as exc_info: consumer2.consume(num_messages=10, timeout=0.1) - assert 'Consumer closed' in str(exc_info.value), f"Assertion 2 failed: Expected 'Consumer closed' error, got: {exc_info.value}" - + msg = (f"Assertion 2 failed: Expected 'Consumer closed' error, " + f"got: {exc_info.value}") + assert 'Consumer closed' in str(exc_info.value), msg + # Assertion 3: Invalid num_messages (negative) raises ValueError consumer3 = TestConsumer({ 'group.id': 'test-consume-invalid-negative', @@ -1376,12 +1386,14 @@ def test_consume_edge_cases(): 'auto.offset.reset': 'latest' }) consumer3.subscribe([topic]) - + with pytest.raises(ValueError) as exc_info: consumer3.consume(num_messages=-1, timeout=0.1) - assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), f"Assertion 3 failed: Expected num_messages range error, got: {exc_info.value}" + msg = (f"Assertion 3 failed: Expected num_messages range error, " + f"got: {exc_info.value}") + assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), msg consumer3.close() - + # Assertion 4: Invalid num_messages (too large) raises ValueError consumer4 = TestConsumer({ 'group.id': 'test-consume-invalid-large', @@ -1390,12 +1402,14 @@ def test_consume_edge_cases(): 'auto.offset.reset': 'latest' }) consumer4.subscribe([topic]) - + with pytest.raises(ValueError) as exc_info: consumer4.consume(num_messages=1000001, timeout=0.1) - assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), f"Assertion 4 failed: Expected num_messages range error, got: {exc_info.value}" + msg = (f"Assertion 4 failed: Expected num_messages range error, " + f"got: {exc_info.value}") + assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), msg consumer4.close() - + # Assertion 5: Short timeout works correctly (no signal) consumer5 = TestConsumer({ 'group.id': 'test-consume-short-timeout', @@ -1404,16 +1418,16 @@ def test_consume_edge_cases(): 'auto.offset.reset': 'latest' }) consumer5.subscribe([topic]) - + start = time.time() msglist = consumer5.consume(num_messages=10, timeout=0.1) # 100ms timeout elapsed = time.time() - start - + assert isinstance(msglist, list), "Assertion 5 failed: consume() should return a list" assert len(msglist) == 0, "Assertion 5 failed: Short timeout with no messages should return empty list" assert 0.05 <= elapsed <= 0.2, f"Assertion 5 failed: Short timeout took {elapsed:.2f}s, expected ~0.1s" consumer5.close() - + # Assertion 6: Very short timeout (less than chunk size) works consumer6 = TestConsumer({ 'group.id': 'test-consume-very-short', @@ -1422,11 +1436,11 @@ def test_consume_edge_cases(): 'auto.offset.reset': 'latest' }) consumer6.subscribe([topic]) - + start = time.time() msglist = consumer6.consume(num_messages=5, timeout=0.05) # 50ms timeout (less than 200ms chunk) elapsed = time.time() - start - + assert isinstance(msglist, list), "Assertion 6 failed: consume() should return a list" assert len(msglist) == 0, "Assertion 6 failed: Very short timeout should return empty list" assert elapsed < 0.2, f"Assertion 6 failed: Very short timeout took {elapsed:.2f}s, expected < 0.2s" From 0df67a54bf0b36604bb04cca760a2a2e1b3d8d61 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 11 Nov 2025 00:58:21 +0530 Subject: [PATCH 4/6] Skip lock release reacquire for small timeouts --- src/confluent_kafka/src/Consumer.c | 104 ++++++++++++++++++----------- 1 file changed, 65 insertions(+), 39 deletions(-) diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 9a2fcb7ca..8e755e2d0 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1101,28 +1101,36 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args, CallState_begin(self, &cs); - while (1) { - /* Calculate timeout for this chunk */ - chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, - CHUNK_TIMEOUT_MS); - if (chunk_timeout_ms == 0) { - /* Timeout expired */ - break; - } + /* Skip wakeable poll pattern for non-blocking or very short timeouts. + * This avoids unnecessary GIL re-acquisition that can interfere with + * ThreadPool. Only use wakeable poll for + * blocking calls that need to be interruptible. */ + if (total_timeout_ms >= 0 && total_timeout_ms < CHUNK_TIMEOUT_MS) { + rkm = rd_kafka_consumer_poll(self->rk, total_timeout_ms); + } else { + while (1) { + /* Calculate timeout for this chunk */ + chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, + CHUNK_TIMEOUT_MS); + if (chunk_timeout_ms == 0) { + /* Timeout expired */ + break; + } - /* Poll with chunk timeout */ - rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms); + /* Poll with chunk timeout */ + rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms); - /* If we got a message, exit the loop */ - if (rkm) { - break; - } + /* If we got a message, exit the loop */ + if (rkm) { + break; + } - chunk_count++; + chunk_count++; - /* Check for signals between chunks */ - if (check_signals_between_chunks(self, &cs)) { - return NULL; + /* Check for signals between chunks */ + if (check_signals_between_chunks(self, &cs)) { + return NULL; + } } } @@ -1237,18 +1245,13 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, CallState_begin(self, &cs); - while (1) { - /* Calculate timeout for this chunk */ - chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, - CHUNK_TIMEOUT_MS); - if (chunk_timeout_ms == 0) { - /* Timeout expired */ - break; - } - - /* Consume with chunk timeout */ - n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms, - rkmessages, num_messages); + /* Skip wakeable poll pattern for non-blocking or very short timeouts. + * This avoids unnecessary GIL re-acquisition that can interfere with + * ThreadPool. Only use wakeable poll for + * blocking calls that need to be interruptible. */ + if (total_timeout_ms >= 0 && total_timeout_ms < CHUNK_TIMEOUT_MS) { + n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, total_timeout_ms, + rkmessages, num_messages); if (n < 0) { /* Error - need to restore GIL before setting error */ @@ -1258,18 +1261,41 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, "%s", rd_kafka_err2str(rd_kafka_last_error())); return NULL; } + } else { + while (1) { + /* Calculate timeout for this chunk */ + chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, + CHUNK_TIMEOUT_MS); + if (chunk_timeout_ms == 0) { + /* Timeout expired */ + break; + } - /* If we got messages, exit the loop */ - if (n > 0) { - break; - } + /* Consume with chunk timeout */ + n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms, + rkmessages, num_messages); + + if (n < 0) { + /* Error - need to restore GIL before setting error */ + PyEval_RestoreThread(cs.thread_state); + free(rkmessages); + cfl_PyErr_Format(rd_kafka_last_error(), + "%s", rd_kafka_err2str(rd_kafka_last_error())); + return NULL; + } - chunk_count++; + /* If we got messages, exit the loop */ + if (n > 0) { + break; + } - /* Check for signals between chunks */ - if (check_signals_between_chunks(self, &cs)) { - free(rkmessages); - return NULL; + chunk_count++; + + /* Check for signals between chunks */ + if (check_signals_between_chunks(self, &cs)) { + free(rkmessages); + return NULL; + } } } From 265db0489b48856f5e4c59eab38b033df24e4a7a Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 11 Nov 2025 01:41:50 +0530 Subject: [PATCH 5/6] Move real broker tests to integration tests --- .../test_consumer_wakeable_poll_consume.py | 113 ++++++++++++++++++ tests/test_Consumer.py | 64 ---------- 2 files changed, 113 insertions(+), 64 deletions(-) create mode 100644 tests/integration/consumer/test_consumer_wakeable_poll_consume.py diff --git a/tests/integration/consumer/test_consumer_wakeable_poll_consume.py b/tests/integration/consumer/test_consumer_wakeable_poll_consume.py new file mode 100644 index 000000000..31cd2a5a7 --- /dev/null +++ b/tests/integration/consumer/test_consumer_wakeable_poll_consume.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2024 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pytest +import time + +from tests.common import TestConsumer + + +def test_poll_message_delivery_with_wakeable_pattern(kafka_cluster): + """Test that poll() correctly returns messages when available. + + This integration test verifies that the wakeable poll pattern doesn't + interfere with normal message delivery. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-poll-message-delivery') + + # Produce a test message (use cimpl_producer for raw bytes) + producer = kafka_cluster.cimpl_producer() + producer.produce(topic, value=b'test-message') + producer.flush(timeout=1.0) + + # Create consumer with wakeable poll pattern settings + consumer_conf = kafka_cluster.client_conf({ + 'group.id': 'test-poll-message-available', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 6000, + 'auto.offset.reset': 'earliest' + }) + consumer = TestConsumer(consumer_conf) + consumer.subscribe([topic]) + + # Wait for subscription and message availability + time.sleep(2.0) + + # Poll for message - should return immediately when available + start = time.time() + msg = consumer.poll(timeout=2.0) + elapsed = time.time() - start + + # Verify message was returned correctly + assert msg is not None, "Expected message, got None" + assert not msg.error(), f"Message has error: {msg.error()}" + # Allow more time for initial consumer setup, but once ready, should return quickly + assert elapsed < 2.5, f"Message available but took {elapsed:.2f}s, expected < 2.5s" + assert msg.value() == b'test-message', "Message value mismatch" + + consumer.close() + + +def test_consume_message_delivery_with_wakeable_pattern(kafka_cluster): + """Test that consume() correctly returns messages when available. + + This integration test verifies that the wakeable poll pattern doesn't + interfere with normal batch message delivery. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-consume-message-delivery') + + # Produce multiple test messages (use cimpl_producer for raw bytes) + producer = kafka_cluster.cimpl_producer() + for i in range(3): + producer.produce(topic, value=f'test-message-{i}'.encode()) + producer.flush(timeout=1.0) + + # Create consumer with wakeable poll pattern settings + consumer_conf = kafka_cluster.client_conf({ + 'group.id': 'test-consume-messages-available', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 6000, + 'auto.offset.reset': 'earliest' + }) + consumer = TestConsumer(consumer_conf) + consumer.subscribe([topic]) + + # Wait for subscription and message availability + time.sleep(2.0) + + # Consume messages - should return immediately when available + start = time.time() + msglist = consumer.consume(num_messages=5, timeout=2.0) + elapsed = time.time() - start + + # Verify messages were returned correctly + assert len(msglist) > 0, "Expected messages, got empty list" + assert len(msglist) <= 5, f"Should return at most 5 messages, got {len(msglist)}" + # Allow more time for initial consumer setup, but once ready, should return quickly + assert elapsed < 2.5, f"Messages available but took {elapsed:.2f}s, expected < 2.5s" + + # Verify message values + for i, msg in enumerate(msglist): + assert not msg.error(), f"Message {i} has error: {msg.error()}" + assert msg.value() is not None, f"Message {i} has no value" + # Verify we got the expected messages + expected_value = f'test-message-{i}'.encode() + assert msg.value() == expected_value, f"Message {i} value mismatch: expected {expected_value}, got {msg.value()}" + + consumer.close() + diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 80af6c617..638892a66 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -1091,36 +1091,6 @@ def test_poll_interruptibility_and_messages(): assert 0.4 <= elapsed <= 0.6, f"Assertion 4 failed: Normal timeout took {elapsed:.2f}s, expected ~0.5s" consumer4.close() - # Assertion 5: Message available - returns immediately - producer = Producer({'bootstrap.servers': 'localhost:9092'}) - producer.produce(topic, value=b'test-message') - producer.flush(timeout=1.0) - producer = None - - consumer5 = TestConsumer({ - 'bootstrap.servers': 'localhost:9092', - 'group.id': 'test-poll-message-available', - 'socket.timeout.ms': 100, - 'session.timeout.ms': 6000, - 'auto.offset.reset': 'earliest' - }) - consumer5.subscribe([topic]) - - # Wait for subscription and message availability - time.sleep(2.0) - - start = time.time() - msg = consumer5.poll(timeout=2.0) - elapsed = time.time() - start - - # Message should be available and return quickly (after consumer is ready) - assert msg is not None, "Assertion 5 failed: Expected message, got None" - assert not msg.error(), f"Assertion 5 failed: Message has error: {msg.error()}" - # Allow more time for initial consumer setup, but once ready, should return quickly - assert elapsed < 2.5, f"Assertion 5 failed: Message available but took {elapsed:.2f}s, expected < 2.5s" - assert msg.value() == b'test-message', "Assertion 5 failed: Message value mismatch" - consumer5.close() - def test_poll_edge_cases(): """Test poll() edge cases. @@ -1306,40 +1276,6 @@ def test_consume_interruptibility_and_messages(): assert elapsed < 0.1, f"Assertion 5 failed: num_messages=0 took {elapsed:.2f}s, expected < 0.1s" consumer5.close() - # Assertion 6: Message available - returns messages - producer = Producer({'bootstrap.servers': 'localhost:9092'}) - for i in range(3): - producer.produce(topic, value=f'test-message-{i}'.encode()) - producer.flush(timeout=1.0) - producer = None - - consumer6 = TestConsumer({ - 'bootstrap.servers': 'localhost:9092', - 'group.id': 'test-consume-messages-available', - 'socket.timeout.ms': 100, - 'session.timeout.ms': 6000, - 'auto.offset.reset': 'earliest' - }) - consumer6.subscribe([topic]) - - # Wait for subscription and message availability - time.sleep(2.0) - - start = time.time() - msglist = consumer6.consume(num_messages=5, timeout=2.0) - elapsed = time.time() - start - - # Messages should be available and return quickly (after consumer is ready) - assert len(msglist) > 0, "Assertion 6 failed: Expected messages, got empty list" - assert len(msglist) <= 5, f"Assertion 6 failed: Should return at most 5 messages, got {len(msglist)}" - # Allow more time for initial consumer setup, but once ready, should return quickly - assert elapsed < 2.5, f"Assertion 6 failed: Messages available but took {elapsed:.2f}s, expected < 2.5s" - # Verify message values - for i, msg in enumerate(msglist): - assert not msg.error(), f"Assertion 6 failed: Message {i} has error: {msg.error()}" - assert msg.value() is not None, f"Assertion 6 failed: Message {i} has no value" - consumer6.close() - def test_consume_edge_cases(): """Test consume() edge cases. From 0ae56c6ae0334fc96113dd5ed19ad28d0153dbbe Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 11 Nov 2025 01:44:32 +0530 Subject: [PATCH 6/6] Fix flake8 --- .../test_consumer_wakeable_poll_consume.py | 34 +++++++++---------- tests/test_Consumer.py | 2 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/integration/consumer/test_consumer_wakeable_poll_consume.py b/tests/integration/consumer/test_consumer_wakeable_poll_consume.py index 31cd2a5a7..77e5444a5 100644 --- a/tests/integration/consumer/test_consumer_wakeable_poll_consume.py +++ b/tests/integration/consumer/test_consumer_wakeable_poll_consume.py @@ -16,7 +16,6 @@ # limitations under the License. # -import pytest import time from tests.common import TestConsumer @@ -29,12 +28,12 @@ def test_poll_message_delivery_with_wakeable_pattern(kafka_cluster): interfere with normal message delivery. """ topic = kafka_cluster.create_topic_and_wait_propogation('test-poll-message-delivery') - + # Produce a test message (use cimpl_producer for raw bytes) producer = kafka_cluster.cimpl_producer() producer.produce(topic, value=b'test-message') producer.flush(timeout=1.0) - + # Create consumer with wakeable poll pattern settings consumer_conf = kafka_cluster.client_conf({ 'group.id': 'test-poll-message-available', @@ -44,39 +43,39 @@ def test_poll_message_delivery_with_wakeable_pattern(kafka_cluster): }) consumer = TestConsumer(consumer_conf) consumer.subscribe([topic]) - + # Wait for subscription and message availability time.sleep(2.0) - + # Poll for message - should return immediately when available start = time.time() msg = consumer.poll(timeout=2.0) elapsed = time.time() - start - + # Verify message was returned correctly assert msg is not None, "Expected message, got None" assert not msg.error(), f"Message has error: {msg.error()}" # Allow more time for initial consumer setup, but once ready, should return quickly assert elapsed < 2.5, f"Message available but took {elapsed:.2f}s, expected < 2.5s" assert msg.value() == b'test-message', "Message value mismatch" - + consumer.close() def test_consume_message_delivery_with_wakeable_pattern(kafka_cluster): """Test that consume() correctly returns messages when available. - + This integration test verifies that the wakeable poll pattern doesn't interfere with normal batch message delivery. """ topic = kafka_cluster.create_topic_and_wait_propogation('test-consume-message-delivery') - + # Produce multiple test messages (use cimpl_producer for raw bytes) producer = kafka_cluster.cimpl_producer() for i in range(3): producer.produce(topic, value=f'test-message-{i}'.encode()) producer.flush(timeout=1.0) - + # Create consumer with wakeable poll pattern settings consumer_conf = kafka_cluster.client_conf({ 'group.id': 'test-consume-messages-available', @@ -86,28 +85,29 @@ def test_consume_message_delivery_with_wakeable_pattern(kafka_cluster): }) consumer = TestConsumer(consumer_conf) consumer.subscribe([topic]) - + # Wait for subscription and message availability time.sleep(2.0) - + # Consume messages - should return immediately when available start = time.time() msglist = consumer.consume(num_messages=5, timeout=2.0) elapsed = time.time() - start - + # Verify messages were returned correctly assert len(msglist) > 0, "Expected messages, got empty list" assert len(msglist) <= 5, f"Should return at most 5 messages, got {len(msglist)}" # Allow more time for initial consumer setup, but once ready, should return quickly assert elapsed < 2.5, f"Messages available but took {elapsed:.2f}s, expected < 2.5s" - + # Verify message values for i, msg in enumerate(msglist): assert not msg.error(), f"Message {i} has error: {msg.error()}" assert msg.value() is not None, f"Message {i} has no value" # Verify we got the expected messages expected_value = f'test-message-{i}'.encode() - assert msg.value() == expected_value, f"Message {i} value mismatch: expected {expected_value}, got {msg.value()}" - - consumer.close() + expected_msg = (f"Message {i} value mismatch: expected {expected_value}, " + f"got {msg.value()}") + assert msg.value() == expected_value, expected_msg + consumer.close() diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 638892a66..1e2e6331b 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -7,7 +7,7 @@ import pytest -from confluent_kafka import (Consumer, Producer, TopicPartition, KafkaError, +from confluent_kafka import (Consumer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE, OFFSET_INVALID)