Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.SearchShardRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -271,18 +270,24 @@ private static class IndexSelector {

IndexSelector(
String clusterAlias,
List<SearchShardRouting> shardIts,
List<SearchShardRouting> shards,
QueryBuilder indexFilter,
long nowInMillis,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
) {
for (ShardIterator shardIt : shardIts) {
for (SearchShardRouting searchShardRouting : shards) {
boolean canMatch = true;
final ShardId shardId = shardIt.shardId();
final ShardId shardId = searchShardRouting.shardId();
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
if (coordinatorRewriteContext != null) {
var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias);
var shardRequest = new ShardSearchRequest(
shardId,
nowInMillis,
AliasFilter.EMPTY,
clusterAlias,
searchShardRouting.reshardSplitShardCountSummary()
);
shardRequest.source(new SearchSourceBuilder().query(indexFilter));
try {
canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext);
Expand All @@ -292,7 +297,7 @@ private static class IndexSelector {
}
}
if (canMatch) {
for (ShardRouting shard : shardIt) {
for (ShardRouting shard : searchShardRouting) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Shard(StreamInput in) throws IOException {
waitForCheckpoint = in.readLong();
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
reshardSplitShardCountSummary = new SplitShardCountSummary(in);
} else {
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
}
Expand All @@ -119,7 +119,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalTimeValue(keepAlive);
out.writeLong(waitForCheckpoint);
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
out.writeVInt(reshardSplitShardCountSummary.asInt());
reshardSplitShardCountSummary.writeTo(out);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ static ShardToQuery readFrom(StreamInput in) throws IOException {
new ShardId(in),
in.readOptionalWriteable(ShardSearchContextId::new),
in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)
? SplitShardCountSummary.fromInt(in.readVInt())
? new SplitShardCountSummary(in)
: SplitShardCountSummary.UNSET
);
}
Expand All @@ -416,7 +416,7 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeOptionalWriteable(contextId);
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
out.writeVInt(reshardSplitShardCountSummary.asInt());
reshardSplitShardCountSummary.writeTo(out);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexReshardService;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand All @@ -29,12 +31,19 @@ public class SearchShardsGroup implements Writeable {
private final ShardId shardId;
private final List<String> allocatedNodes;
private final boolean skipped;
private final SplitShardCountSummary reshardSplitShardCountSummary;
private final transient boolean preFiltered;

public SearchShardsGroup(ShardId shardId, List<String> allocatedNodes, boolean skipped) {
public SearchShardsGroup(
ShardId shardId,
List<String> allocatedNodes,
boolean skipped,
SplitShardCountSummary reshardSplitShardCountSummary
) {
this.shardId = shardId;
this.allocatedNodes = allocatedNodes;
this.skipped = skipped;
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
this.preFiltered = true;
}

Expand All @@ -45,13 +54,20 @@ public SearchShardsGroup(ShardId shardId, List<String> allocatedNodes, boolean s
this.shardId = oldGroup.getShardId();
this.allocatedNodes = Arrays.stream(oldGroup.getShards()).map(ShardRouting::currentNodeId).toList();
this.skipped = false;
// This value is specific to resharding feature and this code path is specific to CCS
// involving 8.x remote cluster.
// We don't currently expect resharding to be used in such conditions so it's unset.
this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can open a PR to remove this BWC after your PR, as it is no longer needed in 9.x.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice, we don't allow CCS between majors?

this.preFiltered = false;
}

public SearchShardsGroup(StreamInput in) throws IOException {
this.shardId = new ShardId(in);
this.allocatedNodes = in.readStringCollectionAsList();
this.skipped = in.readBoolean();
this.reshardSplitShardCountSummary = in.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL)
? SplitShardCountSummary.fromInt(in.readVInt())
: SplitShardCountSummary.UNSET;
this.preFiltered = true;
}

Expand All @@ -64,6 +80,9 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeStringCollection(allocatedNodes);
out.writeBoolean(skipped);
if (out.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL)) {
reshardSplitShardCountSummary.writeTo(out);
}
}

public ShardId shardId() {
Expand Down Expand Up @@ -92,20 +111,24 @@ public List<String> allocatedNodes() {
return allocatedNodes;
}

public SplitShardCountSummary reshardSplitShardCountSummary() {
return reshardSplitShardCountSummary;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchShardsGroup group = (SearchShardsGroup) o;
return skipped == group.skipped
&& preFiltered == group.preFiltered
&& shardId.equals(group.shardId)
&& allocatedNodes.equals(group.allocatedNodes);
SearchShardsGroup that = (SearchShardsGroup) o;
return skipped == that.skipped
&& preFiltered == that.preFiltered
&& Objects.equals(shardId, that.shardId)
&& Objects.equals(allocatedNodes, that.allocatedNodes)
&& Objects.equals(reshardSplitShardCountSummary, that.reshardSplitShardCountSummary);
}

@Override
public int hashCode() {
return Objects.hash(shardId, allocatedNodes, skipped, preFiltered);
return Objects.hash(shardId, allocatedNodes, skipped, reshardSplitShardCountSummary, preFiltered);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,14 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
private static List<SearchShardsGroup> toGroups(List<SearchShardIterator> shardIts) {
List<SearchShardsGroup> groups = new ArrayList<>(shardIts.size());
for (SearchShardIterator shardIt : shardIts) {
groups.add(new SearchShardsGroup(shardIt.shardId(), shardIt.getTargetNodeIds(), shardIt.skip()));
groups.add(
new SearchShardsGroup(
shardIt.shardId(),
shardIt.getTargetNodeIds(),
shardIt.skip(),
shardIt.getReshardSplitShardCountSummary()
)
);
}
return groups;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
} else {
if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
this.reshardSplitShardCountSummary = new SplitShardCountSummary(in);
} else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt());
} else {
Expand Down Expand Up @@ -237,7 +237,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) {
out.writeVInt(reshardSplitShardCountSummary.asInt());
reshardSplitShardCountSummary.writeTo(out);
} else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) {
out.writeInt(reshardSplitShardCountSummary.asInt());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;

/**
* The SplitShardCountSummary has been added to accommodate in-place index resharding.
Expand Down Expand Up @@ -84,7 +89,7 @@
* will be treated as a Summary mismatch on the source shard node.
*/

public class SplitShardCountSummary {
public class SplitShardCountSummary implements Writeable {
public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0);

/**
Expand Down Expand Up @@ -161,14 +166,21 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary(

/**
* Construct a SplitShardCountSummary from an integer
* Used for deserialization.
* Used for deserialization in versions that use int instead of vInt for serialization.
*/
public static SplitShardCountSummary fromInt(int payload) {
return new SplitShardCountSummary(payload);
}

private final int shardCountSummary;

/**
* Deserialize a SplitShardCountSummary using a canonical vInt-based serialization protocol.
*/
public SplitShardCountSummary(StreamInput in) throws IOException {
this.shardCountSummary = in.readVInt();
}

/**
* Return an integer representation of this summary
* Used for serialization.
Expand All @@ -189,6 +201,14 @@ public boolean isUnset() {
this.shardCountSummary = shardCountSummary;
}

/**
* Serializes a SplitShardCountSummary using a canonical vInt-based serialization protocol.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(shardCountSummary);
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.index;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;

Expand All @@ -20,6 +21,8 @@
* needed by other services.
*/
public class IndexReshardService {
public static TransportVersion RESHARDING_SHARD_SUMMARY_IN_ESQL = TransportVersion.fromName("resharding_shard_summary_in_esql");

/**
* Returns the indices from the provided set that are currently being resharded.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,18 @@ public static long computeWaitForCheckpoint(Map<String, long[]> indexToWaitForCh

// Used by ValidateQueryAction, ExplainAction, FieldCaps, TermsEnumAction, lookup join in ESQL
Copy link
Contributor Author

@lkts lkts Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As noted, additional handling may be needed for lookup join.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, resizing lookup indices is not allowed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RIght, thanks. I think i would like to still pass the summary if it's available in that context just to avoid inconsistency. It would be a follow up anyway.

public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter) {
this(shardId, nowInMillis, aliasFilter, null);
// TODO fix SplitShardCountSummary
this(shardId, nowInMillis, aliasFilter, null, SplitShardCountSummary.UNSET);
}

// Used by ESQL and field_caps API
public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter, String clusterAlias) {
public ShardSearchRequest(
ShardId shardId,
long nowInMillis,
AliasFilter aliasFilter,
String clusterAlias,
SplitShardCountSummary reshardSplitShardCountSummary
) {
this(
OriginalIndices.NONE,
shardId,
Expand All @@ -220,10 +227,7 @@ public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFi
SequenceNumbers.UNASSIGNED_SEQ_NO,
SearchService.NO_TIMEOUT,
false,
// This parameter is specific to the resharding feature.
// TODO
// It is currently only supported in _search API and is stubbed here as a result.
SplitShardCountSummary.UNSET
reshardSplitShardCountSummary
);
}

Expand Down Expand Up @@ -363,7 +367,7 @@ public ShardSearchRequest(StreamInput in) throws IOException {
forceSyntheticSource = false;
}
if (in.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
reshardSplitShardCountSummary = new SplitShardCountSummary(in);
} else {
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
}
Expand Down Expand Up @@ -429,7 +433,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce
}
}
if (out.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
out.writeVInt(reshardSplitShardCountSummary.asInt());
reshardSplitShardCountSummary.writeTo(out);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9217000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_execution_metadata,9216000
resharding_shard_summary_in_esql,9217000
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -72,7 +73,9 @@ protected SearchShardsResponse createTestInstance() {
for (int j = 0; j < numOfAllocatedNodes; j++) {
allocatedNodes.add(UUIDs.randomBase64UUID());
}
groups.add(new SearchShardsGroup(shardId, allocatedNodes, randomBoolean()));
groups.add(
new SearchShardsGroup(shardId, allocatedNodes, randomBoolean(), SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)))
);
}
Map<String, AliasFilter> aliasFilters = new HashMap<>();
for (SearchShardsGroup g : groups) {
Expand All @@ -93,7 +96,9 @@ protected SearchShardsResponse mutateInstance(SearchShardsResponse r) throws IOE
case 0 -> {
List<SearchShardsGroup> groups = new ArrayList<>(r.getGroups());
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), UUIDs.randomBase64UUID(), randomInt(2));
groups.add(new SearchShardsGroup(shardId, List.of(), randomBoolean()));
groups.add(
new SearchShardsGroup(shardId, List.of(), randomBoolean(), SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)))
);
return new SearchShardsResponse(groups, r.getNodes(), r.getAliasFilters());
}
case 1 -> {
Expand Down Expand Up @@ -143,12 +148,14 @@ public void testLegacyResponse() {
assertThat(group1.shardId(), equalTo(new ShardId("index-1", "uuid-1", 0)));
assertThat(group1.allocatedNodes(), equalTo(List.of("node-1", "node-2")));
assertFalse(group1.skipped());
assertThat(group1.reshardSplitShardCountSummary(), equalTo(SplitShardCountSummary.UNSET));
assertFalse(group1.preFiltered());

SearchShardsGroup group2 = Iterables.get(newResponse.getGroups(), 1);
assertThat(group2.shardId(), equalTo(new ShardId("index-2", "uuid-2", 7)));
assertThat(group2.allocatedNodes(), equalTo(List.of("node-1")));
assertFalse(group2.skipped());
assertThat(group2.reshardSplitShardCountSummary(), equalTo(SplitShardCountSummary.UNSET));
assertFalse(group2.preFiltered());

TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random());
Expand Down
Loading