diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java index 577804022144e..b504345792a95 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -21,7 +21,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.SearchShardRouting; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -271,18 +270,24 @@ private static class IndexSelector { IndexSelector( String clusterAlias, - List shardIts, + List shards, QueryBuilder indexFilter, long nowInMillis, CoordinatorRewriteContextProvider coordinatorRewriteContextProvider ) { - for (ShardIterator shardIt : shardIts) { + for (SearchShardRouting searchShardRouting : shards) { boolean canMatch = true; - final ShardId shardId = shardIt.shardId(); + final ShardId shardId = searchShardRouting.shardId(); if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) { var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex()); if (coordinatorRewriteContext != null) { - var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias); + var shardRequest = new ShardSearchRequest( + shardId, + nowInMillis, + AliasFilter.EMPTY, + clusterAlias, + searchShardRouting.reshardSplitShardCountSummary() + ); shardRequest.source(new SearchSourceBuilder().query(indexFilter)); try { canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext); @@ -292,7 +297,7 @@ private static class IndexSelector { } } if (canMatch) { - for (ShardRouting shard : shardIt) { + for (ShardRouting shard : searchShardRouting) { nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); } } else { diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java index 2aa3c84ea3184..4b09ea6d6d0e7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java @@ -102,7 +102,7 @@ public Shard(StreamInput in) throws IOException { waitForCheckpoint = in.readLong(); assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive; if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { - reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt()); + reshardSplitShardCountSummary = new SplitShardCountSummary(in); } else { reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; } @@ -119,7 +119,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalTimeValue(keepAlive); out.writeLong(waitForCheckpoint); if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { - out.writeVInt(reshardSplitShardCountSummary.asInt()); + reshardSplitShardCountSummary.writeTo(out); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 26b5b9a5d9dd2..af3aa18373c1e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -403,7 +403,7 @@ static ShardToQuery readFrom(StreamInput in) throws IOException { new ShardId(in), in.readOptionalWriteable(ShardSearchContextId::new), in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY) - ? SplitShardCountSummary.fromInt(in.readVInt()) + ? new SplitShardCountSummary(in) : SplitShardCountSummary.UNSET ); } @@ -416,7 +416,7 @@ public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); out.writeOptionalWriteable(contextId); if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { - out.writeVInt(reshardSplitShardCountSummary.asInt()); + reshardSplitShardCountSummary.writeTo(out); } } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardsGroup.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardsGroup.java index bdaf7e29ae0bc..72fd17279e41f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardsGroup.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardsGroup.java @@ -11,9 +11,11 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.IndexReshardService; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -29,12 +31,19 @@ public class SearchShardsGroup implements Writeable { private final ShardId shardId; private final List allocatedNodes; private final boolean skipped; + private final SplitShardCountSummary reshardSplitShardCountSummary; private final transient boolean preFiltered; - public SearchShardsGroup(ShardId shardId, List allocatedNodes, boolean skipped) { + public SearchShardsGroup( + ShardId shardId, + List allocatedNodes, + boolean skipped, + SplitShardCountSummary reshardSplitShardCountSummary + ) { this.shardId = shardId; this.allocatedNodes = allocatedNodes; this.skipped = skipped; + this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; this.preFiltered = true; } @@ -45,6 +54,10 @@ public SearchShardsGroup(ShardId shardId, List allocatedNodes, boolean s this.shardId = oldGroup.getShardId(); this.allocatedNodes = Arrays.stream(oldGroup.getShards()).map(ShardRouting::currentNodeId).toList(); this.skipped = false; + // This value is specific to resharding feature and this code path is specific to CCS + // involving 8.x remote cluster. + // We don't currently expect resharding to be used in such conditions so it's unset. + this.reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; this.preFiltered = false; } @@ -52,6 +65,9 @@ public SearchShardsGroup(StreamInput in) throws IOException { this.shardId = new ShardId(in); this.allocatedNodes = in.readStringCollectionAsList(); this.skipped = in.readBoolean(); + this.reshardSplitShardCountSummary = in.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL) + ? SplitShardCountSummary.fromInt(in.readVInt()) + : SplitShardCountSummary.UNSET; this.preFiltered = true; } @@ -64,6 +80,9 @@ public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); out.writeStringCollection(allocatedNodes); out.writeBoolean(skipped); + if (out.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL)) { + reshardSplitShardCountSummary.writeTo(out); + } } public ShardId shardId() { @@ -92,20 +111,24 @@ public List allocatedNodes() { return allocatedNodes; } + public SplitShardCountSummary reshardSplitShardCountSummary() { + return reshardSplitShardCountSummary; + } + @Override public boolean equals(Object o) { - if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - SearchShardsGroup group = (SearchShardsGroup) o; - return skipped == group.skipped - && preFiltered == group.preFiltered - && shardId.equals(group.shardId) - && allocatedNodes.equals(group.allocatedNodes); + SearchShardsGroup that = (SearchShardsGroup) o; + return skipped == that.skipped + && preFiltered == that.preFiltered + && Objects.equals(shardId, that.shardId) + && Objects.equals(allocatedNodes, that.allocatedNodes) + && Objects.equals(reshardSplitShardCountSummary, that.reshardSplitShardCountSummary); } @Override public int hashCode() { - return Objects.hash(shardId, allocatedNodes, skipped, preFiltered); + return Objects.hash(shardId, allocatedNodes, skipped, reshardSplitShardCountSummary, preFiltered); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java index 93f820f11aca7..58192de86280b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java @@ -210,7 +210,14 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act private static List toGroups(List shardIts) { List groups = new ArrayList<>(shardIts.size()); for (SearchShardIterator shardIt : shardIts) { - groups.add(new SearchShardsGroup(shardIt.shardId(), shardIt.getTargetNodeIds(), shardIt.skip())); + groups.add( + new SearchShardsGroup( + shardIt.shardId(), + shardIt.getTargetNodeIds(), + shardIt.skip(), + shardIt.getReshardSplitShardCountSummary() + ) + ); } return groups; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 2b89ddc31aa70..64d797a0e948a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -102,7 +102,7 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; } else { if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { - this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt()); + this.reshardSplitShardCountSummary = new SplitShardCountSummary(in); } else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { this.reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readInt()); } else { @@ -237,7 +237,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(index); out.writeVLong(routedBasedOnClusterVersion); if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { - out.writeVInt(reshardSplitShardCountSummary.asInt()); + reshardSplitShardCountSummary.writeTo(out); } else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { out.writeInt(reshardSplitShardCountSummary.asInt()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java index cc67e7c20e7da..ab35d102954d2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java @@ -12,6 +12,11 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; import org.elasticsearch.cluster.metadata.IndexReshardingState; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; /** * The SplitShardCountSummary has been added to accommodate in-place index resharding. @@ -84,7 +89,7 @@ * will be treated as a Summary mismatch on the source shard node. */ -public class SplitShardCountSummary { +public class SplitShardCountSummary implements Writeable { public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0); /** @@ -161,7 +166,7 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary( /** * Construct a SplitShardCountSummary from an integer - * Used for deserialization. + * Used for deserialization in versions that use int instead of vInt for serialization. */ public static SplitShardCountSummary fromInt(int payload) { return new SplitShardCountSummary(payload); @@ -169,6 +174,13 @@ public static SplitShardCountSummary fromInt(int payload) { private final int shardCountSummary; + /** + * Deserialize a SplitShardCountSummary using a canonical vInt-based serialization protocol. + */ + public SplitShardCountSummary(StreamInput in) throws IOException { + this.shardCountSummary = in.readVInt(); + } + /** * Return an integer representation of this summary * Used for serialization. @@ -189,6 +201,14 @@ public boolean isUnset() { this.shardCountSummary = shardCountSummary; } + /** + * Serializes a SplitShardCountSummary using a canonical vInt-based serialization protocol. + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(shardCountSummary); + } + @Override public boolean equals(Object other) { if (this == other) { diff --git a/server/src/main/java/org/elasticsearch/index/IndexReshardService.java b/server/src/main/java/org/elasticsearch/index/IndexReshardService.java index edcf1806b499d..657530548f6dd 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexReshardService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexReshardService.java @@ -9,6 +9,7 @@ package org.elasticsearch.index; +import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -20,6 +21,8 @@ * needed by other services. */ public class IndexReshardService { + public static TransportVersion RESHARDING_SHARD_SUMMARY_IN_ESQL = TransportVersion.fromName("resharding_shard_summary_in_esql"); + /** * Returns the indices from the provided set that are currently being resharded. */ diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index e10e95e955c6f..542d1cd32542b 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -196,11 +196,18 @@ public static long computeWaitForCheckpoint(Map indexToWaitForCh // Used by ValidateQueryAction, ExplainAction, FieldCaps, TermsEnumAction, lookup join in ESQL public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter) { - this(shardId, nowInMillis, aliasFilter, null); + // TODO fix SplitShardCountSummary + this(shardId, nowInMillis, aliasFilter, null, SplitShardCountSummary.UNSET); } // Used by ESQL and field_caps API - public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter, String clusterAlias) { + public ShardSearchRequest( + ShardId shardId, + long nowInMillis, + AliasFilter aliasFilter, + String clusterAlias, + SplitShardCountSummary reshardSplitShardCountSummary + ) { this( OriginalIndices.NONE, shardId, @@ -220,10 +227,7 @@ public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFi SequenceNumbers.UNASSIGNED_SEQ_NO, SearchService.NO_TIMEOUT, false, - // This parameter is specific to the resharding feature. - // TODO - // It is currently only supported in _search API and is stubbed here as a result. - SplitShardCountSummary.UNSET + reshardSplitShardCountSummary ); } @@ -363,7 +367,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { forceSyntheticSource = false; } if (in.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { - reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt()); + reshardSplitShardCountSummary = new SplitShardCountSummary(in); } else { reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; } @@ -429,7 +433,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce } } if (out.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { - out.writeVInt(reshardSplitShardCountSummary.asInt()); + reshardSplitShardCountSummary.writeTo(out); } } diff --git a/server/src/main/resources/transport/definitions/referable/resharding_shard_summary_in_esql.csv b/server/src/main/resources/transport/definitions/referable/resharding_shard_summary_in_esql.csv new file mode 100644 index 0000000000000..fad15855ad264 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/resharding_shard_summary_in_esql.csv @@ -0,0 +1 @@ +9217000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 28e1d47db658d..b29f7625613b5 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -esql_execution_metadata,9216000 +resharding_shard_summary_in_esql,9217000 diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchShardsResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchShardsResponseTests.java index 5790d4b3064ed..bacd25cea4191 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchShardsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchShardsResponseTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -72,7 +73,9 @@ protected SearchShardsResponse createTestInstance() { for (int j = 0; j < numOfAllocatedNodes; j++) { allocatedNodes.add(UUIDs.randomBase64UUID()); } - groups.add(new SearchShardsGroup(shardId, allocatedNodes, randomBoolean())); + groups.add( + new SearchShardsGroup(shardId, allocatedNodes, randomBoolean(), SplitShardCountSummary.fromInt(randomIntBetween(0, 1024))) + ); } Map aliasFilters = new HashMap<>(); for (SearchShardsGroup g : groups) { @@ -93,7 +96,9 @@ protected SearchShardsResponse mutateInstance(SearchShardsResponse r) throws IOE case 0 -> { List groups = new ArrayList<>(r.getGroups()); ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), UUIDs.randomBase64UUID(), randomInt(2)); - groups.add(new SearchShardsGroup(shardId, List.of(), randomBoolean())); + groups.add( + new SearchShardsGroup(shardId, List.of(), randomBoolean(), SplitShardCountSummary.fromInt(randomIntBetween(0, 1024))) + ); return new SearchShardsResponse(groups, r.getNodes(), r.getAliasFilters()); } case 1 -> { @@ -143,12 +148,14 @@ public void testLegacyResponse() { assertThat(group1.shardId(), equalTo(new ShardId("index-1", "uuid-1", 0))); assertThat(group1.allocatedNodes(), equalTo(List.of("node-1", "node-2"))); assertFalse(group1.skipped()); + assertThat(group1.reshardSplitShardCountSummary(), equalTo(SplitShardCountSummary.UNSET)); assertFalse(group1.preFiltered()); SearchShardsGroup group2 = Iterables.get(newResponse.getGroups(), 1); assertThat(group2.shardId(), equalTo(new ShardId("index-2", "uuid-2", 7))); assertThat(group2.allocatedNodes(), equalTo(List.of("node-1"))); assertFalse(group2.skipped()); + assertThat(group2.reshardSplitShardCountSummary(), equalTo(SplitShardCountSummary.UNSET)); assertFalse(group2.preFiltered()); TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random()); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 48e607d1b3fba..57f3b3d67f585 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -275,9 +275,9 @@ public void testProcessRemoteShards() { AliasFilter.of(new MatchAllQueryBuilder(), Strings.EMPTY_ARRAY) ); List groups = List.of( - new SearchShardsGroup(new ShardId("foo", "foo_id", 0), List.of("node1", "node2"), false), - new SearchShardsGroup(new ShardId("foo", "foo_id", 1), List.of("node2", "node1"), true), - new SearchShardsGroup(new ShardId("bar", "bar_id", 0), List.of("node2", "node1"), false) + new SearchShardsGroup(new ShardId("foo", "foo_id", 0), List.of("node1", "node2"), false, SplitShardCountSummary.UNSET), + new SearchShardsGroup(new ShardId("foo", "foo_id", 1), List.of("node2", "node1"), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(new ShardId("bar", "bar_id", 0), List.of("node2", "node1"), false, SplitShardCountSummary.UNSET) ); searchShardsResponseMap.put("test_cluster_1", new SearchShardsResponse(groups, nodes, aliasFilters1)); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index f2fdb24e88dc4..a5c1b12e45044 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -317,7 +318,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices, */ try ( SearchContext searchContext = searchService.createSearchContext( - new ShardSearchRequest(shardId, System.currentTimeMillis(), AliasFilter.EMPTY, null), + new ShardSearchRequest(shardId, System.currentTimeMillis(), AliasFilter.EMPTY, null, SplitShardCountSummary.UNSET), SearchService.NO_TIMEOUT ) ) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java index 541e6a1421946..29a2fd342eb34 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.RemoteClusterAware; @@ -78,8 +77,8 @@ public void testCanMatch() { ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { DataNodeRequest dataNodeRequest = (DataNodeRequest) request; - for (ShardId shardId : dataNodeRequest.shardIds()) { - queriedIndices.add(shardId.getIndexName()); + for (DataNodeRequest.Shard shard : dataNodeRequest.shards()) { + queriedIndices.add(shard.shardId().getIndexName()); } handler.messageReceived(request, channel, task); } @@ -395,8 +394,8 @@ public void testSkipOnIndexName() { ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { DataNodeRequest dataNodeRequest = (DataNodeRequest) request; - for (ShardId shardId : dataNodeRequest.shardIds()) { - queriedIndices.add(shardId.getIndexName()); + for (DataNodeRequest.Shard shard : dataNodeRequest.shards()) { + queriedIndices.add(shard.shardId().getIndexName()); } handler.messageReceived(request, channel, task); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 42ef1f617b3d8..1b9bb37d2a391 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.compute.lucene.IndexedByShardId; import org.elasticsearch.compute.lucene.IndexedByShardIdFromSingleton; @@ -27,6 +28,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -135,7 +137,7 @@ void startComputeOnDataNodes( @Override protected void sendRequest( DiscoveryNode node, - List shardIds, + List shards, Map aliasFilters, NodeListener nodeListener ) { @@ -170,7 +172,7 @@ protected void sendRequest( try { groupTask = computeService.createGroupTask( parentTask, - () -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]" + () -> "compute group: data-node [" + node.getName() + "], shards [" + shards + "]" ); } catch (TaskCancelledException e) { l.onFailure(e); @@ -194,7 +196,7 @@ protected void sendRequest( childSessionId, configuration, clusterAlias, - shardIds, + shards, aliasFilters, dataNodePlan, originalIndices.indices(), @@ -283,9 +285,9 @@ private void runBatch(int startBatchIndex) { final Configuration configuration = request.configuration(); final String clusterAlias = request.clusterAlias(); final var sessionId = request.sessionId(); - final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size()); + final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shards().size()); final AtomicInteger pagesProduced = new AtomicInteger(); - List shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex); + List shards = request.shards().subList(startBatchIndex, endBatchIndex); ActionListener batchListener = new ActionListener<>() { final ActionListener ref = computeListener.acquireCompute(); @@ -301,8 +303,8 @@ public void onResponse(DriverCompletionInfo info) { @Override public void onFailure(Exception e) { if (pagesProduced.get() == 0 && failFastOnShardFailure == false) { - for (ShardId shardId : shardIds) { - addShardLevelFailure(shardId, e); + for (DataNodeRequest.Shard shard : shards) { + addShardLevelFailure(shard.shardId(), e); } onResponse(DriverCompletionInfo.EMPTY); } else { @@ -317,7 +319,7 @@ public void onFailure(Exception e) { }; acquireSearchContexts( clusterAlias, - shardIds, + shards, configuration, request.aliasFilters(), ActionListener.wrap(acquiredSearchContexts -> { @@ -363,18 +365,20 @@ public void onFailure(Exception e) { private void acquireSearchContexts( String clusterAlias, - List shardIds, + List shards, Configuration configuration, Map aliasFilters, ActionListener> listener ) { - final List targetShards = new ArrayList<>(); - for (ShardId shardId : shardIds) { + final List> targetShards = new ArrayList<>(); + for (DataNodeRequest.Shard shard : shards) { try { - var indexShard = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); - targetShards.add(indexShard); + var indexShard = searchService.getIndicesService() + .indexServiceSafe(shard.shardId().getIndex()) + .getShard(shard.shardId().id()); + targetShards.add(new Tuple<>(indexShard, shard.reshardSplitShardCountSummary())); } catch (Exception e) { - if (addShardLevelFailure(shardId, e) == false) { + if (addShardLevelFailure(shard.shardId(), e) == false) { listener.onFailure(e); return; } @@ -385,15 +389,17 @@ private void acquireSearchContexts( synchronized (searchContexts) { int startingIndex = searchContexts.length(); int endingIndex = startingIndex; - for (IndexShard shard : targetShards) { + for (Tuple targetShard : targetShards) { SearchContext context = null; + IndexShard indexShard = targetShard.v1(); try { - var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY); + var aliasFilter = aliasFilters.getOrDefault(indexShard.shardId().getIndex(), AliasFilter.EMPTY); var shardRequest = new ShardSearchRequest( - shard.shardId(), + indexShard.shardId(), configuration.absoluteStartedTimeInMillis(), aliasFilter, - clusterAlias + clusterAlias, + targetShard.v2() ); // TODO: `searchService.createSearchContext` allows opening search contexts without limits, // we need to limit the number of active search contexts here or in SearchService @@ -403,7 +409,7 @@ private void acquireSearchContexts( endingIndex++; searchContexts.add(cse); } catch (Exception e) { - if (addShardLevelFailure(shard.shardId(), e)) { + if (addShardLevelFailure(indexShard.shardId(), e)) { IOUtils.close(context); } else { var subList = searchContexts.subRange(startingIndex, endingIndex); @@ -423,9 +429,9 @@ private void acquireSearchContexts( doAcquire.run(); } })) { - for (IndexShard targetShard : targetShards) { + for (Tuple targetShard : targetShards) { final Releasable ref = refs.acquire(); - targetShard.ensureShardSearchActive(await -> { + targetShard.v1().ensureShardSearchActive(await -> { try (ref) { if (await) { waitedForRefreshes.set(true); @@ -437,7 +443,7 @@ private void acquireSearchContexts( } private void onBatchCompleted(int lastBatchIndex) { - if (lastBatchIndex < request.shardIds().size() && exchangeSink.isFinished() == false) { + if (lastBatchIndex < request.shards().size() && exchangeSink.isFinished() == false) { runBatch(lastBatchIndex); } else { // don't return until all pages are fetched @@ -570,7 +576,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T sessionId + "[n]", // internal session request.configuration(), request.clusterAlias(), - request.shardIds(), + request.shards(), request.aliasFilters(), request.plan(), request.indices(), @@ -580,7 +586,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T ); // the sender doesn't support retry on shard failures, so we need to fail fast here. final boolean failFastOnShardFailures = supportShardLevelRetryFailure(channel.getVersion()) == false; - var computeSearchContexts = new ComputeSearchContextByShardId(request.shardIds().size()); + var computeSearchContexts = new ComputeSearchContextByShardId(request.shards().size()); runComputeOnDataNode( (CancellableTask) task, sessionId, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java index 4da60bbb08127..525dfbe394d35 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java @@ -11,14 +11,17 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexReshardService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -54,7 +57,7 @@ final class DataNodeRequest extends AbstractTransportRequest implements IndicesR private final String clusterAlias; private final Map aliasFilters; private final PhysicalPlan plan; - private List shardIds; + private List shards; private String[] indices; private final IndicesOptions indicesOptions; private final boolean runNodeLevelReduction; @@ -64,7 +67,7 @@ final class DataNodeRequest extends AbstractTransportRequest implements IndicesR String sessionId, Configuration configuration, String clusterAlias, - List shardIds, + List shards, Map aliasFilters, PhysicalPlan plan, String[] indices, @@ -75,7 +78,7 @@ final class DataNodeRequest extends AbstractTransportRequest implements IndicesR this.sessionId = sessionId; this.configuration = configuration; this.clusterAlias = clusterAlias; - this.shardIds = shardIds; + this.shards = shards; this.aliasFilters = aliasFilters; this.plan = plan; this.indices = indices; @@ -100,7 +103,11 @@ final class DataNodeRequest extends AbstractTransportRequest implements IndicesR new BlockStreamInput(in, new BlockFactory(new NoopCircuitBreaker(CircuitBreaker.REQUEST), BigArrays.NON_RECYCLING_INSTANCE)) ); this.clusterAlias = in.readString(); - this.shardIds = in.readCollectionAsList(ShardId::new); + if (in.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL)) { + this.shards = in.readCollectionAsList(Shard::new); + } else { + this.shards = in.readCollectionAsList(i -> new Shard(new ShardId(i), SplitShardCountSummary.UNSET)); + } this.aliasFilters = in.readMap(Index::new, AliasFilter::readFrom); PlanStreamInput pin = new PlanStreamInput(in, in.namedWriteableRegistry(), configuration, idMapper); this.plan = pin.readNamedWriteable(PhysicalPlan.class); @@ -124,7 +131,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(sessionId); configuration.writeTo(out); out.writeString(clusterAlias); - out.writeCollection(shardIds); + if (out.getTransportVersion().supports(IndexReshardService.RESHARDING_SHARD_SUMMARY_IN_ESQL)) { + out.writeCollection(shards); + } else { + out.writeCollection(shards, (o, s) -> s.shardId().writeTo(o)); + } out.writeMap(aliasFilters); new PlanStreamOutput(out, configuration).writeNamedWriteable(plan); out.writeStringArray(indices); @@ -146,8 +157,8 @@ public String[] indices() { public IndicesRequest indices(String... indices) { this.indices = indices; if (Arrays.equals(NO_INDICES_OR_ALIASES_ARRAY, indices) || Arrays.asList(indices).contains(NO_INDEX_PLACEHOLDER)) { - logger.trace(() -> format("Indices empty after index resolution, also clearing shardIds %s", shardIds)); - this.shardIds = Collections.emptyList(); + logger.trace(() -> format("Indices empty after index resolution, also clearing shardIds %s", shards)); + this.shards = Collections.emptyList(); } return this; } @@ -192,8 +203,8 @@ String clusterAlias() { return clusterAlias; } - List shardIds() { - return shardIds; + List shards() { + return shards; } /** @@ -217,7 +228,7 @@ boolean reductionLateMaterialization() { @Override public String getDescription() { - return "shards=" + shardIds + " plan=" + plan; + return "shards=" + shards + " plan=" + plan; } @Override @@ -233,7 +244,7 @@ public boolean equals(Object o) { return sessionId.equals(request.sessionId) && configuration.equals(request.configuration) && clusterAlias.equals(request.clusterAlias) - && shardIds.equals(request.shardIds) + && shards.equals(request.shards) && aliasFilters.equals(request.aliasFilters) && plan.equals(request.plan) && getParentTask().equals(request.getParentTask()) @@ -248,7 +259,7 @@ public int hashCode() { sessionId, configuration, clusterAlias, - shardIds, + shards, aliasFilters, plan, Arrays.hashCode(indices), @@ -262,7 +273,7 @@ public DataNodeRequest withPlan(ExchangeSinkExec newPlan) { sessionId, configuration, clusterAlias, - shardIds, + shards, aliasFilters, newPlan, indices, @@ -271,4 +282,16 @@ public DataNodeRequest withPlan(ExchangeSinkExec newPlan) { reductionLateMaterialization ); } + + public record Shard(ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) implements Writeable { + Shard(StreamInput in) throws IOException { + this(new ShardId(in), new SplitShardCountSummary(in)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + reshardSplitShardCountSummary.writeTo(out); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 11f1a444eb20a..1d50967d333b7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.Maps; @@ -280,7 +281,7 @@ private List selectFailures() { private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) { final ActionListener listener = computeListener.acquireCompute(); - sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { + sendRequest(request.node, request.shards, request.aliasFilters, new NodeListener() { void onAfter(DriverCompletionInfo info) { nodePermits.get(request.node).release(); @@ -294,9 +295,9 @@ void onAfter(DriverCompletionInfo info) { @Override public void onResponse(DataNodeComputeResponse response) { // remove failures of successful shards - for (ShardId shardId : request.shardIds()) { - if (response.shardLevelFailures().containsKey(shardId) == false) { - shardFailures.remove(shardId); + for (DataNodeRequest.Shard shard : request.shards()) { + if (response.shardLevelFailures().containsKey(shard.shardId()) == false) { + shardFailures.remove(shard.shardId()); } } for (var entry : response.shardLevelFailures().entrySet()) { @@ -309,9 +310,9 @@ public void onResponse(DataNodeComputeResponse response) { @Override public void onFailure(Exception e, boolean receivedData) { - for (ShardId shardId : request.shardIds) { - trackShardLevelFailure(shardId, receivedData, e); - pendingShardIds.add(shardId); + for (DataNodeRequest.Shard shard : request.shards) { + trackShardLevelFailure(shard.shardId(), receivedData, e); + pendingShardIds.add(shard.shardId()); } onAfter(DriverCompletionInfo.EMPTY); } @@ -328,7 +329,12 @@ public void onSkip() { }); } - abstract void sendRequest(DiscoveryNode node, List shardIds, Map aliasFilters, NodeListener nodeListener); + abstract void sendRequest( + DiscoveryNode node, + List shards, + Map aliasFilters, + NodeListener nodeListener + ); interface NodeListener { void onResponse(DataNodeComputeResponse response); @@ -381,11 +387,17 @@ TargetShard getShard(ShardId shardId) { } /** - * (Remaining) allocated nodes of a given shard id and its alias filter + * Information required to send requests for a shard with this shardId. + * Note that {@link SplitShardCountSummary} value should never change, it is important for resharding feature to work. */ - record TargetShard(ShardId shardId, List remainingNodes, AliasFilter aliasFilter) {} + record TargetShard( + ShardId shardId, + List remainingNodes, + AliasFilter aliasFilter, + SplitShardCountSummary reshardSplitShardCountSummary + ) {} - record NodeRequest(DiscoveryNode node, List shardIds, Map aliasFilters) {} + record NodeRequest(DiscoveryNode node, List shards, Map aliasFilters) {} private record ShardFailure(boolean fatal, Exception failure) {} @@ -400,7 +412,7 @@ private static boolean isRetryableFailure(ShardFailure failure) { */ private List selectNodeRequests(TargetShards targetShards) { assert sendingLock.isHeldByCurrentThread(); - final Map> nodeToShardIds = new LinkedHashMap<>(); + final Map> nodeToShardMetadata = new LinkedHashMap<>(); final Iterator shardsIt = pendingShardIds.iterator(); while (shardsIt.hasNext()) { @@ -414,9 +426,9 @@ private List selectNodeRequests(TargetShards targetShards) { Iterator nodesIt = shard.remainingNodes.iterator(); while (nodesIt.hasNext()) { DiscoveryNode node = nodesIt.next(); - List pendingRequest = nodeToShardIds.get(node); + List pendingRequest = nodeToShardMetadata.get(node); if (pendingRequest != null) { - pendingRequest.add(shard.shardId); + pendingRequest.add(new DataNodeRequest.Shard(shard.shardId, shard.reshardSplitShardCountSummary)); nodesIt.remove(); shardsIt.remove(); break; @@ -425,8 +437,8 @@ private List selectNodeRequests(TargetShards targetShards) { if (concurrentRequests == null || concurrentRequests.tryAcquire()) { if (nodePermits.computeIfAbsent(node, n -> new Semaphore(1)).tryAcquire()) { pendingRequest = new ArrayList<>(); - pendingRequest.add(shard.shardId); - nodeToShardIds.put(node, pendingRequest); + pendingRequest.add(new DataNodeRequest.Shard(shard.shardId, shard.reshardSplitShardCountSummary)); + nodeToShardMetadata.put(node, pendingRequest); nodesIt.remove(); shardsIt.remove(); @@ -439,18 +451,18 @@ private List selectNodeRequests(TargetShards targetShards) { } } - final List nodeRequests = new ArrayList<>(nodeToShardIds.size()); - for (var entry : nodeToShardIds.entrySet()) { + final List nodeRequests = new ArrayList<>(nodeToShardMetadata.size()); + for (var entry : nodeToShardMetadata.entrySet()) { var node = entry.getKey(); - var shardIds = entry.getValue(); + var shards = entry.getValue(); Map aliasFilters = new HashMap<>(); - for (ShardId shardId : shardIds) { - var aliasFilter = targetShards.getShard(shardId).aliasFilter; + for (DataNodeRequest.Shard shard : shards) { + var aliasFilter = targetShards.getShard(shard.shardId()).aliasFilter; if (aliasFilter != null) { - aliasFilters.put(shardId.getIndex(), aliasFilter); + aliasFilters.put(shard.shardId().getIndex(), aliasFilter); } } - nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters)); + nodeRequests.add(new NodeRequest(node, shards, aliasFilters)); } return nodeRequests; } @@ -485,7 +497,7 @@ void searchShards(Set concreteIndices, ActionListener list allocatedNodes.add(nodes.get(n)); } AliasFilter aliasFilter = resp.getAliasFilters().get(shardId.getIndex().getUUID()); - shards.put(shardId, new TargetShard(shardId, allocatedNodes, aliasFilter)); + shards.put(shardId, new TargetShard(shardId, allocatedNodes, aliasFilter, group.reshardSplitShardCountSummary())); } return new TargetShards(shards, totalShards, skippedShards); }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index b6ad5b5837acd..32b221e0270fd 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.breaker.CircuitBreaker.Durability; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; @@ -43,6 +44,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,11 +85,14 @@ public class DataNodeRequestSenderTests extends ComputeTestCase { private final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node-3").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); private final DiscoveryNode node4 = DiscoveryNodeUtils.builder("node-4").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); private final DiscoveryNode node5 = DiscoveryNodeUtils.builder("node-5").roles(Set.of(DATA_HOT_NODE_ROLE)).build(); - private final ShardId shard1 = new ShardId("index", "n/a", 1); - private final ShardId shard2 = new ShardId("index", "n/a", 2); - private final ShardId shard3 = new ShardId("index", "n/a", 3); - private final ShardId shard4 = new ShardId("index", "n/a", 4); - private final ShardId shard5 = new ShardId("index", "n/a", 5); + private final DataNodeRequest.Shard shard1 = new DataNodeRequest.Shard(new ShardId("index", "n/a", 1), SplitShardCountSummary.UNSET); + private final DataNodeRequest.Shard shard2 = new DataNodeRequest.Shard(new ShardId("index", "n/a", 2), SplitShardCountSummary.UNSET); + private final DataNodeRequest.Shard shard3 = new DataNodeRequest.Shard( + new ShardId("index", "n/a", 3), + SplitShardCountSummary.fromInt(3) + ); + private final DataNodeRequest.Shard shard4 = new DataNodeRequest.Shard(new ShardId("index", "n/a", 4), SplitShardCountSummary.UNSET); + private final DataNodeRequest.Shard shard5 = new DataNodeRequest.Shard(new ShardId("index", "n/a", 5), SplitShardCountSummary.UNSET); @Before public void setThreadPool() { @@ -130,8 +135,8 @@ public void testOnePass() { targetShard(shard4, node2, node3) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { - sent.add(nodeRequest(node, shardIds)); + var future = sendRequests(randomBoolean(), -1, targetShards, (node, shards, aliasFilters, listener) -> { + sent.add(nodeRequest(node, shards)); runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); }); safeGet(future); @@ -159,7 +164,7 @@ public void testMissingShards() { assertThat(resp.successfulShards, equalTo(2)); assertThat(resp.failures, not(empty())); assertNotNull(resp.failures.get(0).shard()); - assertThat(resp.failures.get(0).shard().getShardId(), equalTo(shard3)); + assertThat(resp.failures.get(0).shard().getShardId(), equalTo(shard3.shardId())); assertThat(resp.failures.get(0).reason(), containsString("no shard copies found")); } } @@ -177,10 +182,10 @@ public void testRetryThenSuccess() { sent.add(nodeRequest(node, shardIds)); Map failures = new HashMap<>(); if (node.equals(node1) && shardIds.contains(shard5)) { - failures.put(shard5, new IOException("test")); + failures.put(shard5.shardId(), new IOException("test")); } if (node.equals(node4) && shardIds.contains(shard2)) { - failures.put(shard2, new IOException("test")); + failures.put(shard2.shardId(), new IOException("test")); } runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures))); }); @@ -210,7 +215,7 @@ public void testRetryButFail() { sent.add(nodeRequest(node, shardIds)); Map failures = new HashMap<>(); if (shardIds.contains(shard5)) { - failures.put(shard5, new IOException("test failure for shard5")); + failures.put(shard5.shardId(), new IOException("test failure for shard5")); } runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures))); }); @@ -311,7 +316,7 @@ public void testLimitConcurrentNodes() { for (int i = 0; i < shards; i++) { targetShards.add( targetShard( - new ShardId("index", "n/a", i), + new DataNodeRequest.Shard(new ShardId("index", "n/a", i), SplitShardCountSummary.UNSET), DiscoveryNodeUtils.builder("node-" + i).roles(Set.of(DATA_HOT_NODE_ROLE)).build() ) ); @@ -439,15 +444,18 @@ public void testRetryMovedShard() { var attempt = new AtomicInteger(0); var response = safeGet( sendRequests(randomBoolean(), -1, List.of(targetShard(shard1, node1)), shardIds -> switch (attempt.incrementAndGet()) { - case 1 -> Map.of(shard1, List.of(node2)); - case 2 -> Map.of(shard1, List.of(node3)); - default -> Map.of(shard1, List.of(node4)); + case 1 -> Map.of(shard1.shardId(), List.of(node2)); + case 2 -> Map.of(shard1.shardId(), List.of(node3)); + default -> Map.of(shard1.shardId(), List.of(node4)); }, (node, shardIds, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( Objects.equals(node, node4) ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) - : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + : new DataNodeComputeResponse( + DriverCompletionInfo.EMPTY, + Map.of(shard1.shardId(), new ShardNotFoundException(shard1.shardId())) + ) ) ) ) @@ -467,12 +475,14 @@ public void testRetryMultipleMovedShards() { -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node3)), shardIds -> shardIds.stream().collect(toMap(Function.identity(), shardId -> List.of(randomFrom(node1, node2, node3)))), - (node, shardIds, aliasFilters, listener) -> runWithDelay( + (node, shards, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( attempt.incrementAndGet() <= 6 ? new DataNodeComputeResponse( DriverCompletionInfo.EMPTY, - shardIds.stream().collect(toMap(Function.identity(), ShardNotFoundException::new)) + shards.stream() + .map(DataNodeRequest.Shard::shardId) + .collect(toMap(Function.identity(), ShardNotFoundException::new)) ) : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) ) @@ -489,11 +499,14 @@ public void testDoesNotRetryMovedShardIndefinitely() { var attempt = new AtomicInteger(0); var response = safeGet(sendRequests(true, -1, List.of(targetShard(shard1, node1)), shardIds -> { attempt.incrementAndGet(); - return Map.of(shard1, List.of(node2)); + return Map.of(shard1.shardId(), List.of(node2)); }, (node, shardIds, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( - new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + new DataNodeComputeResponse( + DriverCompletionInfo.EMPTY, + Map.of(shard1.shardId(), new ShardNotFoundException(shard1.shardId())) + ) ) ) )); @@ -511,17 +524,23 @@ public void testRetryOnlyMovedShards() { sendRequests(randomBoolean(), -1, List.of(targetShard(shard1, node1, node3), targetShard(shard2, node2)), shardIds -> { attempt.incrementAndGet(); resolvedShards.addAll(shardIds); - return Map.of(shard2, List.of(node4)); + return Map.of(shard2.shardId(), List.of(node4)); }, (node, shardIds, aliasFilters, listener) -> runWithDelay(() -> { if (Objects.equals(node, node1)) { // search is going to be retried from replica on node3 without shard resolution listener.onResponse( - new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + new DataNodeComputeResponse( + DriverCompletionInfo.EMPTY, + Map.of(shard1.shardId(), new ShardNotFoundException(shard1.shardId())) + ) ); } else if (Objects.equals(node, node2)) { // search is going to be retried after resolving new shard node since there are no replicas listener.onResponse( - new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard2, new ShardNotFoundException(shard2))) + new DataNodeComputeResponse( + DriverCompletionInfo.EMPTY, + Map.of(shard2.shardId(), new ShardNotFoundException(shard2.shardId())) + ) ); } else { listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); @@ -533,20 +552,23 @@ public void testRetryOnlyMovedShards() { assertThat(response.skippedShards, equalTo(0)); assertThat(response.failedShards, equalTo(0)); assertThat(attempt.get(), equalTo(1)); - assertThat("Must retry only affected shards", resolvedShards, contains(shard2)); + assertThat("Must retry only affected shards", resolvedShards, contains(shard2.shardId())); } public void testRetryUnassignedShardWithoutPartialResults() { var attempt = new AtomicInteger(0); var future = sendRequests(false, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> { attempt.incrementAndGet(); - return Map.of(shard1, List.of()); + return Map.of(shard1.shardId(), List.of()); }, (node, shardIds, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( Objects.equals(shardIds, List.of(shard2)) ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) - : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + : new DataNodeComputeResponse( + DriverCompletionInfo.EMPTY, + Map.of(shard1.shardId(), new ShardNotFoundException(shard1.shardId())) + ) ) ) @@ -559,13 +581,16 @@ public void testRetryUnassignedShardWithPartialResults() { var attempt = new AtomicInteger(0); var response = safeGet(sendRequests(true, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> { attempt.incrementAndGet(); - return Map.of(shard1, List.of()); + return Map.of(shard1.shardId(), List.of()); }, (node, shardIds, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( Objects.equals(shardIds, List.of(shard2)) ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) - : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + : new DataNodeComputeResponse( + DriverCompletionInfo.EMPTY, + Map.of(shard1.shardId(), new ShardNotFoundException(shard1.shardId())) + ) ) ) )); @@ -576,17 +601,22 @@ public void testRetryUnassignedShardWithPartialResults() { assertThat(attempt.get(), equalTo(1)); } - static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { - return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); + static DataNodeRequestSender.TargetShard targetShard(DataNodeRequest.Shard shard, DiscoveryNode... nodes) { + return new DataNodeRequestSender.TargetShard( + shard.shardId(), + new ArrayList<>(Arrays.asList(nodes)), + null, + shard.reshardSplitShardCountSummary() + ); } - static DataNodeRequestSender.NodeRequest nodeRequest(DiscoveryNode node, ShardId... shardIds) { + static DataNodeRequestSender.NodeRequest nodeRequest(DiscoveryNode node, DataNodeRequest.Shard... shardIds) { return nodeRequest(node, Arrays.asList(shardIds)); } - static DataNodeRequestSender.NodeRequest nodeRequest(DiscoveryNode node, List shardIds) { - var copy = new ArrayList<>(shardIds); - Collections.sort(copy); + static DataNodeRequestSender.NodeRequest nodeRequest(DiscoveryNode node, List shards) { + var copy = new ArrayList<>(shards); + Collections.sort(copy, Comparator.comparing(DataNodeRequest.Shard::shardId)); return new NodeRequest(node, copy, Map.of()); } @@ -669,11 +699,11 @@ Map> resolveShards(Set shardIds) { @Override protected void sendRequest( DiscoveryNode node, - List shardIds, + List shards, Map aliasFilters, NodeListener listener ) { - sender.sendRequestToOneNode(node, shardIds, aliasFilters, listener); + sender.sendRequestToOneNode(node, shards, aliasFilters, listener); } }.startComputeOnDataNodes(Set.of(randomAlphaOfLength(10)), () -> {}, future); return future; @@ -684,6 +714,11 @@ interface Resolver { } interface Sender { - void sendRequestToOneNode(DiscoveryNode node, List shardIds, Map aliasFilters, NodeListener listener); + void sendRequestToOneNode( + DiscoveryNode node, + List shards, + Map aliasFilters, + NodeListener listener + ); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSerializationTests.java index 4b6ede9da2ae2..4e659350b370c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSerializationTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; @@ -81,7 +82,14 @@ protected DataNodeRequest createTestInstance() { | eval c = first_name | stats x = avg(salary) """); - List shardIds = randomList(1, 10, () -> new ShardId("index-" + between(1, 10), "n/a", between(1, 10))); + List shards = randomList( + 1, + 10, + () -> new DataNodeRequest.Shard( + new ShardId("index-" + between(1, 10), "n/a", between(1, 10)), + SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)) + ) + ); PhysicalPlan physicalPlan = mapAndMaybeOptimize(parse(query)); Map aliasFilters = Map.of( new Index("concrete-index", "n/a"), @@ -91,7 +99,7 @@ protected DataNodeRequest createTestInstance() { sessionId, randomConfiguration(query, randomTables()), randomAlphaOfLength(10), - shardIds, + shards, aliasFilters, physicalPlan, generateRandomStringArray(10, 10, false, false), @@ -111,7 +119,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException randomAlphaOfLength(20), in.configuration(), in.clusterAlias(), - in.shardIds(), + in.shards(), in.aliasFilters(), in.plan(), in.indices(), @@ -127,7 +135,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), randomConfiguration(), in.clusterAlias(), - in.shardIds(), + in.shards(), in.aliasFilters(), in.plan(), in.indices(), @@ -139,12 +147,19 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException yield request; } case 2 -> { - List shardIds = randomList(1, 10, () -> new ShardId("new-index-" + between(1, 10), "n/a", between(1, 10))); + List shards = randomList( + 1, + 10, + () -> new DataNodeRequest.Shard( + new ShardId("new-index-" + between(1, 10), "n/a", between(1, 10)), + SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)) + ) + ); var request = new DataNodeRequest( in.sessionId(), in.configuration(), in.clusterAlias(), - shardIds, + shards, in.aliasFilters(), in.plan(), in.indices(), @@ -173,7 +188,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), in.configuration(), in.clusterAlias(), - in.shardIds(), + in.shards(), in.aliasFilters(), mapAndMaybeOptimize(parse(newQuery)), in.indices(), @@ -195,7 +210,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), in.configuration(), in.clusterAlias(), - in.shardIds(), + in.shards(), aliasFilters, in.plan(), in.indices(), @@ -211,7 +226,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), in.configuration(), in.clusterAlias(), - in.shardIds(), + in.shards(), in.aliasFilters(), in.plan(), in.indices(), @@ -231,7 +246,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), in.configuration(), clusterAlias, - in.shardIds(), + in.shards(), in.aliasFilters(), in.plan(), in.indices(), @@ -248,7 +263,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), in.configuration(), in.clusterAlias(), - in.shardIds(), + in.shards(), in.aliasFilters(), in.plan(), indices, @@ -268,7 +283,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), in.configuration(), in.clusterAlias(), - in.shardIds(), + in.shards(), in.aliasFilters(), in.plan(), in.indices(), @@ -284,7 +299,7 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException in.sessionId(), in.configuration(), in.clusterAlias(), - in.shardIds(), + in.shards(), in.aliasFilters(), in.plan(), in.indices(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java index 56cc332263432..8cb524d304e43 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.plugin; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -24,7 +25,14 @@ public class DataNodeRequestTests extends ESTestCase { public void testNoIndexPlaceholder() { var sessionId = randomAlphaOfLength(10); - List shardIds = randomList(1, 10, () -> new ShardId("index-" + between(1, 10), "n/a", between(1, 10))); + List shards = randomList( + 1, + 10, + () -> new DataNodeRequest.Shard( + new ShardId("index-" + between(1, 10), "n/a", between(1, 10)), + SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)) + ) + ); DataNodeRequest request = new DataNodeRequest( sessionId, @@ -35,7 +43,7 @@ public void testNoIndexPlaceholder() { | stats x = avg(c) """, randomTables()), randomAlphaOfLength(10), - shardIds, + shards, Collections.emptyMap(), null, generateRandomStringArray(10, 10, false, false), @@ -44,14 +52,14 @@ public void testNoIndexPlaceholder() { randomBoolean() ); - assertThat(request.shardIds(), equalTo(shardIds)); + assertThat(request.shards(), equalTo(shards)); request.indices(generateRandomStringArray(10, 10, false, false)); - assertThat(request.shardIds(), equalTo(shardIds)); + assertThat(request.shards(), equalTo(shards)); request.indices(NO_INDEX_PLACEHOLDER); - assertThat(request.shardIds(), empty()); + assertThat(request.shards(), empty()); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java index 0d2d9619aca68..ad94d715a9a0c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointActionTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.search.SearchShardsGroup; import org.elasticsearch.action.search.SearchShardsResponse; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -46,9 +47,9 @@ public class TransportGetCheckpointActionTests extends ESTestCase { public void testFilterOutSkippedShards_EmptyNodesAndShards() { SearchShardsResponse searchShardsResponse = new SearchShardsResponse( Set.of( - new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true), - new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false), - new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true) + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true, SplitShardCountSummary.UNSET) ), Set.of(), Map.of() @@ -72,10 +73,10 @@ public void testFilterOutSkippedShards_EmptySearchShardsResponse() { public void testFilterOutSkippedShards_SomeNodesEmptyAfterFiltering() { SearchShardsResponse searchShardsResponse = new SearchShardsResponse( Set.of( - new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_2), true), - new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_2), true), - new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_2), true), - new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true) + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_2), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_2), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_2), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true, SplitShardCountSummary.UNSET) ), Set.of(), Map.of() @@ -91,10 +92,10 @@ public void testFilterOutSkippedShards_SomeNodesEmptyAfterFiltering() { public void testFilterOutSkippedShards_AllNodesEmptyAfterFiltering() { SearchShardsResponse searchShardsResponse = new SearchShardsResponse( Set.of( - new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1, NODE_2), true), - new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_1, NODE_2), true), - new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_1, NODE_2), true), - new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_1, NODE_2), true) + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1, NODE_2), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_A_1, List.of(NODE_0, NODE_1, NODE_2), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_0, NODE_1, NODE_2), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_1, NODE_2), true, SplitShardCountSummary.UNSET) ), Set.of(), Map.of() @@ -109,10 +110,10 @@ public void testFilterOutSkippedShards_AllNodesEmptyAfterFiltering() { public void testFilterOutSkippedShards() { SearchShardsResponse searchShardsResponse = new SearchShardsResponse( Set.of( - new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true), - new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false), - new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true), - new SearchShardsGroup(new ShardId(INDEX_C, 0), List.of(NODE_0, NODE_1, NODE_2), true) + new SearchShardsGroup(SHARD_A_0, List.of(NODE_0, NODE_1), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_0, List.of(NODE_1, NODE_2), false, SplitShardCountSummary.UNSET), + new SearchShardsGroup(SHARD_B_1, List.of(NODE_0, NODE_2), true, SplitShardCountSummary.UNSET), + new SearchShardsGroup(new ShardId(INDEX_C, 0), List.of(NODE_0, NODE_1, NODE_2), true, SplitShardCountSummary.UNSET) ), Set.of(), Map.of()