Skip to content

Commit 0df67a5

Browse files
committed
Skip lock release reacquire for small timeouts
1 parent 1513f94 commit 0df67a5

File tree

1 file changed

+65
-39
lines changed

1 file changed

+65
-39
lines changed

src/confluent_kafka/src/Consumer.c

Lines changed: 65 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,28 +1101,36 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
11011101

11021102
CallState_begin(self, &cs);
11031103

1104-
while (1) {
1105-
/* Calculate timeout for this chunk */
1106-
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1107-
CHUNK_TIMEOUT_MS);
1108-
if (chunk_timeout_ms == 0) {
1109-
/* Timeout expired */
1110-
break;
1111-
}
1104+
/* Skip wakeable poll pattern for non-blocking or very short timeouts.
1105+
* This avoids unnecessary GIL re-acquisition that can interfere with
1106+
* ThreadPool. Only use wakeable poll for
1107+
* blocking calls that need to be interruptible. */
1108+
if (total_timeout_ms >= 0 && total_timeout_ms < CHUNK_TIMEOUT_MS) {
1109+
rkm = rd_kafka_consumer_poll(self->rk, total_timeout_ms);
1110+
} else {
1111+
while (1) {
1112+
/* Calculate timeout for this chunk */
1113+
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1114+
CHUNK_TIMEOUT_MS);
1115+
if (chunk_timeout_ms == 0) {
1116+
/* Timeout expired */
1117+
break;
1118+
}
11121119

1113-
/* Poll with chunk timeout */
1114-
rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms);
1120+
/* Poll with chunk timeout */
1121+
rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms);
11151122

1116-
/* If we got a message, exit the loop */
1117-
if (rkm) {
1118-
break;
1119-
}
1123+
/* If we got a message, exit the loop */
1124+
if (rkm) {
1125+
break;
1126+
}
11201127

1121-
chunk_count++;
1128+
chunk_count++;
11221129

1123-
/* Check for signals between chunks */
1124-
if (check_signals_between_chunks(self, &cs)) {
1125-
return NULL;
1130+
/* Check for signals between chunks */
1131+
if (check_signals_between_chunks(self, &cs)) {
1132+
return NULL;
1133+
}
11261134
}
11271135
}
11281136

@@ -1237,18 +1245,13 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
12371245

12381246
CallState_begin(self, &cs);
12391247

1240-
while (1) {
1241-
/* Calculate timeout for this chunk */
1242-
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1243-
CHUNK_TIMEOUT_MS);
1244-
if (chunk_timeout_ms == 0) {
1245-
/* Timeout expired */
1246-
break;
1247-
}
1248-
1249-
/* Consume with chunk timeout */
1250-
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms,
1251-
rkmessages, num_messages);
1248+
/* Skip wakeable poll pattern for non-blocking or very short timeouts.
1249+
* This avoids unnecessary GIL re-acquisition that can interfere with
1250+
* ThreadPool. Only use wakeable poll for
1251+
* blocking calls that need to be interruptible. */
1252+
if (total_timeout_ms >= 0 && total_timeout_ms < CHUNK_TIMEOUT_MS) {
1253+
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, total_timeout_ms,
1254+
rkmessages, num_messages);
12521255

12531256
if (n < 0) {
12541257
/* Error - need to restore GIL before setting error */
@@ -1258,18 +1261,41 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
12581261
"%s", rd_kafka_err2str(rd_kafka_last_error()));
12591262
return NULL;
12601263
}
1264+
} else {
1265+
while (1) {
1266+
/* Calculate timeout for this chunk */
1267+
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1268+
CHUNK_TIMEOUT_MS);
1269+
if (chunk_timeout_ms == 0) {
1270+
/* Timeout expired */
1271+
break;
1272+
}
12611273

1262-
/* If we got messages, exit the loop */
1263-
if (n > 0) {
1264-
break;
1265-
}
1274+
/* Consume with chunk timeout */
1275+
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms,
1276+
rkmessages, num_messages);
1277+
1278+
if (n < 0) {
1279+
/* Error - need to restore GIL before setting error */
1280+
PyEval_RestoreThread(cs.thread_state);
1281+
free(rkmessages);
1282+
cfl_PyErr_Format(rd_kafka_last_error(),
1283+
"%s", rd_kafka_err2str(rd_kafka_last_error()));
1284+
return NULL;
1285+
}
12661286

1267-
chunk_count++;
1287+
/* If we got messages, exit the loop */
1288+
if (n > 0) {
1289+
break;
1290+
}
12681291

1269-
/* Check for signals between chunks */
1270-
if (check_signals_between_chunks(self, &cs)) {
1271-
free(rkmessages);
1272-
return NULL;
1292+
chunk_count++;
1293+
1294+
/* Check for signals between chunks */
1295+
if (check_signals_between_chunks(self, &cs)) {
1296+
free(rkmessages);
1297+
return NULL;
1298+
}
12731299
}
12741300
}
12751301

0 commit comments

Comments
 (0)