6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Confluent Python Client for Apache Kafka - CHANGELOG


### Fixes

- Fixed `Consumer.poll()` and `Consumer.consume()` blocking indefinitely and not responding to Ctrl+C (KeyboardInterrupt) signals. The implementation now uses a "wakeable poll" pattern that breaks long blocking calls into smaller chunks (200ms) and periodically re-acquires the Python GIL to check for pending signals. This allows Ctrl+C to properly interrupt blocking consumer operations. Fixes Issues [#209](https://github.com/confluentinc/confluent-kafka-python/issues/209) and [#807](https://github.com/confluentinc/confluent-kafka-python/issues/807).


## v2.12.1 - 2025-10-21

v2.12.1 is a maintenance release with the following fixes:
211 changes: 195 additions & 16 deletions src/confluent_kafka/src/Consumer.c
@@ -106,9 +106,81 @@ static int Consumer_traverse (Handle *self,
}


/****************************************************************************
*
* Helper functions for implementing interruptible poll/consume operations
* that allow Ctrl+C to terminate blocking calls. See Issues #209 and #807.
*
*
****************************************************************************/

/**
* @brief Calculate the timeout for the current chunk in wakeable poll pattern.
*
* @param total_timeout_ms Total timeout in milliseconds (-1 for infinite)
* @param chunk_count Current chunk iteration count (0-based)
* @param chunk_timeout_ms Chunk size in milliseconds (typically 200ms)
* @return int Chunk timeout in milliseconds, or 0 if total timeout expired
*/
static int calculate_chunk_timeout(int total_timeout_ms, int chunk_count,
int chunk_timeout_ms) {
if (total_timeout_ms < 0) {
/* Infinite timeout - use chunk size */
return chunk_timeout_ms;
} else {
/* Finite timeout - calculate remaining */
int remaining_ms = total_timeout_ms - (chunk_count * chunk_timeout_ms);
if (remaining_ms <= 0) {
/* Timeout expired */
return 0;
}
return (remaining_ms < chunk_timeout_ms) ? remaining_ms : chunk_timeout_ms;
}
}
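
As an illustration only (not part of the patch), here is how the chunking arithmetic works out for a finite 500 ms timeout and for an infinite timeout, assuming the 200 ms chunk size used below. The wrapper function name illustrate_chunk_timeouts is hypothetical:

#include <assert.h>

/* Hypothetical self-check, not in Consumer.c: a 500 ms total timeout is
 * handed out as 200 ms + 200 ms + 100 ms and then reported as expired,
 * while an infinite timeout (-1) always yields a full 200 ms chunk. */
static void illustrate_chunk_timeouts(void) {
        assert(calculate_chunk_timeout(500, 0, 200) == 200); /* 500 ms remaining */
        assert(calculate_chunk_timeout(500, 1, 200) == 200); /* 300 ms remaining */
        assert(calculate_chunk_timeout(500, 2, 200) == 100); /* 100 ms remaining */
        assert(calculate_chunk_timeout(500, 3, 200) == 0);   /* expired */
        assert(calculate_chunk_timeout(-1, 9, 200) == 200);  /* infinite: full chunk */
}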

/**
* @brief Check for pending signals between poll chunks.
*
* Re-acquires GIL, checks for signals, and handles cleanup if signal detected.
* This allows Ctrl+C to interrupt blocking poll/consume operations.
*
* @param self Consumer handle
* @param cs CallState structure (thread state will be updated)
* @return int 0 if no signal detected (continue), 1 if signal detected (should return NULL)
*/
static int check_signals_between_chunks(Handle *self, CallState *cs) {
/* Re-acquire GIL to check for signals */
PyEval_RestoreThread(cs->thread_state);

/* Check for pending signals (KeyboardInterrupt, etc.). If a signal
 * handler raised, PyErr_CheckSignals() returns -1 with the exception set */
if (PyErr_CheckSignals() == -1) {
/* Note: GIL is already held, but CallState_end expects to restore it */
/* Save thread state again so CallState_end can restore it properly */
cs->thread_state = PyEval_SaveThread();
if (!CallState_end(self, cs)) {
/* CallState_end detected signal and cleaned up */
return 1; /* Signal detected */
}
return 1;
}

/* Re-release GIL for next iteration */
cs->thread_state = PyEval_SaveThread();
return 0; /* No signal, continue */
}
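
To make the helper's contract explicit, a minimal sketch of the intended calling pattern follows; it mirrors the loops added to Consumer_poll()/Consumer_consume() below and assumes CallState_begin() has already released the GIL. This is illustrative, not additional code in the patch:

/* Sketch only: expected GIL/thread-state transitions around the helper. */
CallState cs;
CallState_begin(self, &cs);                     /* GIL released, thread state saved */
while (1) {
        /* ... blocking librdkafka call with a <= 200 ms chunk timeout ... */
        if (check_signals_between_chunks(self, &cs)) {
                /* A signal handler raised (e.g. KeyboardInterrupt); the GIL is
                 * held again and CallState_end() has already run, so the caller
                 * only frees its own resources and returns NULL. */
                return NULL;
        }
        /* No signal: the GIL is released again for the next chunk. */
}
/* On normal loop exit the caller still finishes with CallState_end(self, &cs). */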


/****************************************************************************
*
*
* Consumer Methods
*
*
*
*
****************************************************************************/


static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
PyObject *kwargs) {
@@ -984,14 +1056,37 @@ static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args,
#endif
}


/**
* @brief Poll for a single message from the subscribed topics.
*
* Instead of a single blocking call to rd_kafka_consumer_poll() with the
* full timeout, this function:
* 1. Splits the timeout into 200ms chunks
Copilot AI, Nov 7, 2025:
[nitpick] The function documentation would benefit from documenting the CHUNK_TIMEOUT_MS constant value (200ms) in the description to match the implementation details mentioned in the comment lines.
Suggested change:
* 1. Splits the timeout into 200ms chunks
* 1. Splits the timeout into 200ms chunks (CHUNK_TIMEOUT_MS = 200ms)
* 2. Calls rd_kafka_consumer_poll() with chunk timeout
* 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals()
* 4. If signal detected, returns NULL (raises KeyboardInterrupt)
* 5. Continues until message received, timeout expired, or signal detected
*
*
* @param self Consumer handle
* @param args Positional arguments (unused)
* @param kwargs Keyword arguments:
* - timeout (float, optional): Timeout in seconds.
* Default: -1.0 (infinite timeout)
* @return PyObject* Message object, None if timeout, or NULL on error
* (raises KeyboardInterrupt if signal detected)
*/
static PyObject *Consumer_poll (Handle *self, PyObject *args,
PyObject *kwargs) {
double tmout = -1.0f;
static char *kws[] = { "timeout", NULL };
rd_kafka_message_t *rkm;
rd_kafka_message_t *rkm = NULL;
PyObject *msgobj;
CallState cs;
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
int total_timeout_ms;
int chunk_timeout_ms;
int chunk_count = 0;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
@@ -1002,16 +1097,43 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
return NULL;

total_timeout_ms = cfl_timeout_ms(tmout);

CallState_begin(self, &cs);

rkm = rd_kafka_consumer_poll(self->rk, cfl_timeout_ms(tmout));
while (1) {
/* Calculate timeout for this chunk */
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
CHUNK_TIMEOUT_MS);
if (chunk_timeout_ms == 0) {
/* Timeout expired */
break;
}

/* Poll with chunk timeout */
rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms);

/* If we got a message, exit the loop */
if (rkm) {
break;
}

chunk_count++;

/* Check for signals between chunks */
if (check_signals_between_chunks(self, &cs)) {
return NULL;
}
}

/* Final GIL restore and signal check */
if (!CallState_end(self, &cs)) {
if (rkm)
rd_kafka_message_destroy(rkm);
return NULL;
}

/* Handle the message */
if (!rkm)
Py_RETURN_NONE;

@@ -1053,7 +1175,27 @@ static PyObject *Consumer_memberid (Handle *self, PyObject *args,
return memberidobj;
}


/**
* @brief Consume a batch of messages from the subscribed topics.
*
* Instead of a single blocking call to rd_kafka_consume_batch_queue() with the
* full timeout, this function:
* 1. Splits the timeout into 200ms chunks
* 2. Calls rd_kafka_consume_batch_queue() with chunk timeout
* 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals()
* 4. If signal detected, returns NULL (raises KeyboardInterrupt)
* 5. Continues until messages received, timeout expired, or signal detected.
*
* @param self Consumer handle
* @param args Positional arguments (unused)
* @param kwargs Keyword arguments:
* - num_messages (int, optional): Maximum number of messages to
* consume per call. Default: 1. Maximum: 1000000.
* - timeout (float, optional): Timeout in seconds.
* Default: -1.0 (infinite timeout)
* @return PyObject* List of Message objects, empty list if timeout, or NULL on error
* (raises KeyboardInterrupt if signal detected)
*/
static PyObject *Consumer_consume (Handle *self, PyObject *args,
PyObject *kwargs) {
unsigned int num_messages = 1;
@@ -1063,7 +1205,11 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
PyObject *msglist;
rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
CallState cs;
Py_ssize_t i, n;
Py_ssize_t i, n = 0;
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
int total_timeout_ms;
int chunk_timeout_ms;
int chunk_count = 0;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
@@ -1081,14 +1227,53 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
return NULL;
}

CallState_begin(self, &cs);
total_timeout_ms = cfl_timeout_ms(tmout);

rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *));
if (!rkmessages) {
PyErr_NoMemory();
return NULL;
}

CallState_begin(self, &cs);

while (1) {
/* Calculate timeout for this chunk */
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
CHUNK_TIMEOUT_MS);
if (chunk_timeout_ms == 0) {
/* Timeout expired */
break;
}

/* Consume with chunk timeout */
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms,
rkmessages, num_messages);

if (n < 0) {
/* Error - need to restore GIL before setting error */
PyEval_RestoreThread(cs.thread_state);
free(rkmessages);
cfl_PyErr_Format(rd_kafka_last_error(),
"%s", rd_kafka_err2str(rd_kafka_last_error()));
return NULL;
}

/* If we got messages, exit the loop */
if (n > 0) {
break;
}

n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
cfl_timeout_ms(tmout),
rkmessages, num_messages);
chunk_count++;

/* Check for signals between chunks */
if (check_signals_between_chunks(self, &cs)) {
free(rkmessages);
return NULL;
}
}

/* Final GIL restore and signal check */
if (!CallState_end(self, &cs)) {
for (i = 0; i < n; i++) {
rd_kafka_message_destroy(rkmessages[i]);
@@ -1097,13 +1282,7 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
return NULL;
}

if (n < 0) {
free(rkmessages);
cfl_PyErr_Format(rd_kafka_last_error(),
"%s", rd_kafka_err2str(rd_kafka_last_error()));
return NULL;
}

/* Create Python list from messages */
Copilot AI, Nov 7, 2025:
Extra whitespace before closing comment marker.
Suggested change:
/* Create Python list from messages  */
/* Create Python list from messages */
msglist = PyList_New(n);

for (i = 0; i < n; i++) {