Skip to content

Commit 42d3ab1

Browse files
committed
Make consumer - consumer and poll wakeable
1 parent 36a9e6c commit 42d3ab1

File tree

3 files changed

+960
-17
lines changed

3 files changed

+960
-17
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Confluent Python Client for Apache Kafka - CHANGELOG
22

3+
4+
### Fixes
5+
6+
- Fixed `Consumer.poll()` and `Consumer.consume()` blocking indefinitely and not responding to Ctrl+C (KeyboardInterrupt) signals. The implementation now uses a "wakeable poll" pattern that breaks long blocking calls into smaller chunks (200ms) and periodically re-acquires the Python GIL to check for pending signals. This allows Ctrl+C to properly interrupt blocking consumer operations. Fixes Issues [#209](https://github.com/confluentinc/confluent-kafka-python/issues/209) and [#807](https://github.com/confluentinc/confluent-kafka-python/issues/807).
7+
8+
39
## v2.12.1 - 2025-10-21
410

511
v2.12.1 is a maintenance release with the following fixes:

src/confluent_kafka/src/Consumer.c

Lines changed: 195 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,81 @@ static int Consumer_traverse (Handle *self,
106106
}
107107

108108

109+
/****************************************************************************
110+
*
111+
* Helper functions for implementing interruptible poll/consume operations
112+
* that allow Ctrl+C to terminate blocking calls. See Issues #209 and #807.
113+
*
114+
*
115+
****************************************************************************/
116+
117+
/**
118+
* @brief Calculate the timeout for the current chunk in wakeable poll pattern.
119+
*
120+
* @param total_timeout_ms Total timeout in milliseconds (-1 for infinite)
121+
* @param chunk_count Current chunk iteration count (0-based)
122+
* @param chunk_timeout_ms Chunk size in milliseconds (typically 200ms)
123+
* @return int Chunk timeout in milliseconds, or 0 if total timeout expired
124+
*/
125+
static int calculate_chunk_timeout(int total_timeout_ms, int chunk_count,
126+
int chunk_timeout_ms) {
127+
if (total_timeout_ms < 0) {
128+
/* Infinite timeout - use chunk size */
129+
return chunk_timeout_ms;
130+
} else {
131+
/* Finite timeout - calculate remaining */
132+
int remaining_ms = total_timeout_ms - (chunk_count * chunk_timeout_ms);
133+
if (remaining_ms <= 0) {
134+
/* Timeout expired */
135+
return 0;
136+
}
137+
return (remaining_ms < chunk_timeout_ms) ? remaining_ms : chunk_timeout_ms;
138+
}
139+
}
140+
141+
/**
142+
* @brief Check for pending signals between poll chunks.
143+
*
144+
* Re-acquires GIL, checks for signals, and handles cleanup if signal detected.
145+
* This allows Ctrl+C to interrupt blocking poll/consume operations.
146+
*
147+
* @param self Consumer handle
148+
* @param cs CallState structure (thread state will be updated)
149+
* @return int 0 if no signal detected (continue), 1 if signal detected (should return NULL)
150+
*/
151+
static int check_signals_between_chunks(Handle *self, CallState *cs) {
152+
/* Re-acquire GIL to check for signals */
153+
PyEval_RestoreThread(cs->thread_state);
154+
155+
/* Check for pending signals (KeyboardInterrupt, etc.) */
156+
/* PyErr_CheckSignals() already set the exception */
157+
if (PyErr_CheckSignals() == -1) {
158+
/* Note: GIL is already held, but CallState_end expects to restore it */
159+
/* Save thread state again so CallState_end can restore it properly */
160+
cs->thread_state = PyEval_SaveThread();
161+
if (!CallState_end(self, cs)) {
162+
/* CallState_end detected signal and cleaned up */
163+
return 1; /* Signal detected */
164+
}
165+
return 1;
166+
}
109167

168+
/* Re-release GIL for next iteration */
169+
cs->thread_state = PyEval_SaveThread();
170+
return 0; /* No signal, continue */
171+
}
110172

111173

174+
/****************************************************************************
175+
*
176+
*
177+
* Consumer Methods
178+
*
179+
*
180+
*
181+
*
182+
****************************************************************************/
183+
112184

113185
static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
114186
PyObject *kwargs) {
@@ -984,14 +1056,37 @@ static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args,
9841056
#endif
9851057
}
9861058

987-
1059+
/**
1060+
* @brief Poll for a single message from the subscribed topics.
1061+
*
1062+
* Instead of a single blocking call to rd_kafka_consumer_poll() with the
1063+
* full timeout, this function:
1064+
* 1. Splits the timeout into 200ms chunks
1065+
* 2. Calls rd_kafka_consumer_poll() with chunk timeout
1066+
* 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals()
1067+
* 4. If signal detected, returns NULL (raises KeyboardInterrupt)
1068+
* 5. Continues until message received, timeout expired, or signal detected
1069+
*
1070+
*
1071+
* @param self Consumer handle
1072+
* @param args Positional arguments (unused)
1073+
* @param kwargs Keyword arguments:
1074+
* - timeout (float, optional): Timeout in seconds.
1075+
* Default: -1.0 (infinite timeout)
1076+
* @return PyObject* Message object, None if timeout, or NULL on error
1077+
* (raises KeyboardInterrupt if signal detected)
1078+
*/
9881079
static PyObject *Consumer_poll (Handle *self, PyObject *args,
9891080
PyObject *kwargs) {
9901081
double tmout = -1.0f;
9911082
static char *kws[] = { "timeout", NULL };
992-
rd_kafka_message_t *rkm;
1083+
rd_kafka_message_t *rkm = NULL;
9931084
PyObject *msgobj;
9941085
CallState cs;
1086+
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
1087+
int total_timeout_ms;
1088+
int chunk_timeout_ms;
1089+
int chunk_count = 0;
9951090

9961091
if (!self->rk) {
9971092
PyErr_SetString(PyExc_RuntimeError,
@@ -1002,16 +1097,43 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
10021097
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
10031098
return NULL;
10041099

1100+
total_timeout_ms = cfl_timeout_ms(tmout);
1101+
10051102
CallState_begin(self, &cs);
10061103

1007-
rkm = rd_kafka_consumer_poll(self->rk, cfl_timeout_ms(tmout));
1104+
while (1) {
1105+
/* Calculate timeout for this chunk */
1106+
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1107+
CHUNK_TIMEOUT_MS);
1108+
if (chunk_timeout_ms == 0) {
1109+
/* Timeout expired */
1110+
break;
1111+
}
1112+
1113+
/* Poll with chunk timeout */
1114+
rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms);
1115+
1116+
/* If we got a message, exit the loop */
1117+
if (rkm) {
1118+
break;
1119+
}
1120+
1121+
chunk_count++;
1122+
1123+
/* Check for signals between chunks */
1124+
if (check_signals_between_chunks(self, &cs)) {
1125+
return NULL;
1126+
}
1127+
}
10081128

1129+
/* Final GIL restore and signal check */
10091130
if (!CallState_end(self, &cs)) {
10101131
if (rkm)
10111132
rd_kafka_message_destroy(rkm);
10121133
return NULL;
10131134
}
10141135

1136+
/* Handle the message */
10151137
if (!rkm)
10161138
Py_RETURN_NONE;
10171139

@@ -1053,7 +1175,27 @@ static PyObject *Consumer_memberid (Handle *self, PyObject *args,
10531175
return memberidobj;
10541176
}
10551177

1056-
1178+
/**
1179+
* @brief Consume a batch of messages from the subscribed topics.
1180+
*
1181+
* Instead of a single blocking call to rd_kafka_consume_batch_queue() with the
1182+
* full timeout, this function:
1183+
* 1. Splits the timeout into 200ms chunks
1184+
* 2. Calls rd_kafka_consume_batch_queue() with chunk timeout
1185+
* 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals()
1186+
* 4. If signal detected, returns NULL (raises KeyboardInterrupt)
1187+
* 5. Continues until messages received, timeout expired, or signal detected.
1188+
*
1189+
* @param self Consumer handle
1190+
* @param args Positional arguments (unused)
1191+
* @param kwargs Keyword arguments:
1192+
* - num_messages (int, optional): Maximum number of messages to
1193+
* consume per call. Default: 1. Maximum: 1000000.
1194+
* - timeout (float, optional): Timeout in seconds.
1195+
* Default: -1.0 (infinite timeout)
1196+
* @return PyObject* List of Message objects, empty list if timeout, or NULL on error
1197+
* (raises KeyboardInterrupt if signal detected)
1198+
*/
10571199
static PyObject *Consumer_consume (Handle *self, PyObject *args,
10581200
PyObject *kwargs) {
10591201
unsigned int num_messages = 1;
@@ -1063,7 +1205,11 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
10631205
PyObject *msglist;
10641206
rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
10651207
CallState cs;
1066-
Py_ssize_t i, n;
1208+
Py_ssize_t i, n = 0;
1209+
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
1210+
int total_timeout_ms;
1211+
int chunk_timeout_ms;
1212+
int chunk_count = 0;
10671213

10681214
if (!self->rk) {
10691215
PyErr_SetString(PyExc_RuntimeError,
@@ -1081,14 +1227,53 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
10811227
return NULL;
10821228
}
10831229

1084-
CallState_begin(self, &cs);
1230+
total_timeout_ms = cfl_timeout_ms(tmout);
10851231

10861232
rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *));
1233+
if (!rkmessages) {
1234+
PyErr_NoMemory();
1235+
return NULL;
1236+
}
1237+
1238+
CallState_begin(self, &cs);
1239+
1240+
while (1) {
1241+
/* Calculate timeout for this chunk */
1242+
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1243+
CHUNK_TIMEOUT_MS);
1244+
if (chunk_timeout_ms == 0) {
1245+
/* Timeout expired */
1246+
break;
1247+
}
1248+
1249+
/* Consume with chunk timeout */
1250+
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms,
1251+
rkmessages, num_messages);
1252+
1253+
if (n < 0) {
1254+
/* Error - need to restore GIL before setting error */
1255+
PyEval_RestoreThread(cs.thread_state);
1256+
free(rkmessages);
1257+
cfl_PyErr_Format(rd_kafka_last_error(),
1258+
"%s", rd_kafka_err2str(rd_kafka_last_error()));
1259+
return NULL;
1260+
}
1261+
1262+
/* If we got messages, exit the loop */
1263+
if (n > 0) {
1264+
break;
1265+
}
10871266

1088-
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
1089-
cfl_timeout_ms(tmout),
1090-
rkmessages, num_messages);
1267+
chunk_count++;
10911268

1269+
/* Check for signals between chunks */
1270+
if (check_signals_between_chunks(self, &cs)) {
1271+
free(rkmessages);
1272+
return NULL;
1273+
}
1274+
}
1275+
1276+
/* Final GIL restore and signal check */
10921277
if (!CallState_end(self, &cs)) {
10931278
for (i = 0; i < n; i++) {
10941279
rd_kafka_message_destroy(rkmessages[i]);
@@ -1097,13 +1282,7 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
10971282
return NULL;
10981283
}
10991284

1100-
if (n < 0) {
1101-
free(rkmessages);
1102-
cfl_PyErr_Format(rd_kafka_last_error(),
1103-
"%s", rd_kafka_err2str(rd_kafka_last_error()));
1104-
return NULL;
1105-
}
1106-
1285+
/* Create Python list from messages */
11071286
msglist = PyList_New(n);
11081287

11091288
for (i = 0; i < n; i++) {

0 commit comments

Comments
 (0)