From 719cf4586526ff9c6c56e70eeefbaf349166e2a6 Mon Sep 17 00:00:00 2001 From: Guilherme Quentel Melo Date: Fri, 4 Jul 2025 11:28:01 +0200 Subject: [PATCH] Fix thread not waking up when there is still data to be sent When producing messages quickly without waiting for the future of previous requests, there could be some situations when the last batch was not sent. That seemed to be more frequent with larger messages (~100KiB), but apparently it could happen to any message when `linger_ms` is 0. Not sure if it could happen when it is non-zero though. The reason is that `BrokerConnection.send_pending_requests_v2` would fill the internal buffer with the bytes from a request and try to send it. https://github.com/aiven/kafka-python/blob/e0ab864f7aca3961e729cf03d1caa3899fbee617/kafka/conn.py#L1036 If it couldn't send it completely for some reason, it would try to send again in the next call to `send_pending_requests_v2`. But if between those 2 calls, `BrokerConnection.send` was called, new data would be appended to self._protocol: KafkaProtocol: https://github.com/aiven/kafka-python/blob/b01ffb6a004480635751e325db2ded20bcdc0d2f/kafka/conn.py#L981 but the second call to `send_pending_requests_v2` wouldn't check if any new data was available and would return False: https://github.com/aiven/kafka-python/blob/e0ab864f7aca3961e729cf03d1caa3899fbee617/kafka/conn.py#L1035 This would tell `KafkaClient._poll` that all pending data was sent, which would make the client not listen to socket write readiness anymore: https://github.com/aiven/kafka-python/blob/b01ffb6a004480635751e325db2ded20bcdc0d2f/kafka/client_async.py#L663-L667 --- kafka/conn.py | 7 ++++++ test/test_conn.py | 60 ++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index df903f264..572e93d5d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1040,6 +1040,13 @@ def send_pending_requests_v2(self): total_bytes =
self._send_bytes(self._send_buffer) self._send_buffer = self._send_buffer[total_bytes:] + # If all data was sent, we need to get the new data from the protocol now, otherwise + # this function would return True, indicating that there are no more pending + # requests. This could cause the calling thread to wait indefinitely as it won't + # know that there is still buffered data to send. + if not self._send_buffer: + self._send_buffer = self._protocol.send_bytes() + if self._sensors: self._sensors.bytes_sent.record(total_bytes) # Return True iff send buffer is empty diff --git a/test/test_conn.py b/test/test_conn.py index 966f7b34d..098bc37a0 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -7,7 +7,9 @@ import mock import pytest -from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts +from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, collect_hosts +from kafka.metrics.metrics import Metrics +from kafka.metrics.stats.sensor import Sensor from kafka.protocol.api import RequestHeader from kafka.protocol.metadata import MetadataRequest from kafka.protocol.produce import ProduceRequest @@ -31,8 +33,20 @@ def _socket(mocker): @pytest.fixture -def conn(_socket, dns_lookup): - conn = BrokerConnection('localhost', 9092, socket.AF_INET) +def metrics(mocker): + metrics = mocker.MagicMock(Metrics) + metrics.mocked_sensors = {} + def sensor(name, **kwargs): + if name not in metrics.mocked_sensors: + metrics.mocked_sensors[name] = mocker.MagicMock(Sensor) + return metrics.mocked_sensors[name] + metrics.sensor.side_effect = sensor + return metrics + + +@pytest.fixture +def conn(_socket, dns_lookup, metrics): + conn = BrokerConnection('localhost', 9092, socket.AF_INET, metrics=metrics) return conn @@ -161,6 +175,46 @@ def test_send_response(_socket, conn): assert len(conn.in_flight_requests) == 1 +def test_send_async_request_while_other_request_is_already_in_buffer(_socket, conn, metrics): + conn.connect() + assert conn.state is 
ConnectionStates.CONNECTED + assert 'node-0.bytes-sent' in metrics.mocked_sensors + bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent'] + + req1 = MetadataRequest[0](topics='foo') + header1 = RequestHeader(req1, client_id=conn.config['client_id']) + payload_bytes1 = len(header1.encode()) + len(req1.encode()) + req2 = MetadataRequest[0]([]) + header2 = RequestHeader(req2, client_id=conn.config['client_id']) + payload_bytes2 = len(header2.encode()) + len(req2.encode()) + + # The first call to the socket will raise a transient SSL exception. This will make the first + # request to be kept in the internal buffer to be sent in the next call of + # send_pending_requests_v2. + _socket.send.side_effect = [SSLWantWriteError, 4 + payload_bytes1, 4 + payload_bytes2] + + conn.send(req1, blocking=False) + # This won't send any bytes because of the SSL exception and the request bytes will be kept in + # the buffer. + assert conn.send_pending_requests_v2() is False + assert bytes_sent_sensor.record.call_args_list[0].args == (0,) + + conn.send(req2, blocking=False) + # This will send the remaining bytes in the buffer from the first request, but should notice + # that the second request was queued, therefore it should return False. + bytes_sent_sensor.record.reset_mock() + assert conn.send_pending_requests_v2() is False + bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes1) + + bytes_sent_sensor.record.reset_mock() + assert conn.send_pending_requests_v2() is True + bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes2) + + bytes_sent_sensor.record.reset_mock() + assert conn.send_pending_requests_v2() is True + bytes_sent_sensor.record.assert_called_once_with(0) + + def test_send_error(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED