Skip to content

Conversation

@gqmelo
Copy link

@gqmelo gqmelo commented Jul 7, 2025

When producing messages quickly without waiting for the future of previous requests, there could be some situations when the last batch was not sent.

That seemed to be more frequent with larger messages (~100KiB), but apparently it could happen to any message when linger_ms is 0. Not sure if it could happen when it is non-zero though.

The reason is that BrokerConnection.send_pending_requests_v2 would fill the internal buffer with the bytes from a request and try to send it.

self._send_buffer = self._protocol.send_bytes()

If it couldn't send it completely for some reason, it would try to send again in the next call to send_pending_requests_v2.

But if between those 2 calls, BrokerConnection.send was called, new data would be appended to self._protocol: KafkaProtocol:

correlation_id = self._protocol.send_request(request)

but the second call to send_pending_requests_v2 wouldn't check if any new data was available and would return False:

if not self._send_buffer:

This would tell KafkaClient._poll that all pending data was sent, which would make the client not listen to socked write readiness anymore:

if conn.send_pending_requests_v2():
# If send is complete, we dont need to track write readiness
# for this socket anymore
if key.events ^ selectors.EVENT_WRITE:
self._selector.modify(

@gqmelo gqmelo force-pushed the gqmelo/fix-sending-thread-wakeup branch 3 times, most recently from 880fe0e to 42c6f8e Compare July 8, 2025 09:44
conn.send(req2, blocking=False)
# This will send the remaining bytes in the buffer from the first request, but should notice
# that the second request was queued, therefore it should return False.
assert conn.send_pending_requests_v2() is False
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the test would fail before the fix.

@juha-aiven
Copy link

I think the PR is fine. But I'd like to somebody like e.g. @aiven-anton to have a look at it too. The PR touches kafka-python which we don't usually work on and that's used here and there in aiven-core. Thus, better review this a little more carefully.

Thus, not approving/merging.

@gqmelo gqmelo requested a review from aiven-anton August 11, 2025 11:55
Copy link

@aiven-anton aiven-anton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not well versed in this code-base, but this seems like a well thought-through fix and I cannot spot any real issues with it. It seems sound to move ahead with this.

We should also notify the upstream(s) of this issue and fix.

I left two minor comments, let me know if you want to address those.

When producing messages quickly without waiting for the future of
previous requests, there could  be some situations when the last batch
was not sent.

That seemed to be more frequent with larger messages (~100KiB), but
apparently it could happen to any message when `linger_ms` is 0. Not
sure if it could happen when it is non-zero though.

The reason is that `BrokerConnection.send_pending_requests_v2` would
fill the internal buffer with the bytes from a request and try to send it.

https://github.com/aiven/kafka-python/blob/e0ab864f7aca3961e729cf03d1caa3899fbee617/kafka/conn.py#L1036

If it couldn't send it completely for some reason, it would try to send
again in the next call to `send_pending_requests_v2`.

But if between those 2 calls, `BrokerConnection.send` was called, new
data would be appended to self._protocol: KafkaProtocol:

https://github.com/aiven/kafka-python/blob/b01ffb6a004480635751e325db2ded20bcdc0d2f/kafka/conn.py#L981

but the second call to `send_pending_requests_v2` wouldn't check if
any new data was available and would return False:

https://github.com/aiven/kafka-python/blob/e0ab864f7aca3961e729cf03d1caa3899fbee617/kafka/conn.py#L1035

This would tell `KafkaClient._poll` that all pending data was sent,
which would make the client not listen to socked write readiness anymore:

https://github.com/aiven/kafka-python/blob/b01ffb6a004480635751e325db2ded20bcdc0d2f/kafka/client_async.py#L663-L667
@gqmelo gqmelo force-pushed the gqmelo/fix-sending-thread-wakeup branch from 42c6f8e to 719cf45 Compare August 13, 2025 10:00
@gqmelo gqmelo requested a review from aiven-anton August 13, 2025 10:02
@gqmelo
Copy link
Author

gqmelo commented Aug 13, 2025

After this is merged, I'll open a PR in the upstream repo

@aiven-anton aiven-anton merged commit 458a531 into main Aug 13, 2025
@aiven-anton aiven-anton deleted the gqmelo/fix-sending-thread-wakeup branch August 13, 2025 10:41
@gqmelo
Copy link
Author

gqmelo commented Aug 19, 2025

After this is merged, I'll open a PR in the upstream repo

Upstream PR: dpkp#2670

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants