MINOR: Initialize fetchPartitionStatus as a Map type to reduce collection conversions #20768
Conversation
```scala
val fetchInfos = fetchPartitionStatus.asScala.map { case (tp, status) =>
  tp -> status.fetchInfo
}.toSeq
```
Is it possible to have duplicate TPs in this "fetch" path?
Also, we are doing two conversions here - you can avoid one by using a view or iterator after asScala, and by using toBuffer instead of toSeq (the latter can result in a lazy collection being created, which can cause problems).
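The Scala specifics (`view`, `toBuffer`) aside, the shape of this suggestion can be sketched in Java terms: defer the mapping step lazily and materialize only one final collection. The map contents below are hypothetical stand-ins for the broker's `fetchPartitionStatus`, not real Kafka types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyViewSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for fetchPartitionStatus: partition -> offset.
        Map<String, Long> status = new LinkedHashMap<>();
        status.put("topic-0", 10L);
        status.put("topic-1", 20L);

        // A stream pipeline is lazy (like a Scala view): the map step runs
        // only when the terminal operation materializes the single result
        // collection, so no intermediate collection is built.
        AtomicInteger mapped = new AtomicInteger();
        Stream<String> pipeline = status.entrySet().stream()
                .map(e -> { mapped.incrementAndGet(); return e.getKey(); });
        System.out.println(mapped.get()); // 0: nothing mapped yet

        List<String> keys = pipeline.toList(); // single materialization
        System.out.println(mapped.get()); // 2
        System.out.println(keys);         // [topic-0, topic-1]
    }
}
```

A strict result collection (the analogue of `toBuffer` rather than a lazy `Seq`) also avoids surprises from re-evaluating the pipeline later.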
The source of fetchPartitionStatus comes from fetchData().
kafka/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
Lines 395 to 420 in 53e1172
```java
public Map<TopicIdPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) {
    final LinkedHashMap<TopicIdPartition, PartitionData> fetchData = new LinkedHashMap<>();
    final short version = version();
    data.topics().forEach(fetchTopic -> {
        String name;
        if (version < 13) {
            name = fetchTopic.topic(); // can't be null
        } else {
            name = topicNames.get(fetchTopic.topicId());
        }
        fetchTopic.partitions().forEach(fetchPartition ->
            // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
            fetchData.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())),
                new PartitionData(
                    fetchTopic.topicId(),
                    fetchPartition.fetchOffset(),
                    fetchPartition.logStartOffset(),
                    fetchPartition.partitionMaxBytes(),
                    optionalEpoch(fetchPartition.currentLeaderEpoch()),
                    optionalEpoch(fetchPartition.lastFetchedEpoch())
                )
            )
        );
    });
    return fetchData;
}
```
Its type is `LinkedHashMap<TopicIdPartition, PartitionData>`, so there will be no duplicate TPs.
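A minimal sketch of why the map type rules out duplicates, using hypothetical string keys in place of `TopicIdPartition`: a `LinkedHashMap` keeps one entry per key, so a repeated TP overwrites the previous entry rather than duplicating it, while first-insertion order is preserved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NoDuplicateTpsSketch {
    public static void main(String[] args) {
        // Hypothetical "topic-partition" keys; values stand in for PartitionData.
        Map<String, Long> fetchData = new LinkedHashMap<>();
        fetchData.put("topicA-0", 100L);
        fetchData.put("topicB-1", 200L);
        fetchData.put("topicA-0", 300L); // same key: overwritten, not duplicated

        System.out.println(fetchData.size()); // 2
        System.out.println(fetchData);        // {topicA-0=300, topicB-1=200}
    }
}
```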
```diff
  // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+ val delayedFetchKeys = fetchPartitionStatus.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
```
Why do we convert to a Scala List if we convert to a Java collection right after? We should probably use Java's Stream here and avoid the Scala collections altogether.
Thanks for catching this! I've fixed it. PTAL when you get a chance.
Resolved review threads (outdated):
- core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
- core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
- core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
LGTM
@DL1231 please rebase the code to include the latest fixes
```scala
val delayedFetchKeys = fetchPartitionStatus.keySet()
  .stream()
  .map(new TopicPartitionOperationKey(_))
  .collect(Collectors.toList[TopicPartitionOperationKey]())
```
I think you can use Stream.toList() directly (no need for collect). This method was added to Stream in Java 16.
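A small sketch of the suggestion, with placeholder string elements rather than the real `TopicPartitionOperationKey` values:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToListSketch {
    public static void main(String[] args) {
        // Before: collecting with an explicit Collector.
        List<String> viaCollect = Stream.of("a", "b", "c").collect(Collectors.toList());

        // Since Java 16: Stream.toList() is shorter and returns an
        // unmodifiable list (Collectors.toList() makes no mutability
        // guarantee, though it is an ArrayList in practice).
        List<String> viaToList = Stream.of("a", "b", "c").toList();

        System.out.println(viaToList);                    // [a, b, c]
        System.out.println(viaCollect.equals(viaToList)); // true
    }
}
```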
@DL1231 thanks for the updates
```diff
- }
+ }.toBuffer

  val logReadResults = replicaManager.readFromLog(
```
Perhaps readFromLog could accept a util.Iterator instead of a Seq. This would help avoid creating an extra collection in this hot operation.
Iterators can be exhausted and hence are a bit more brittle. I would only use them as a method parameter if there is a meaningful difference in performance.
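The brittleness being described can be sketched directly: an iterator can be traversed only once, so a callee that consumes it leaves nothing for a second pass, whereas a `Seq` (or `List`) can be read repeatedly. The list contents are placeholders.

```java
import java.util.Iterator;
import java.util.List;

public class IteratorExhaustionSketch {
    public static void main(String[] args) {
        List<Integer> offsets = List.of(1, 2, 3);

        // First traversal consumes the iterator...
        Iterator<Integer> it = offsets.iterator();
        int firstPass = 0;
        while (it.hasNext()) { it.next(); firstPass++; }

        // ...so a second traversal silently sees nothing.
        int secondPass = 0;
        while (it.hasNext()) { it.next(); secondPass++; }

        System.out.println(firstPass);  // 3
        System.out.println(secondPass); // 0
    }
}
```

This is why passing an iterator as a method parameter is only worth it when the avoided copy shows up in profiles.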
fair enough
LGTM
I plan to merge this later, assuming there are no further comments.
see #19876 (comment)
Initialize `fetchPartitionStatus` as a `Map` type to reduce unnecessary collection conversions.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>