From 9a654f382b105d0984c02f539f6d73d71c7e7b57 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 5 Nov 2025 20:17:38 -0500 Subject: [PATCH 1/5] [ML] Implement CCMCache Implements a lazy-loading cache in front of the CCM Inference index. By default, the cache holds a single entry for 15 minutes, and cache misses search the CCM index and load the responses into the cache. Some design decisions: - The cache maintains an "empty" entry so that the `isEnabled` call can reuse the "empty" response to quickly return false. Invalidating the cache or calling get will drop this "empty" entry. - Invalidating the cache will broadcast a message to all nodes so that all caches on all nodes will invalidate their caches. - Since the broadcast message only works if all nodes are on the latest version, there is a new NodeFeature to enable the cache once all nodes in the cluster have upgraded. --- .../services/elastic/ccm/CCMCacheTests.java | 156 ++++++++++++ .../xpack/inference/InferenceFeatures.java | 3 +- .../xpack/inference/InferencePlugin.java | 18 +- .../common/BroadcastMessageAction.java | 147 ++++++++++++ .../services/elastic/ccm/CCMCache.java | 224 ++++++++++++++++++ 5 files changed, 545 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java create mode 100644 x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/BroadcastMessageAction.java create mode 100644 x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java new file mode 100644 index 0000000000000..75b5af5b82aac --- /dev/null +++ 
b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.inference.services.elastic.ccm; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.TestPlainActionFuture; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.inference.LocalStateInferencePlugin; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; + +public class CCMCacheTests extends ESSingleNodeTestCase { + + private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); + + private CCMCache ccmCache; + private CCMStorageService ccmStorageService; + + @Override + protected Collection> getPlugins() { + return List.of(LocalStateInferencePlugin.class); + } + + @Before + public void createComponents() { + ccmCache = node().injector().getInstance(CCMCache.class); + ccmStorageService = node().injector().getInstance(CCMStorageService.class); + } + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @After + public void clearCacheAndIndex() { + try { + indicesAdmin().prepareDelete(CCMIndex.INDEX_NAME).execute().actionGet(TIMEOUT); 
+ } catch (ResourceNotFoundException e) { + // mission complete! + } + } + + public void testCacheHit() throws IOException { + var expectedCcmModel = storeCcm(); + var actualCcmModel = getFromCache(); + assertThat(actualCcmModel, equalTo(expectedCcmModel)); + assertThat(ccmCache.stats().getHits(), equalTo(0L)); + assertThat(getFromCache(), sameInstance(actualCcmModel)); + assertThat(ccmCache.stats().getHits(), equalTo(1L)); + } + + private CCMModel storeCcm() throws IOException { + var ccmModel = CCMModel.fromXContentBytes(new BytesArray(""" + { + "api_key": "test_key" + } + """)); + var listener = new TestPlainActionFuture(); + ccmStorageService.store(ccmModel, listener); + listener.actionGet(TIMEOUT); + return ccmModel; + } + + private CCMModel getFromCache() { + var listener = new TestPlainActionFuture(); + ccmCache.get(listener); + return listener.actionGet(TIMEOUT); + } + + public void testCacheInvalidate() throws Exception { + var expectedCcmModel = storeCcm(); + var actualCcmModel = getFromCache(); + assertThat(actualCcmModel, equalTo(expectedCcmModel)); + assertThat(ccmCache.stats().getHits(), equalTo(0L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + assertThat(ccmCache.cacheCount(), equalTo(1)); + + var listener = new TestPlainActionFuture(); + ccmCache.invalidate(listener); + listener.actionGet(TIMEOUT); + + assertThat(getFromCache(), not(sameInstance(actualCcmModel))); + assertThat(ccmCache.stats().getHits(), equalTo(0L)); + assertThat(ccmCache.stats().getMisses(), equalTo(2L)); + assertThat(ccmCache.stats().getEvictions(), equalTo(1L)); + assertThat(ccmCache.cacheCount(), equalTo(1)); + } + + public void testEmptyInvalidate() throws InterruptedException { + var latch = new CountDownLatch(1); + ccmCache.invalidate(ActionTestUtils.assertNoFailureListener(success -> latch.countDown())); + assertTrue(latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); + + assertThat(ccmCache.stats().getEvictions(), equalTo(0L)); + 
assertThat(ccmCache.cacheCount(), equalTo(0)); + } + + private boolean isPresent() { + var listener = new TestPlainActionFuture(); + ccmCache.isEnabled(listener); + return listener.actionGet(TIMEOUT); + } + + public void testIsEnabled() throws IOException { + storeCcm(); + + getFromCache(); + assertThat(ccmCache.stats().getHits(), equalTo(0L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + + assertTrue(isPresent()); + assertThat(ccmCache.stats().getHits(), equalTo(1L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + } + + public void testIsDisabledWithMissingIndex() { + assertFalse(isPresent()); + } + + public void testIsDisabledWithPresentIndex() { + indicesAdmin().prepareCreate(CCMIndex.INDEX_NAME).execute().actionGet(TIMEOUT); + assertFalse(isPresent()); + } + + public void testIsDisabledWithCacheHit() { + indicesAdmin().prepareCreate(CCMIndex.INDEX_NAME).execute().actionGet(TIMEOUT); + + assertFalse(isPresent()); + assertThat(ccmCache.stats().getHits(), equalTo(0L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + + assertFalse(isPresent()); + assertThat(ccmCache.stats().getHits(), equalTo(1L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + } +} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java index a55a126976284..4ec2bb3280522 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java @@ -54,11 +54,12 @@ public class InferenceFeatures implements FeatureSpecification { private static final NodeFeature SEMANTIC_TEXT_FIELDS_CHUNKS_FORMAT = new NodeFeature("semantic_text.fields_chunks_format"); public static final NodeFeature INFERENCE_ENDPOINT_CACHE = new NodeFeature("inference.endpoint.cache"); + public static 
final NodeFeature INFERENCE_CCM_CACHE = new NodeFeature("inference.ccm.cache"); public static final NodeFeature SEARCH_USAGE_EXTENDED_DATA = new NodeFeature("search.usage.extended_data"); @Override public Set getFeatures() { - return Set.of(INFERENCE_ENDPOINT_CACHE); + return Set.of(INFERENCE_ENDPOINT_CACHE, INFERENCE_CCM_CACHE); } @Override diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java index 491f0f06be199..7767944a5fc91 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java @@ -134,6 +134,7 @@ import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceComponents; import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings; import org.elasticsearch.xpack.inference.services.elastic.authorization.ElasticInferenceServiceAuthorizationRequestHandler; +import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMCache; import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMFeatureFlag; import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMIndex; import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMStorageService; @@ -254,7 +255,8 @@ public List getActions() { new ActionHandler(GetInferenceServicesAction.INSTANCE, TransportGetInferenceServicesAction.class), new ActionHandler(UnifiedCompletionAction.INSTANCE, TransportUnifiedCompletionInferenceAction.class), new ActionHandler(GetRerankerWindowSizeAction.INSTANCE, TransportGetRerankerWindowSizeAction.class), - new ActionHandler(ClearInferenceEndpointCacheAction.INSTANCE, ClearInferenceEndpointCacheAction.class) + new ActionHandler(ClearInferenceEndpointCacheAction.INSTANCE, ClearInferenceEndpointCacheAction.class), + new 
ActionHandler(CCMCache.ClearCCMCacheAction.INSTANCE, CCMCache.ClearCCMCacheAction.class) ); } @@ -409,7 +411,18 @@ public Collection createComponents(PluginServices services) { ); if (CCMFeatureFlag.FEATURE_FLAG.isEnabled()) { - components.add(new CCMStorageService(services.client())); + var ccmStorageService = new CCMStorageService(services.client()); + components.add(ccmStorageService); + components.add( + new CCMCache( + ccmStorageService, + services.clusterService(), + settings, + services.featureService(), + services.projectResolver(), + services.client() + ) + ); } return components; @@ -609,6 +622,7 @@ public static Set> getInferenceSettings() { settings.add(INFERENCE_QUERY_TIMEOUT); settings.addAll(InferenceEndpointRegistry.getSettingsDefinitions()); settings.addAll(ElasticInferenceServiceSettings.getSettingsDefinitions()); + settings.addAll(CCMCache.getSettingsDefinitions()); return Collections.unmodifiableSet(settings); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/BroadcastMessageAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/BroadcastMessageAction.java new file mode 100644 index 0000000000000..c3b510636de40 --- /dev/null +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/BroadcastMessageAction.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.inference.common; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Broadcasts a {@link Writeable} to all nodes and responds with an empty object. + * This is intended to be used in a fire-and-forget style, where responses and failures are logged and swallowed. 
+ */ +public abstract class BroadcastMessageAction extends TransportNodesAction< + BroadcastMessageAction.Request, + BroadcastMessageAction.Response, + BroadcastMessageAction.NodeRequest, + BroadcastMessageAction.NodeResponse, + Void> { + + protected BroadcastMessageAction( + String actionName, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + Writeable.Reader messageReader + ) { + super( + actionName, + clusterService, + transportService, + actionFilters, + in -> new NodeRequest<>(messageReader.read(in)), + clusterService.threadPool().executor(ThreadPool.Names.MANAGEMENT) + ); + } + + @Override + protected Response newResponse(Request request, List nodeResponses, List failures) { + return new Response(clusterService.getClusterName(), nodeResponses, failures); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest<>(request.message); + } + + @Override + protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeResponse(in, node); + } + + @Override + protected NodeResponse nodeOperation(NodeRequest request, Task task) { + receiveMessage(request.message); + return new NodeResponse(transportService.getLocalNode()); + } + + /** + * This method is run on each node in the cluster. 
+ */ + protected abstract void receiveMessage(Message message); + + public static Request request(T message, TimeValue timeout) { + return new Request<>(message, timeout); + } + + public static class Request extends BaseNodesRequest { + private final Message message; + + protected Request(Message message, TimeValue timeout) { + super(Strings.EMPTY_ARRAY); + this.message = message; + setTimeout(timeout); + } + } + + public static class Response extends BaseNodesResponse { + + protected Response(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) { + TransportAction.localOnly(); + } + } + + public static class NodeRequest extends AbstractTransportRequest { + private final Message message; + + private NodeRequest(Message message) { + this.message = message; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "broadcasted message to an individual node", parentTaskId, headers); + } + } + + public static class NodeResponse extends BaseNodeResponse { + protected NodeResponse(StreamInput in) throws IOException { + super(in); + } + + protected NodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + super(in, node); + } + + protected NodeResponse(DiscoveryNode node) { + super(node); + } + } +} diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java new file mode 100644 index 0000000000000..26f97b00a4c85 --- /dev/null +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java 
@@ -0,0 +1,224 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.inference.services.elastic.ccm; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.features.FeatureService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.inference.InferenceFeatures; +import org.elasticsearch.xpack.inference.common.BroadcastMessageAction; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +public class CCMCache { + + private static final Setting INFERENCE_CCM_CACHE_WEIGHT = Setting.intSetting( + "xpack.inference.ccm.cache.weight", + 1, + Setting.Property.NodeScope + ); + + private static final Setting INFERENCE_CCM_CACHE_EXPIRY = Setting.timeSetting( + "xpack.inference.ccm.cache.expiry_time", + TimeValue.timeValueMinutes(15), 
+ TimeValue.timeValueMinutes(1), + TimeValue.timeValueHours(1), + Setting.Property.NodeScope + ); + + public static Collection> getSettingsDefinitions() { + return List.of(INFERENCE_CCM_CACHE_WEIGHT, INFERENCE_CCM_CACHE_EXPIRY); + } + + private static final Logger logger = LogManager.getLogger(CCMCache.class); + private static final Cache.Stats EMPTY = new Cache.Stats(0, 0, 0); + private final CCMStorageService ccmStorageService; + private final Cache cache; + private final ClusterService clusterService; + private final FeatureService featureService; + private final ProjectResolver projectResolver; + private final Client client; + + public CCMCache( + CCMStorageService ccmStorageService, + ClusterService clusterService, + Settings settings, + FeatureService featureService, + ProjectResolver projectResolver, + Client client + ) { + this.ccmStorageService = ccmStorageService; + this.cache = CacheBuilder.builder() + .setMaximumWeight(INFERENCE_CCM_CACHE_WEIGHT.get(settings)) + .setExpireAfterWrite(INFERENCE_CCM_CACHE_EXPIRY.get(settings)) + .build(); + this.clusterService = clusterService; + this.featureService = featureService; + this.projectResolver = projectResolver; + this.client = client; + } + + public void get(ActionListener listener) { + var projectId = projectResolver.getProjectId(); + var cachedEntry = get(projectId); + if (cachedEntry != null && cachedEntry.enabled()) { + listener.onResponse(cachedEntry.ccmModel()); + } else { + ccmStorageService.get(ActionListener.wrap(ccmModel -> { + enabled(projectId, ccmModel); + listener.onResponse(ccmModel); + }, e -> { + if (e instanceof ResourceNotFoundException) { + disabled(projectId); + } + listener.onFailure(e); + })); + } + } + + private CCMModelEntry get(ProjectId projectId) { + return cacheEnabled() ? 
cache.get(projectId) : null; + } + + private boolean cacheEnabled() { + var state = clusterService.state(); + return state.clusterRecovered() && featureService.clusterHasFeature(state, InferenceFeatures.INFERENCE_CCM_CACHE); + } + + private void enabled(ProjectId projectId, CCMModel ccmModel) { + if (cacheEnabled()) { + cache.put(projectId, CCMModelEntry.enabled(ccmModel)); + } + } + + private void disabled(ProjectId projectId) { + if (cacheEnabled()) { + cache.put(projectId, CCMModelEntry.DISABLED); + } + } + + public void isEnabled(ActionListener listener) { + var projectId = projectResolver.getProjectId(); + var cachedEntry = get(projectId); + if (cachedEntry != null) { + listener.onResponse(cachedEntry.enabled()); + } else { + ccmStorageService.get(ActionListener.wrap(ccmModel -> { + enabled(projectId, ccmModel); + listener.onResponse(true); + }, e -> { + if (e instanceof ResourceNotFoundException) { + disabled(projectId); + listener.onResponse(false); + } else { + listener.onFailure(e); + } + })); + } + } + + public void invalidate(ActionListener listener) { + if (cacheEnabled()) { + client.execute( + ClearCCMCacheAction.INSTANCE, + ClearCCMCacheAction.request(ClearCCMMessage.INSTANCE, null), + ActionListener.wrap(ack -> { + logger.debug("Successfully refreshed inference CCM cache for project {}.", projectResolver::getProjectId); + listener.onResponse((Void) null); + }, e -> { + logger.atDebug() + .withThrowable(e) + .log("Failed to refresh inference CCM cache for project {}.", projectResolver::getProjectId); + listener.onFailure(e); + }) + ); + } + } + + private void invalidate(ProjectId projectId) { + if (cacheEnabled()) { + var cacheKeys = cache.keys().iterator(); + while (cacheKeys.hasNext()) { + if (cacheKeys.next().equals(projectId)) { + cacheKeys.remove(); + } + } + } + } + + public Cache.Stats stats() { + return cacheEnabled() ? cache.stats() : EMPTY; + } + + public int cacheCount() { + return cacheEnabled() ? 
cache.count() : 0; + } + + private record CCMModelEntry(boolean enabled, @Nullable CCMModel ccmModel) { + private static final CCMModelEntry DISABLED = new CCMModelEntry(false, null); + + private static CCMModelEntry enabled(CCMModel ccmModel) { + return new CCMModelEntry(true, Objects.requireNonNull(ccmModel)); + } + } + + public static class ClearCCMCacheAction extends BroadcastMessageAction { + private static final String NAME = "cluster:internal/xpack/inference/clear_inference_ccm_cache"; + public static final ActionType INSTANCE = new ActionType<>(NAME); + + private final ProjectResolver projectResolver; + private final CCMCache ccmCache; + + @Inject + public ClearCCMCacheAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + ProjectResolver projectResolver, + CCMCache ccmCache + ) { + super(NAME, clusterService, transportService, actionFilters, in -> ClearCCMMessage.INSTANCE); + + this.projectResolver = projectResolver; + this.ccmCache = ccmCache; + } + + @Override + protected void receiveMessage(ClearCCMMessage clearCCMMessage) { + ccmCache.invalidate(projectResolver.getProjectId()); + } + } + + public record ClearCCMMessage() implements Writeable { + private static final ClearCCMMessage INSTANCE = new ClearCCMMessage(); + + @Override + public void writeTo(StreamOutput out) throws IOException {} + } +} From f59ca21591907632aa107472f0b8aa99e2c9bd61 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Fri, 7 Nov 2025 10:56:45 -0500 Subject: [PATCH 2/5] export module-info --- x-pack/plugin/inference/src/main/java/module-info.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/inference/src/main/java/module-info.java b/x-pack/plugin/inference/src/main/java/module-info.java index bd200fd88a706..67a0cd11c6a4c 100644 --- a/x-pack/plugin/inference/src/main/java/module-info.java +++ b/x-pack/plugin/inference/src/main/java/module-info.java @@ -41,6 +41,7 @@ exports 
org.elasticsearch.xpack.inference.registry; exports org.elasticsearch.xpack.inference.rest; exports org.elasticsearch.xpack.inference.services; + exports org.elasticsearch.xpack.inference.services.elastic.ccm; exports org.elasticsearch.xpack.inference; exports org.elasticsearch.xpack.inference.action.task; exports org.elasticsearch.xpack.inference.telemetry; From 6eb5d3424a3c4212f631e66e20957e64f1646c6d Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Mon, 10 Nov 2025 10:04:01 -0500 Subject: [PATCH 3/5] Add non-operator permissions --- .../org/elasticsearch/xpack/security/operator/Constants.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 0957ec55e882a..b10112c842de7 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -328,6 +328,7 @@ public class Constants { "cluster:admin/xpack/watcher/watch/put", "cluster:internal/remote_cluster/nodes", "cluster:internal/xpack/inference", + "cluster:internal/xpack/inference/clear_inference_ccm_cache", "cluster:internal/xpack/inference/clear_inference_endpoint_cache", "cluster:internal/xpack/inference/create_endpoints", "cluster:internal/xpack/inference/rerankwindowsize/get", From 60f1cd0b51272adf5ea20f55b294d07d1ae60e53 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Mon, 10 Nov 2025 11:22:02 -0500 Subject: [PATCH 4/5] rename ccmstorageservice --- .../inference/services/elastic/ccm/CCMCacheTests.java | 6 +++--- .../xpack/inference/services/elastic/ccm/CCMCache.java | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) 
diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java index 75b5af5b82aac..6cde2f56a158e 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java @@ -33,7 +33,7 @@ public class CCMCacheTests extends ESSingleNodeTestCase { private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); private CCMCache ccmCache; - private CCMStorageService ccmStorageService; + private CCMPersistentStorageService ccmPersistentStorageService; @Override protected Collection> getPlugins() { @@ -43,7 +43,7 @@ protected Collection> getPlugins() { @Before public void createComponents() { ccmCache = node().injector().getInstance(CCMCache.class); - ccmStorageService = node().injector().getInstance(CCMStorageService.class); + ccmPersistentStorageService = node().injector().getInstance(CCMPersistentStorageService.class); } @Override @@ -76,7 +76,7 @@ private CCMModel storeCcm() throws IOException { } """)); var listener = new TestPlainActionFuture(); - ccmStorageService.store(ccmModel, listener); + ccmPersistentStorageService.store(ccmModel, listener); listener.actionGet(TIMEOUT); return ccmModel; } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java index 6e864920a711d..3094d39be368f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java +++ 
b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java @@ -58,7 +58,7 @@ public static Collection> getSettingsDefinitions() { private static final Logger logger = LogManager.getLogger(CCMCache.class); private static final Cache.Stats EMPTY = new Cache.Stats(0, 0, 0); - private final CCMPersistentStorageService CCMPersistentStorageService; + private final CCMPersistentStorageService ccmPersistentStorageService; private final Cache cache; private final ClusterService clusterService; private final FeatureService featureService; @@ -66,14 +66,14 @@ public static Collection> getSettingsDefinitions() { private final Client client; public CCMCache( - CCMPersistentStorageService CCMPersistentStorageService, + CCMPersistentStorageService ccmPersistentStorageService, ClusterService clusterService, Settings settings, FeatureService featureService, ProjectResolver projectResolver, Client client ) { - this.CCMPersistentStorageService = CCMPersistentStorageService; + this.ccmPersistentStorageService = ccmPersistentStorageService; this.cache = CacheBuilder.builder() .setMaximumWeight(INFERENCE_CCM_CACHE_WEIGHT.get(settings)) .setExpireAfterWrite(INFERENCE_CCM_CACHE_EXPIRY.get(settings)) @@ -90,7 +90,7 @@ public void get(ActionListener listener) { if (cachedEntry != null && cachedEntry.enabled()) { listener.onResponse(cachedEntry.ccmModel()); } else { - CCMPersistentStorageService.get(ActionListener.wrap(ccmModel -> { + ccmPersistentStorageService.get(ActionListener.wrap(ccmModel -> { enabled(projectId, ccmModel); listener.onResponse(ccmModel); }, e -> { @@ -129,7 +129,7 @@ public void isEnabled(ActionListener listener) { if (cachedEntry != null) { listener.onResponse(cachedEntry.enabled()); } else { - CCMPersistentStorageService.get(ActionListener.wrap(ccmModel -> { + ccmPersistentStorageService.get(ActionListener.wrap(ccmModel -> { enabled(projectId, ccmModel); listener.onResponse(true); }, e -> { From 
11182fdcc8930b40a92525ae2dc429dcc7d749aa Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Mon, 10 Nov 2025 15:29:59 -0500 Subject: [PATCH 5/5] address comments --- .../services/elastic/ccm/CCMCacheTests.java | 25 +++++++++++++++- .../services/elastic/ccm/CCMCache.java | 30 +++++++++++++------ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java index 6cde2f56a158e..7ec9644e9d8d7 100644 --- a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCacheTests.java @@ -30,7 +30,7 @@ public class CCMCacheTests extends ESSingleNodeTestCase { - private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); + private static final TimeValue TIMEOUT = TimeValue.THIRTY_SECONDS; private CCMCache ccmCache; private CCMPersistentStorageService ccmPersistentStorageService; @@ -153,4 +153,27 @@ public void testIsDisabledWithCacheHit() { assertThat(ccmCache.stats().getHits(), equalTo(1L)); assertThat(ccmCache.stats().getMisses(), equalTo(1L)); } + + public void testIsDisabledRefreshedWithGet() throws IOException { + indicesAdmin().prepareCreate(CCMIndex.INDEX_NAME).execute().actionGet(TIMEOUT); + + assertFalse(isPresent()); + assertThat(ccmCache.stats().getHits(), equalTo(0L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + + var expectedCcmModel = storeCcm(); + + assertFalse(isPresent()); + assertThat(ccmCache.stats().getHits(), equalTo(1L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + + var actualCcmModel = getFromCache(); + assertThat(actualCcmModel, equalTo(expectedCcmModel)); + 
assertThat(ccmCache.stats().getHits(), equalTo(2L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + + assertTrue(isPresent()); + assertThat(ccmCache.stats().getHits(), equalTo(3L)); + assertThat(ccmCache.stats().getMisses(), equalTo(1L)); + } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java index 3094d39be368f..d565a8ae9cc85 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ccm/CCMCache.java @@ -36,6 +36,9 @@ import java.util.List; import java.util.Objects; +/** + * Cache for whether CCM is enabled or disabled for this cluster as well as what the CCM key is for when it is enabled. + */ public class CCMCache { private static final Setting INFERENCE_CCM_CACHE_WEIGHT = Setting.intSetting( @@ -84,25 +87,30 @@ public CCMCache( this.client = client; } + /** + * Immediately returns the CCM key if it is cached, or goes to the index if there is no value cached or the previous call returned + * nothing. The expectation is that the caller checks if CCM is enabled via the {@link #isEnabled(ActionListener)} API, which caches + * a boolean value if the CCM key is present or absent in the underlying index. 
+ */ public void get(ActionListener listener) { var projectId = projectResolver.getProjectId(); - var cachedEntry = get(projectId); + var cachedEntry = getCacheEntry(projectId); if (cachedEntry != null && cachedEntry.enabled()) { listener.onResponse(cachedEntry.ccmModel()); } else { ccmPersistentStorageService.get(ActionListener.wrap(ccmModel -> { - enabled(projectId, ccmModel); + putEnabledEntry(projectId, ccmModel); listener.onResponse(ccmModel); }, e -> { if (e instanceof ResourceNotFoundException) { - disabled(projectId); + putDisabledEntry(projectId); } listener.onFailure(e); })); } } - private CCMModelEntry get(ProjectId projectId) { + private CCMModelEntry getCacheEntry(ProjectId projectId) { return cacheEnabled() ? cache.get(projectId) : null; } @@ -111,30 +119,34 @@ private boolean cacheEnabled() { return state.clusterRecovered() && featureService.clusterHasFeature(state, InferenceFeatures.INFERENCE_CCM_CACHE); } - private void enabled(ProjectId projectId, CCMModel ccmModel) { + private void putEnabledEntry(ProjectId projectId, CCMModel ccmModel) { if (cacheEnabled()) { cache.put(projectId, CCMModelEntry.enabled(ccmModel)); } } - private void disabled(ProjectId projectId) { + private void putDisabledEntry(ProjectId projectId) { if (cacheEnabled()) { cache.put(projectId, CCMModelEntry.DISABLED); } } + /** + * Checks if the value is present or absent based on a previous call to {@link #isEnabled(ActionListener)} + * or {@link #get(ActionListener)}. If the cache entry is missing or expired, then it will call through to the backing index. 
+ */ public void isEnabled(ActionListener listener) { var projectId = projectResolver.getProjectId(); - var cachedEntry = get(projectId); + var cachedEntry = getCacheEntry(projectId); if (cachedEntry != null) { listener.onResponse(cachedEntry.enabled()); } else { ccmPersistentStorageService.get(ActionListener.wrap(ccmModel -> { - enabled(projectId, ccmModel); + putEnabledEntry(projectId, ccmModel); listener.onResponse(true); }, e -> { if (e instanceof ResourceNotFoundException) { - disabled(projectId); + putDisabledEntry(projectId); listener.onResponse(false); } else { listener.onFailure(e);