Skip to content

Commit 458a531

Browse files
authored
Merge pull request #34 from aiven/gqmelo/fix-sending-thread-wakeup
Fix thread not waking up when there is still data to be sent
2 parents e0ab864 + 719cf45 commit 458a531

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

kafka/conn.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,13 @@ def send_pending_requests_v2(self):
10401040
total_bytes = self._send_bytes(self._send_buffer)
10411041
self._send_buffer = self._send_buffer[total_bytes:]
10421042

1043+
# If all data was sent, we need to get the new data from the protocol now, otherwise
1044+
# this function would return True, indicating that there are no more pending
1045+
# requests. This could cause the calling thread to wait indefinitely as it won't
1046+
# know that there is still buffered data to send.
1047+
if not self._send_buffer:
1048+
self._send_buffer = self._protocol.send_bytes()
1049+
10431050
if self._sensors:
10441051
self._sensors.bytes_sent.record(total_bytes)
10451052
# Return True iff send buffer is empty

test/test_conn.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import mock
88
import pytest
99

10-
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
10+
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, collect_hosts
11+
from kafka.metrics.metrics import Metrics
12+
from kafka.metrics.stats.sensor import Sensor
1113
from kafka.protocol.api import RequestHeader
1214
from kafka.protocol.metadata import MetadataRequest
1315
from kafka.protocol.produce import ProduceRequest
@@ -31,8 +33,20 @@ def _socket(mocker):
3133

3234

3335
@pytest.fixture
34-
def conn(_socket, dns_lookup):
35-
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
36+
def metrics(mocker):
37+
metrics = mocker.MagicMock(Metrics)
38+
metrics.mocked_sensors = {}
39+
def sensor(name, **kwargs):
40+
if name not in metrics.mocked_sensors:
41+
metrics.mocked_sensors[name] = mocker.MagicMock(Sensor)
42+
return metrics.mocked_sensors[name]
43+
metrics.sensor.side_effect = sensor
44+
return metrics
45+
46+
47+
@pytest.fixture
48+
def conn(_socket, dns_lookup, metrics):
49+
conn = BrokerConnection('localhost', 9092, socket.AF_INET, metrics=metrics)
3650
return conn
3751

3852

@@ -161,6 +175,46 @@ def test_send_response(_socket, conn):
161175
assert len(conn.in_flight_requests) == 1
162176

163177

178+
def test_send_async_request_while_other_request_is_already_in_buffer(_socket, conn, metrics):
179+
conn.connect()
180+
assert conn.state is ConnectionStates.CONNECTED
181+
assert 'node-0.bytes-sent' in metrics.mocked_sensors
182+
bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent']
183+
184+
req1 = MetadataRequest[0](topics='foo')
185+
header1 = RequestHeader(req1, client_id=conn.config['client_id'])
186+
payload_bytes1 = len(header1.encode()) + len(req1.encode())
187+
req2 = MetadataRequest[0]([])
188+
header2 = RequestHeader(req2, client_id=conn.config['client_id'])
189+
payload_bytes2 = len(header2.encode()) + len(req2.encode())
190+
191+
# The first call to the socket will raise a transient SSL exception. This will make the first
192+
# request to be kept in the internal buffer to be sent in the next call of
193+
# send_pending_requests_v2.
194+
_socket.send.side_effect = [SSLWantWriteError, 4 + payload_bytes1, 4 + payload_bytes2]
195+
196+
conn.send(req1, blocking=False)
197+
# This won't send any bytes because of the SSL exception and the request bytes will be kept in
198+
# the buffer.
199+
assert conn.send_pending_requests_v2() is False
200+
assert bytes_sent_sensor.record.call_args_list[0].args == (0,)
201+
202+
conn.send(req2, blocking=False)
203+
# This will send the remaining bytes in the buffer from the first request, but should notice
204+
# that the second request was queued, therefore it should return False.
205+
bytes_sent_sensor.record.reset_mock()
206+
assert conn.send_pending_requests_v2() is False
207+
bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes1)
208+
209+
bytes_sent_sensor.record.reset_mock()
210+
assert conn.send_pending_requests_v2() is True
211+
bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes2)
212+
213+
bytes_sent_sensor.record.reset_mock()
214+
assert conn.send_pending_requests_v2() is True
215+
bytes_sent_sensor.record.assert_called_once_with(0)
216+
217+
164218
def test_send_error(_socket, conn):
165219
conn.connect()
166220
assert conn.state is ConnectionStates.CONNECTED

0 commit comments

Comments
 (0)