Skip to content

Commit 880fe0e

Browse files
committed
Fix thread not waking up when there is still data to be sent
When producing messages quickly without waiting for the future of previous requests, there could be some situations when the last batch was not sent. That seemed to be more frequent with larger messages (~100KiB), but apparently it could happen to any message when `linger_ms` is 0. Not sure if it could happen when it is non-zero though. The reason is that `BrokerConnection.send_pending_requests_v2` would fill the internal buffer with the bytes from a request and try to send it. https://github.com/aiven/kafka-python/blob/e0ab864f7aca3961e729cf03d1caa3899fbee617/kafka/conn.py#L1036 If it couldn't send it completely for some reason, it would try to send again in the next call to `send_pending_requests_v2`. But if between those 2 calls, `BrokerConnection.send` was called, new data would be appended to self._protocol: KafkaProtocol: https://github.com/aiven/kafka-python/blob/b01ffb6a004480635751e325db2ded20bcdc0d2f/kafka/conn.py#L981 but the second call to `send_pending_requests_v2` wouldn't check if any new data was available and would return False: https://github.com/aiven/kafka-python/blob/e0ab864f7aca3961e729cf03d1caa3899fbee617/kafka/conn.py#L1035 This would tell `KafkaClient._poll` that all pending data was sent, which would make the client not listen to socked write readiness anymore: https://github.com/aiven/kafka-python/blob/b01ffb6a004480635751e325db2ded20bcdc0d2f/kafka/client_async.py#L663-L667
1 parent e0ab864 commit 880fe0e

File tree

2 files changed

+60
-3
lines changed

2 files changed

+60
-3
lines changed

kafka/conn.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,13 @@ def send_pending_requests_v2(self):
10401040
total_bytes = self._send_bytes(self._send_buffer)
10411041
self._send_buffer = self._send_buffer[total_bytes:]
10421042

1043+
# If all data was sent, we need to get the new data from the protocol now, otherwise
1044+
# this function would return True, indicating that there is no more pending
1045+
# requests. This could cause the calling thread to wait indefinitely as it won't
1046+
# know that there is still buffered data to send.
1047+
if not self._send_buffer:
1048+
self._send_buffer = self._protocol.send_bytes()
1049+
10431050
if self._sensors:
10441051
self._sensors.bytes_sent.record(total_bytes)
10451052
# Return True iff send buffer is empty

test/test_conn.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import mock
88
import pytest
99

10-
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
10+
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, collect_hosts
11+
from kafka.metrics.metrics import Metrics
12+
from kafka.metrics.stats.sensor import Sensor
1113
from kafka.protocol.api import RequestHeader
1214
from kafka.protocol.metadata import MetadataRequest
1315
from kafka.protocol.produce import ProduceRequest
@@ -31,8 +33,20 @@ def _socket(mocker):
3133

3234

3335
@pytest.fixture
34-
def conn(_socket, dns_lookup):
35-
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
36+
def metrics(mocker):
37+
metrics = mocker.MagicMock(Metrics)
38+
metrics.mocked_sensors = {}
39+
def sensor(name, **kwargs):
40+
if name not in metrics.mocked_sensors:
41+
metrics.mocked_sensors[name] = mocker.MagicMock(Sensor)
42+
return metrics.mocked_sensors[name]
43+
metrics.sensor.side_effect = sensor
44+
return metrics
45+
46+
47+
@pytest.fixture
48+
def conn(_socket, dns_lookup, metrics):
49+
conn = BrokerConnection('localhost', 9092, socket.AF_INET, metrics=metrics)
3650
return conn
3751

3852

@@ -161,6 +175,42 @@ def test_send_response(_socket, conn):
161175
assert len(conn.in_flight_requests) == 1
162176

163177

178+
def test_send_async_request_while_other_request_is_already_in_buffer(_socket, conn, metrics):
179+
conn.connect()
180+
assert conn.state is ConnectionStates.CONNECTED
181+
assert 'node-0.bytes-sent' in metrics.mocked_sensors
182+
bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent']
183+
184+
req1 = MetadataRequest[0](topics='foo')
185+
header1 = RequestHeader(req1, client_id=conn.config['client_id'])
186+
payload_bytes1 = len(header1.encode()) + len(req1.encode())
187+
req2 = MetadataRequest[0]([])
188+
header2 = RequestHeader(req2, client_id=conn.config['client_id'])
189+
payload_bytes2 = len(header2.encode()) + len(req2.encode())
190+
191+
# The first call to the socket will raise a transient SSL exception. This will make the first
192+
# request to be kept in the internal buffer to be sent in the next call of
193+
# send_pending_requests_v2.
194+
_socket.send.side_effect = [SSLWantWriteError, 4 + payload_bytes1, 4 + payload_bytes2]
195+
196+
conn.send(req1, blocking=False)
197+
# This won't send any bytes because and the request bytes will be kept in the buffer.
198+
assert conn.send_pending_requests_v2() is False
199+
assert bytes_sent_sensor.record.call_args_list[0].args == (0,)
200+
201+
conn.send(req2, blocking=False)
202+
# This will send the remaining bytes in the buffer from the first request, but should notice
203+
# that the second request was queued, therefore it should return False.
204+
assert conn.send_pending_requests_v2() is False
205+
assert bytes_sent_sensor.record.call_args_list[1].args == (4 + payload_bytes1,)
206+
207+
assert conn.send_pending_requests_v2() is True
208+
assert bytes_sent_sensor.record.call_args_list[2].args == (4 + payload_bytes2,)
209+
210+
assert conn.send_pending_requests_v2() is True
211+
assert bytes_sent_sensor.record.call_args_list[3].args == (0,)
212+
213+
164214
def test_send_error(_socket, conn):
165215
conn.connect()
166216
assert conn.state is ConnectionStates.CONNECTED

0 commit comments

Comments
 (0)