Skip to content

Commit 3814ce0

Browse files
masseykejoegallo
andauthored
Improving statsByShard performance when the number of shards is very large (#130857) (#137749)
Co-authored-by: Joe Gallo <joe.gallo@elastic.co>
1 parent 42408b5 commit 3814ce0

File tree

11 files changed

+233
-95
lines changed

11 files changed

+233
-95
lines changed

docs/changelog/130857.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 130857
2+
summary: Improving statsByShard performance when the number of shards is very large
3+
area: Stats
4+
type: bug
5+
issues:
6+
- 97222

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.elasticsearch.index.seqno.RetentionLeaseStats;
4949
import org.elasticsearch.index.seqno.SeqNoStats;
5050
import org.elasticsearch.index.shard.IndexShard;
51+
import org.elasticsearch.index.shard.ShardId;
52+
import org.elasticsearch.indices.IndicesQueryCache;
5153
import org.elasticsearch.indices.IndicesService;
5254
import org.elasticsearch.injection.guice.Inject;
5355
import org.elasticsearch.node.NodeService;
@@ -258,9 +260,12 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
258260
false,
259261
false
260262
);
263+
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
261264
List<ShardStats> shardsStats = new ArrayList<>();
262265
for (IndexService indexService : indicesService) {
263266
for (IndexShard indexShard : indexService) {
267+
// get the shared ram for this shard id (or zero if there's nothing in the map)
268+
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
264269
cancellableTask.ensureNotCancelled();
265270
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
266271
// only report on fully started shards
@@ -281,7 +286,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
281286
new ShardStats(
282287
indexShard.routingEntry(),
283288
indexShard.shardPath(),
284-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
289+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS, sharedRam),
285290
commitStats,
286291
seqNoStats,
287292
retentionLeaseStats,
@@ -312,7 +317,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
312317
clusterStatus,
313318
nodeInfo,
314319
nodeStats,
315-
shardsStats.toArray(new ShardStats[shardsStats.size()]),
320+
shardsStats.toArray(new ShardStats[0]),
316321
searchUsageStats,
317322
repositoryUsageStats,
318323
ccsTelemetry,
@@ -474,7 +479,7 @@ protected void sendItemRequest(String clusterAlias, ActionListener<RemoteCluster
474479
@Override
475480
protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) {
476481
if (response != null) {
477-
remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response));
482+
remoteClustersStats.computeIfPresent(clusterAlias, (ignored, v) -> v.acceptResponse(response));
478483
}
479484
}
480485

server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,12 @@ public CommonStats(CommonStatsFlags flags) {
154154
/**
155155
* Filters the given flags for {@link CommonStatsFlags#SHARD_LEVEL} flags and calculates the corresponding statistics.
156156
*/
157-
public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
157+
public static CommonStats getShardLevelStats(
158+
IndicesQueryCache indicesQueryCache,
159+
IndexShard indexShard,
160+
CommonStatsFlags flags,
161+
long precomputedSharedRam
162+
) {
158163
// Filter shard level flags
159164
CommonStatsFlags filteredFlags = flags.clone();
160165
for (CommonStatsFlags.Flag flag : filteredFlags.getFlags()) {
@@ -174,7 +179,7 @@ public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache
174179
case Refresh -> stats.refresh = indexShard.refreshStats();
175180
case Flush -> stats.flush = indexShard.flushStats();
176181
case Warmer -> stats.warmer = indexShard.warmerStats();
177-
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId());
182+
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId(), precomputedSharedRam);
178183
case FieldData -> stats.fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
179184
case Completion -> stats.completion = indexShard.completionStats(flags.completionDataFields());
180185
case Segments -> stats.segments = indexShard.segmentStats(

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.index.seqno.RetentionLeaseStats;
2828
import org.elasticsearch.index.seqno.SeqNoStats;
2929
import org.elasticsearch.index.shard.IndexShard;
30+
import org.elasticsearch.indices.IndicesQueryCache;
3031
import org.elasticsearch.indices.IndicesService;
3132
import org.elasticsearch.injection.guice.Inject;
3233
import org.elasticsearch.tasks.CancellableTask;
@@ -114,7 +115,13 @@ protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRou
114115
assert task instanceof CancellableTask;
115116
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
116117
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
117-
CommonStats commonStats = CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
118+
long sharedRam = IndicesQueryCache.getSharedRamSizeForShard(indicesService, indexShard.shardId());
119+
CommonStats commonStats = CommonStats.getShardLevelStats(
120+
indicesService.getIndicesQueryCache(),
121+
indexShard,
122+
request.flags(),
123+
sharedRam
124+
);
118125
CommitStats commitStats;
119126
SeqNoStats seqNoStats;
120127
RetentionLeaseStats retentionLeaseStats;

server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 84 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@
2626
import org.elasticsearch.common.unit.ByteSizeValue;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.Predicates;
29+
import org.elasticsearch.index.IndexService;
2930
import org.elasticsearch.index.cache.query.QueryCacheStats;
31+
import org.elasticsearch.index.shard.IndexShard;
3032
import org.elasticsearch.index.shard.ShardId;
3133

3234
import java.io.Closeable;
3335
import java.io.IOException;
3436
import java.util.Collections;
37+
import java.util.HashMap;
3538
import java.util.IdentityHashMap;
3639
import java.util.Map;
3740
import java.util.Set;
@@ -67,6 +70,38 @@ public class IndicesQueryCache implements QueryCache, Closeable {
6770
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
6871
private volatile long sharedRamBytesUsed;
6972

73+
/**
74+
* Calculates a map of {@link ShardId} to {@link Long} which contains the calculated share of the {@link IndicesQueryCache} shared ram
75+
* size for a given shard (that is, the sum of all the longs is the size of the indices query cache). Since many shards will not
76+
* participate in the cache, shards whose calculated share is zero will not be contained in the map at all. As a consequence, the
77+
* correct pattern for using the returned map will be via {@link Map#getOrDefault(Object, Object)} with a {@code defaultValue} of
78+
* {@code 0L}.
79+
*/
80+
public static Map<ShardId, Long> getSharedRamSizeForAllShards(IndicesService indicesService) {
81+
Map<ShardId, Long> shardIdToSharedRam = new HashMap<>();
82+
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
83+
for (IndexService indexService : indicesService) {
84+
for (IndexShard indexShard : indexService) {
85+
final var queryCache = indicesService.getIndicesQueryCache();
86+
long sharedRam = (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(indexShard.shardId(), cacheTotals);
87+
// as a size optimization, only store non-zero values in the map
88+
if (sharedRam > 0L) {
89+
shardIdToSharedRam.put(indexShard.shardId(), sharedRam);
90+
}
91+
}
92+
}
93+
return Collections.unmodifiableMap(shardIdToSharedRam);
94+
}
95+
96+
public long getCacheSizeForShard(ShardId shardId) {
97+
Stats stats = shardStats.get(shardId);
98+
return stats != null ? stats.cacheSize : 0L;
99+
}
100+
101+
public long getSharedRamBytesUsed() {
102+
return sharedRamBytesUsed;
103+
}
104+
70105
// This is a hack for the fact that the close listener for the
71106
// ShardCoreKeyMap will be called before onDocIdSetEviction
72107
// See onDocIdSetEviction for more info
@@ -89,40 +124,58 @@ private static QueryCacheStats toQueryCacheStatsSafe(@Nullable Stats stats) {
89124
return stats == null ? new QueryCacheStats() : stats.toQueryCacheStats();
90125
}
91126

92-
private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
93-
if (sharedRamBytesUsed == 0L) {
94-
return 0L;
95-
}
96-
97-
/*
98-
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
99-
* shard.
100-
*/
101-
// TODO avoid looping over all local shards here - see https://github.com/elastic/elasticsearch/issues/97222
127+
/**
128+
* This computes the total cache size in bytes, and the total shard count in the cache for all shards.
129+
* @param indicesService
130+
* @return A CacheTotals object containing the computed total number of items in the cache and the number of shards seen in the cache
131+
*/
132+
private static CacheTotals getCacheTotalsForAllShards(IndicesService indicesService) {
133+
IndicesQueryCache queryCache = indicesService.getIndicesQueryCache();
134+
boolean hasQueryCache = queryCache != null;
102135
long totalItemsInCache = 0L;
103136
int shardCount = 0;
104-
if (itemsInCacheForShard == 0L) {
105-
for (final var stats : shardStats.values()) {
106-
shardCount += 1;
107-
if (stats.cacheSize > 0L) {
108-
// some shard has nonzero cache footprint, so we apportion the shared size by cache footprint, and this shard has none
109-
return 0L;
110-
}
111-
}
112-
} else {
113-
// branchless loop for the common case
114-
for (final var stats : shardStats.values()) {
115-
shardCount += 1;
116-
totalItemsInCache += stats.cacheSize;
137+
for (final IndexService indexService : indicesService) {
138+
for (final IndexShard indexShard : indexService) {
139+
final var shardId = indexShard.shardId();
140+
long cacheSize = hasQueryCache ? queryCache.getCacheSizeForShard(shardId) : 0L;
141+
shardCount++;
142+
assert cacheSize >= 0 : "Unexpected cache size of " + cacheSize + " for shard " + shardId;
143+
totalItemsInCache += cacheSize;
117144
}
118145
}
146+
return new CacheTotals(totalItemsInCache, shardCount);
147+
}
119148

149+
public static long getSharedRamSizeForShard(IndicesService indicesService, ShardId shardId) {
150+
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
151+
final var queryCache = indicesService.getIndicesQueryCache();
152+
return (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(shardId, cacheTotals);
153+
}
154+
155+
/**
156+
* This method computes the shared RAM size in bytes for the given indexShard.
157+
* @param shardId The shard to compute the shared RAM size for
158+
* @param cacheTotals Shard totals computed in getCacheTotalsForAllShards()
159+
* @return the shared RAM size in bytes allocated to the given shard, or 0 if unavailable
160+
*/
161+
private long getSharedRamSizeForShard(ShardId shardId, CacheTotals cacheTotals) {
162+
long sharedRamBytesUsed = getSharedRamBytesUsed();
163+
if (sharedRamBytesUsed == 0L) {
164+
return 0L;
165+
}
166+
167+
int shardCount = cacheTotals.shardCount();
120168
if (shardCount == 0) {
121169
// Sometimes it's not possible to do this when there are no shard entries at all, which can happen as the shared ram usage can
122170
// extend beyond the closing of all shards.
123171
return 0L;
124172
}
125-
173+
/*
174+
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
175+
* shard.
176+
*/
177+
long totalItemsInCache = cacheTotals.totalItemsInCache();
178+
long itemsInCacheForShard = getCacheSizeForShard(shardId);
126179
final long additionalRamBytesUsed;
127180
if (totalItemsInCache == 0) {
128181
// all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
@@ -143,10 +196,12 @@ private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
143196
return additionalRamBytesUsed;
144197
}
145198

199+
private record CacheTotals(long totalItemsInCache, int shardCount) {}
200+
146201
/** Get usage statistics for the given shard. */
147-
public QueryCacheStats getStats(ShardId shard) {
202+
public QueryCacheStats getStats(ShardId shard, long precomputedSharedRamBytesUsed) {
148203
final QueryCacheStats queryCacheStats = toQueryCacheStatsSafe(shardStats.get(shard));
149-
queryCacheStats.addRamBytesUsed(getShareOfAdditionalRamBytesUsed(queryCacheStats.getCacheSize()));
204+
queryCacheStats.addRamBytesUsed(precomputedSharedRamBytesUsed);
150205
return queryCacheStats;
151206
}
152207

@@ -243,7 +298,7 @@ QueryCacheStats toQueryCacheStats() {
243298
public String toString() {
244299
return "{shardId="
245300
+ shardId
246-
+ ", ramBytedUsed="
301+
+ ", ramBytesUsed="
247302
+ ramBytesUsed
248303
+ ", hitCount="
249304
+ hitCount
@@ -340,11 +395,7 @@ protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
340395
shardStats.cacheCount += 1;
341396
shardStats.ramBytesUsed += ramBytesUsed;
342397

343-
StatsAndCount statsAndCount = stats2.get(readerCoreKey);
344-
if (statsAndCount == null) {
345-
statsAndCount = new StatsAndCount(shardStats);
346-
stats2.put(readerCoreKey, statsAndCount);
347-
}
398+
StatsAndCount statsAndCount = stats2.computeIfAbsent(readerCoreKey, ignored -> new StatsAndCount(shardStats));
348399
statsAndCount.count += 1;
349400
}
350401

@@ -357,7 +408,7 @@ protected void onDocIdSetEviction(Object readerCoreKey, int numEntries, long sum
357408
if (numEntries > 0) {
358409
// We can't use ShardCoreKeyMap here because its core closed
359410
// listener is called before the listener of the cache which
360-
// triggers this eviction. So instead we use use stats2 that
411+
// triggers this eviction. So instead we use stats2 that
361412
// we only evict when nothing is cached anymore on the segment
362413
// instead of relying on close listeners
363414
final StatsAndCount statsAndCount = stats2.get(readerCoreKey);

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -520,33 +520,36 @@ static Map<Index, CommonStats> statsByIndex(final IndicesService indicesService,
520520
}
521521

522522
static Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
523+
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
523524
final Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
524-
525525
for (final IndexService indexService : indicesService) {
526526
for (final IndexShard indexShard : indexService) {
527+
// get the shared ram for this shard id (or zero if there's nothing in the map)
528+
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
527529
try {
528-
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags);
529-
530+
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags, sharedRam);
530531
if (indexShardStats == null) {
531532
continue;
532533
}
533-
534534
if (statsByShard.containsKey(indexService.index()) == false) {
535535
statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
536536
} else {
537537
statsByShard.get(indexService.index()).add(indexShardStats);
538538
}
539539
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
540-
// we can safely ignore illegal state on ones that are closing for example
541540
logger.trace(() -> format("%s ignoring shard stats", indexShard.shardId()), e);
542541
}
543542
}
544543
}
545-
546544
return statsByShard;
547545
}
548546

549-
IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
547+
IndexShardStats indexShardStats(
548+
final IndicesService indicesService,
549+
final IndexShard indexShard,
550+
final CommonStatsFlags flags,
551+
final long precomputedSharedRam
552+
) {
550553
if (indexShard.routingEntry() == null) {
551554
return null;
552555
}
@@ -571,7 +574,7 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
571574
new ShardStats(
572575
indexShard.routingEntry(),
573576
indexShard.shardPath(),
574-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags),
577+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags, precomputedSharedRam),
575578
commitStats,
576579
seqNoStats,
577580
retentionLeaseStats,

server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void testCreation() {
115115
ShardStats shardStats = new ShardStats(
116116
shardRouting,
117117
new ShardPath(false, path, path, shardRouting.shardId()),
118-
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store)),
118+
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store), 0L),
119119
null,
120120
null,
121121
null,

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1626,7 +1626,7 @@ public void testShardStats() throws IOException {
16261626
ShardStats stats = new ShardStats(
16271627
shard.routingEntry(),
16281628
shard.shardPath(),
1629-
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()),
1629+
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags(), 0L),
16301630
shard.commitStats(),
16311631
shard.seqNoStats(),
16321632
shard.getRetentionLeaseStats(),

0 commit comments

Comments
 (0)