Skip to content

Commit f66095b

Browse files
authored
Propagate SplitShardCountSummary to ESQL data nodes (#137773)
1 parent 6b63b13 commit f66095b

File tree

22 files changed

+344
-174
lines changed

22 files changed

+344
-174
lines changed

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.cluster.node.DiscoveryNode;
2222
import org.elasticsearch.cluster.project.ProjectResolver;
2323
import org.elasticsearch.cluster.routing.SearchShardRouting;
24-
import org.elasticsearch.cluster.routing.ShardIterator;
2524
import org.elasticsearch.cluster.routing.ShardRouting;
2625
import org.elasticsearch.cluster.service.ClusterService;
2726
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -271,18 +270,24 @@ private static class IndexSelector {
271270

272271
IndexSelector(
273272
String clusterAlias,
274-
List<SearchShardRouting> shardIts,
273+
List<SearchShardRouting> shards,
275274
QueryBuilder indexFilter,
276275
long nowInMillis,
277276
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
278277
) {
279-
for (ShardIterator shardIt : shardIts) {
278+
for (SearchShardRouting searchShardRouting : shards) {
280279
boolean canMatch = true;
281-
final ShardId shardId = shardIt.shardId();
280+
final ShardId shardId = searchShardRouting.shardId();
282281
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
283282
var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
284283
if (coordinatorRewriteContext != null) {
285-
var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias);
284+
var shardRequest = new ShardSearchRequest(
285+
shardId,
286+
nowInMillis,
287+
AliasFilter.EMPTY,
288+
clusterAlias,
289+
searchShardRouting.reshardSplitShardCountSummary()
290+
);
286291
shardRequest.source(new SearchSourceBuilder().query(indexFilter));
287292
try {
288293
canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext);
@@ -292,7 +297,7 @@ private static class IndexSelector {
292297
}
293298
}
294299
if (canMatch) {
295-
for (ShardRouting shard : shardIt) {
300+
for (ShardRouting shard : searchShardRouting) {
296301
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
297302
}
298303
} else {

server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public Shard(StreamInput in) throws IOException {
102102
waitForCheckpoint = in.readLong();
103103
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
104104
if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
105-
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
105+
reshardSplitShardCountSummary = new SplitShardCountSummary(in);
106106
} else {
107107
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
108108
}
@@ -119,7 +119,7 @@ public void writeTo(StreamOutput out) throws IOException {
119119
out.writeOptionalTimeValue(keepAlive);
120120
out.writeLong(waitForCheckpoint);
121121
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
122-
out.writeVInt(reshardSplitShardCountSummary.asInt());
122+
reshardSplitShardCountSummary.writeTo(out);
123123
}
124124
}
125125

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ static ShardToQuery readFrom(StreamInput in) throws IOException {
403403
new ShardId(in),
404404
in.readOptionalWriteable(ShardSearchContextId::new),
405405
in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)
406-
? SplitShardCountSummary.fromInt(in.readVInt())
406+
? new SplitShardCountSummary(in)
407407
: SplitShardCountSummary.UNSET
408408
);
409409
}
@@ -416,7 +416,7 @@ public void writeTo(StreamOutput out) throws IOException {
416416
shardId.writeTo(out);
417417
out.writeOptionalWriteable(contextId);
418418
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
419-
out.writeVInt(reshardSplitShardCountSummary.asInt());
419+
reshardSplitShardCountSummary.writeTo(out);
420420
}
421421
}
422422
}

server/src/main/java/org/elasticsearch/action/search/SearchShardsGroup.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111

1212
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
1313
import org.elasticsearch.cluster.routing.ShardRouting;
14+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1415
import org.elasticsearch.common.io.stream.StreamInput;
1516
import org.elasticsearch.common.io.stream.StreamOutput;
1617
import org.elasticsearch.common.io.stream.Writeable;
18+
import org.elasticsearch.index.IndexReshardService;
1719
import org.elasticsearch.index.shard.ShardId;
1820

1921
import java.io.IOException;
@@ -29,12 +31,19 @@ public class SearchShardsGroup implements Writeable {
2931
private final ShardId shardId;
3032
private final List<String> allocatedNodes;
3133
private final boolean skipped;
34+
private final SplitShardCountSummary reshardSplitShardCountSummary;
3235
private final transient boolean preFiltered;
3336

34-
public SearchShardsGroup(ShardId shardId, List<String> allocatedNodes, boolean skipped) {
37+
public SearchShardsGroup(
38+
ShardId shardId,
39+
List<String> allocatedNodes,
40+
boolean skipped,
41+
SplitShardCountSummary reshardSplitShardCountSummary
42+
) {
3543
this.shardId = shardId;
3644
this.allocatedNodes = allocatedNodes;
3745
this.skipped = skipped;
46+
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
3847
this.preFiltered = true;
3948
}
4049

@@ -45,13 +54,20 @@ public SearchShardsGroup(ShardId shardId, List<String> allocatedNodes, boolean s
4554
this.shardId = oldGroup.getShardId();
4655
this.allocatedNodes = Arrays.stream(oldGroup.getShards()).map(ShardRouting::currentNodeId).toList();
4756
this.skipped = false;
57+
// This value is specific to resharding feature and this code path is specific to CCS
58+
// involving 8.x remote cluster.
59+
// We don't currently expect resharding to be used in such conditions so it's unset.
60+
this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
4861
this.preFiltered = false;
4962
}
5063

5164
public SearchShardsGroup(StreamInput in) throws IOException {
5265
this.shardId = new ShardId(in);
5366
this.allocatedNodes = in.readStringCollectionAsList();
5467
this.skipped = in.readBoolean();
68+
this.reshardSplitShardCountSummary = in.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL)
69+
? SplitShardCountSummary.fromInt(in.readVInt())
70+
: SplitShardCountSummary.UNSET;
5571
this.preFiltered = true;
5672
}
5773

@@ -64,6 +80,9 @@ public void writeTo(StreamOutput out) throws IOException {
6480
shardId.writeTo(out);
6581
out.writeStringCollection(allocatedNodes);
6682
out.writeBoolean(skipped);
83+
if (out.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL)) {
84+
reshardSplitShardCountSummary.writeTo(out);
85+
}
6786
}
6887

6988
public ShardId shardId() {
@@ -92,20 +111,24 @@ public List<String> allocatedNodes() {
92111
return allocatedNodes;
93112
}
94113

114+
public SplitShardCountSummary reshardSplitShardCountSummary() {
115+
return reshardSplitShardCountSummary;
116+
}
117+
95118
@Override
96119
public boolean equals(Object o) {
97-
if (this == o) return true;
98120
if (o == null || getClass() != o.getClass()) return false;
99-
SearchShardsGroup group = (SearchShardsGroup) o;
100-
return skipped == group.skipped
101-
&& preFiltered == group.preFiltered
102-
&& shardId.equals(group.shardId)
103-
&& allocatedNodes.equals(group.allocatedNodes);
121+
SearchShardsGroup that = (SearchShardsGroup) o;
122+
return skipped == that.skipped
123+
&& preFiltered == that.preFiltered
124+
&& Objects.equals(shardId, that.shardId)
125+
&& Objects.equals(allocatedNodes, that.allocatedNodes)
126+
&& Objects.equals(reshardSplitShardCountSummary, that.reshardSplitShardCountSummary);
104127
}
105128

106129
@Override
107130
public int hashCode() {
108-
return Objects.hash(shardId, allocatedNodes, skipped, preFiltered);
131+
return Objects.hash(shardId, allocatedNodes, skipped, reshardSplitShardCountSummary, preFiltered);
109132
}
110133

111134
@Override

server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,14 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
210210
private static List<SearchShardsGroup> toGroups(List<SearchShardIterator> shardIts) {
211211
List<SearchShardsGroup> groups = new ArrayList<>(shardIts.size());
212212
for (SearchShardIterator shardIt : shardIts) {
213-
groups.add(new SearchShardsGroup(shardIt.shardId(), shardIt.getTargetNodeIds(), shardIt.skip()));
213+
groups.add(
214+
new SearchShardsGroup(
215+
shardIt.shardId(),
216+
shardIt.getTargetNodeIds(),
217+
shardIt.skip(),
218+
shardIt.getReshardSplitShardCountSummary()
219+
)
220+
);
214221
}
215222
return groups;
216223
}

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
102102
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
103103
} else {
104104
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
105-
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
105+
this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
106106
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
107107
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt());
108108
} else {
@@ -237,7 +237,7 @@ public void writeTo(StreamOutput out) throws IOException {
237237
out.writeString(index);
238238
out.writeVLong(routedBasedOnClusterVersion);
239239
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
240-
out.writeVInt(reshardSplitShardCountSummary.asInt());
240+
reshardSplitShardCountSummary.writeTo(out);
241241
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
242242
out.writeInt(reshardSplitShardCountSummary.asInt());
243243
}

server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
import org.elasticsearch.cluster.metadata.IndexMetadata;
1313
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
1414
import org.elasticsearch.cluster.metadata.IndexReshardingState;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.common.io.stream.Writeable;
18+
19+
import java.io.IOException;
1520

1621
/**
1722
* The SplitShardCountSummary has been added to accommodate in-place index resharding.
@@ -84,7 +89,7 @@
8489
* will be treated as a Summary mismatch on the source shard node.
8590
*/
8691

87-
public class SplitShardCountSummary {
92+
public class SplitShardCountSummary implements Writeable {
8893
public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0);
8994

9095
/**
@@ -161,14 +166,21 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary(
161166

162167
/**
163168
* Construct a SplitShardCountSummary from an integer
164-
* Used for deserialization.
169+
* Used for deserialization in versions that use int instead of vInt for serialization.
165170
*/
166171
public static SplitShardCountSummary fromInt(int payload) {
167172
return new SplitShardCountSummary(payload);
168173
}
169174

170175
private final int shardCountSummary;
171176

177+
/**
178+
* Deserialize a SplitShardCountSummary using a canonical vInt-based serialization protocol.
179+
*/
180+
public SplitShardCountSummary(StreamInput in) throws IOException {
181+
this.shardCountSummary = in.readVInt();
182+
}
183+
172184
/**
173185
* Return an integer representation of this summary
174186
* Used for serialization.
@@ -189,6 +201,14 @@ public boolean isUnset() {
189201
this.shardCountSummary = shardCountSummary;
190202
}
191203

204+
/**
205+
* Serializes a SplitShardCountSummary using a canonical vInt-based serialization protocol.
206+
*/
207+
@Override
208+
public void writeTo(StreamOutput out) throws IOException {
209+
out.writeVInt(shardCountSummary);
210+
}
211+
192212
@Override
193213
public boolean equals(Object other) {
194214
if (this == other) {

server/src/main/java/org/elasticsearch/index/IndexReshardService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.index;
1111

12+
import org.elasticsearch.TransportVersion;
1213
import org.elasticsearch.cluster.ProjectState;
1314
import org.elasticsearch.cluster.metadata.IndexMetadata;
1415

@@ -20,6 +21,8 @@
2021
* needed by other services.
2122
*/
2223
public class IndexReshardService {
24+
public static TransportVersion RESHARDING_SHARD_SUMMARY_IN_ESQL = TransportVersion.fromName("resharding_shard_summary_in_esql");
25+
2326
/**
2427
* Returns the indices from the provided set that are currently being resharded.
2528
*/

server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,18 @@ public static long computeWaitForCheckpoint(Map<String, long[]> indexToWaitForCh
196196

197197
// Used by ValidateQueryAction, ExplainAction, FieldCaps, TermsEnumAction, lookup join in ESQL
198198
public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter) {
199-
this(shardId, nowInMillis, aliasFilter, null);
199+
// TODO fix SplitShardCountSummary
200+
this(shardId, nowInMillis, aliasFilter, null, SplitShardCountSummary.UNSET);
200201
}
201202

202203
// Used by ESQL and field_caps API
203-
public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter, String clusterAlias) {
204+
public ShardSearchRequest(
205+
ShardId shardId,
206+
long nowInMillis,
207+
AliasFilter aliasFilter,
208+
String clusterAlias,
209+
SplitShardCountSummary reshardSplitShardCountSummary
210+
) {
204211
this(
205212
OriginalIndices.NONE,
206213
shardId,
@@ -220,10 +227,7 @@ public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFi
220227
SequenceNumbers.UNASSIGNED_SEQ_NO,
221228
SearchService.NO_TIMEOUT,
222229
false,
223-
// This parameter is specific to the resharding feature.
224-
// TODO
225-
// It is currently only supported in _search API and is stubbed here as a result.
226-
SplitShardCountSummary.UNSET
230+
reshardSplitShardCountSummary
227231
);
228232
}
229233

@@ -363,7 +367,7 @@ public ShardSearchRequest(StreamInput in) throws IOException {
363367
forceSyntheticSource = false;
364368
}
365369
if (in.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
366-
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
370+
reshardSplitShardCountSummary = new SplitShardCountSummary(in);
367371
} else {
368372
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
369373
}
@@ -429,7 +433,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce
429433
}
430434
}
431435
if (out.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
432-
out.writeVInt(reshardSplitShardCountSummary.asInt());
436+
reshardSplitShardCountSummary.writeTo(out);
433437
}
434438
}
435439

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9217000

0 commit comments

Comments
 (0)