Skip to content

Commit 3e3f0c4

Browse files
KAFKA-19946: Simplify skipping of empty ShareFetch requests (#21033)
ShareFetch requests can be used to fetch records for share consumers, but in some cases, no records are required and the request is just being used to update the share session or send acknowledgements. As a result, there are situations in which a ShareFetch request would be built, only for it to be entirely empty (no fetch, no asks, no share session update). This PR simplifies the logic for detecting when the request is empty and then to avoid building it entirely. It also adds some tests for this case. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
1 parent c9fa8c5 commit 3e3f0c4

File tree

4 files changed

+110
-50
lines changed

4 files changed

+110
-50
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -253,24 +253,20 @@ public PollResult poll(long currentTimeMs) {
253253
Node target = entry.getKey();
254254
ShareSessionHandler handler = entry.getValue();
255255

256-
log.trace("Building ShareFetch request to send to node {}", target.id());
257-
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, shareFetchConfig);
258-
259256
// For record_limit mode, we only send a full ShareFetch to a single node at a time.
260257
// We prepare to build ShareFetch requests for all nodes with session handlers to permit
261258
// piggy-backing of acknowledgements, and also to adjust the topic-partitions
262-
// in the share session.
263-
if (isShareAcquireModeRecordLimit() && target.id() != fetchRecordsNodeId.get()) {
264-
ShareFetchRequestData data = requestBuilder.data();
265-
// If there's nothing to send, just skip building the record.
266-
if (data.topics().isEmpty() && data.forgottenTopicsData().isEmpty()) {
267-
return null;
268-
} else {
269-
// There is something to send, but we don't want to fetch any records.
270-
requestBuilder.data().setMaxRecords(0);
271-
}
259+
// in the share session, but if the request would contain neither of those, it can be skipped.
260+
boolean canSkipIfRequestEmpty = isShareAcquireModeRecordLimit() && target.id() != fetchRecordsNodeId.get();
261+
262+
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, shareFetchConfig, canSkipIfRequestEmpty);
263+
if (requestBuilder == null) {
264+
log.trace("Skipping ShareFetch request to send to node {}", target.id());
265+
return null;
272266
}
273267

268+
log.trace("Building ShareFetch request to send to node {}", target.id());
269+
274270
nodesWithPendingRequests.add(target.id());
275271

276272
BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* <p>ShareSessionHandler tracks the partitions which are in the session. It also determines
5555
* which partitions need to be included in each ShareFetch/ShareAcknowledge request.
5656
*/
57+
@SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
5758
public class ShareSessionHandler {
5859
private final Logger log;
5960
private final int node;
@@ -112,7 +113,7 @@ public boolean isNewSession() {
112113
return nextMetadata.isNewSession();
113114
}
114115

115-
public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, ShareFetchConfig shareFetchConfig) {
116+
public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, ShareFetchConfig shareFetchConfig, boolean canSkipIfRequestEmpty) {
116117
List<TopicIdPartition> added = new ArrayList<>();
117118
List<TopicIdPartition> removed = new ArrayList<>();
118119
List<TopicIdPartition> replaced = new ArrayList<>();
@@ -158,15 +159,6 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, ShareFetch
158159
}
159160
}
160161

161-
if (log.isDebugEnabled()) {
162-
log.debug("Build ShareFetch {} for node {}. Added {}, removed {}, replaced {} out of {}",
163-
nextMetadata, node,
164-
topicIdPartitionsToLogString(added),
165-
topicIdPartitionsToLogString(removed),
166-
topicIdPartitionsToLogString(replaced),
167-
topicIdPartitionsToLogString(sessionPartitions.values()));
168-
}
169-
170162
// The replaced topic-partitions need to be removed, and their replacements are already added
171163
removed.addAll(replaced);
172164

@@ -187,6 +179,19 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, ShareFetch
187179
nextPartitions = new LinkedHashMap<>();
188180
nextAcknowledgements = new LinkedHashMap<>();
189181

182+
if (canSkipIfRequestEmpty && added.isEmpty() && removed.isEmpty() && acknowledgementBatches.isEmpty()) {
183+
return null;
184+
}
185+
186+
if (log.isDebugEnabled()) {
187+
log.debug("Build ShareFetch {} for node {}. Added {}, removed {}, replaced {} out of {}",
188+
nextMetadata, node,
189+
topicIdPartitionsToLogString(added),
190+
topicIdPartitionsToLogString(removed),
191+
topicIdPartitionsToLogString(replaced),
192+
topicIdPartitionsToLogString(sessionPartitions.values()));
193+
}
194+
190195
if (hasRenewAcknowledgements) {
191196
// If the request has renew acknowledgements, the ShareFetch is only used to send the acknowledgements
192197
// and potentially update the share session. The parameters for wait time, number of bytes and number of
@@ -196,6 +201,14 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, ShareFetch
196201
0, 0, 0,
197202
0, shareFetchConfig.shareAcquireMode.id, true,
198203
added, removed, acknowledgementBatches);
204+
} else if (canSkipIfRequestEmpty) {
205+
// The request contains changes to the share session or acknowledgements only. The parameters for wait time,
206+
// number of bytes and number of records are all zero.
207+
return ShareFetchRequest.Builder.forConsumer(
208+
groupId, nextMetadata, 0,
209+
0, 0, 0,
210+
0, shareFetchConfig.shareAcquireMode.id, false,
211+
added, removed, acknowledgementBatches);
199212
} else {
200213
return ShareFetchRequest.Builder.forConsumer(
201214
groupId, nextMetadata, shareFetchConfig.maxWaitMs,

0 commit comments

Comments
 (0)