From 5c8c576c30218e581cf8d89e480f8fd1d2711a85 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 13 Oct 2025 14:01:19 +0100 Subject: [PATCH 1/2] [ML] Add Auditor reset internal action (#136363) Adds auditor reset action for deterministic resets in tests. --- .../core/ml/action/ResetAuditorAction.java | 126 ++++++++++++++++++ .../xpack/ml/MachineLearning.java | 37 ++--- .../action/TransportResetAuditorAction.java | 88 ++++++++++++ .../ml/notifications/AbstractMlAuditor.java | 31 ----- .../xpack/security/operator/Constants.java | 1 + 5 files changed, 235 insertions(+), 48 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java new file mode 100644 index 0000000000000..dd5cbaf29bc1f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.AbstractTransportRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class ResetAuditorAction extends ActionType { + + public static final ResetAuditorAction INSTANCE = new ResetAuditorAction(); + public static final String NAME = "cluster:internal/xpack/ml/auditor/reset"; + + private ResetAuditorAction() { + super(NAME); + } + + public static class Request extends BaseNodesRequest { + + public static Request RESET_AUDITOR_REQUEST = new Request(); + + private Request() { + super(new String[] { "ml:true" }); // Only ml nodes. 
See DiscoveryNodes::resolveNodes + } + } + + public static class NodeRequest extends AbstractTransportRequest { + + public NodeRequest(StreamInput in) throws IOException { + super(in); + } + + public NodeRequest() {} + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return Objects.hash(); + } + } + + public static class Response extends BaseNodesResponse { + + public Response(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + protected Response(StreamInput in) throws IOException { + super(in); + } + + public static class ResetResponse extends BaseNodeResponse { + private final boolean acknowledged; + + public ResetResponse(DiscoveryNode node, boolean acknowledged) { + super(node); + this.acknowledged = acknowledged; + } + + public ResetResponse(StreamInput in) throws IOException { + super(in, null); + acknowledged = in.readBoolean(); + } + + public ResetResponse(StreamInput in, DiscoveryNode node) throws IOException { + super(in, node); + acknowledged = in.readBoolean(); + } + + public boolean isAcknowledged() { + return acknowledged; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(acknowledged); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ResetResponse that = (ResetResponse) o; + return acknowledged == that.acknowledged; + } + + @Override + public int hashCode() { + return Objects.hashCode(acknowledged); + } + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(ResetResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeCollection(nodes); + } + } +} diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 83adee27248be..704b0d7634db4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -161,6 +160,7 @@ import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelVocabularyAction; +import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; import org.elasticsearch.xpack.core.ml.action.ResetJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; @@ -271,6 +271,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelVocabularyAction; +import org.elasticsearch.xpack.ml.action.TransportResetAuditorAction; import org.elasticsearch.xpack.ml.action.TransportResetJobAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction; @@ -785,7 +786,6 @@ public void loadExtensions(ExtensionLoader loader) { public static final int 
MAX_LOW_PRIORITY_MODELS_PER_NODE = 100; private static final Logger logger = LogManager.getLogger(MachineLearning.class); - private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MachineLearning.class); private final Settings settings; private final boolean enabled; @@ -1564,6 +1564,7 @@ public List getActions() { actionHandlers.add(new ActionHandler(MlMemoryAction.INSTANCE, TransportMlMemoryAction.class)); actionHandlers.add(new ActionHandler(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)); actionHandlers.add(new ActionHandler(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class)); + actionHandlers.add(new ActionHandler(ResetAuditorAction.INSTANCE, TransportResetAuditorAction.class)); // Included in this section as it's used by MlMemoryAction actionHandlers.add(new ActionHandler(TrainedModelCacheInfoAction.INSTANCE, TransportTrainedModelCacheInfoAction.class)); actionHandlers.add(new ActionHandler(GetMlAutoscalingStats.INSTANCE, TransportGetMlAutoscalingStats.class)); @@ -2149,8 +2150,6 @@ public void cleanUpFeature( final Map results = new ConcurrentHashMap<>(); ActionListener unsetResetModeListener = ActionListener.wrap(success -> { - // reset the auditors as aliases used may be removed - resetAuditors(); client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(resetSuccess -> { finalListener.onResponse(success); @@ -2176,8 +2175,24 @@ public void cleanUpFeature( ); }); + ActionListener resetAuditors = ActionListener.wrap(success -> { + // reset the auditors as aliases used may be removed + client.execute( + ResetAuditorAction.INSTANCE, + ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ActionListener.wrap(ignored -> unsetResetModeListener.onResponse(success), unsetResetModeListener::onFailure) + ); + }, failure -> { + logger.error("failed to reset machine learning", failure); + client.execute( + ResetAuditorAction.INSTANCE, + 
ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ActionListener.wrap(ignored -> unsetResetModeListener.onFailure(failure), unsetResetModeListener::onFailure) + ); + }); + // Stop all model deployments - ActionListener pipelineValidation = unsetResetModeListener.delegateFailureAndWrap( + ActionListener pipelineValidation = resetAuditors.delegateFailureAndWrap( (delegate, listTasksResponse) -> { listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices"); if (results.values().stream().allMatch(b -> b)) { @@ -2332,18 +2347,6 @@ public void cleanUpFeature( client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet); } - private void resetAuditors() { - if (anomalyDetectionAuditor.get() != null) { - anomalyDetectionAuditor.get().reset(); - } - if (dataFrameAnalyticsAuditor.get() != null) { - dataFrameAnalyticsAuditor.get().reset(); - } - if (inferenceAuditor.get() != null) { - inferenceAuditor.get().reset(); - } - } - @Override public BreakerSettings getCircuitBreaker(Settings settingsToUse) { return BreakerSettings.updateFromSettings( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java new file mode 100644 index 0000000000000..a8f0daca2274d --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; + +import java.io.IOException; +import java.util.List; + +public class TransportResetAuditorAction extends TransportNodesAction< + ResetAuditorAction.Request, + ResetAuditorAction.Response, + ResetAuditorAction.NodeRequest, + ResetAuditorAction.Response.ResetResponse, + Void> { + + private final AnomalyDetectionAuditor anomalyDetectionAuditor; + private final DataFrameAnalyticsAuditor dfaAuditor; + private final InferenceAuditor inferenceAuditor; + + @Inject + public TransportResetAuditorAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + AnomalyDetectionAuditor anomalyDetectionAuditor, + DataFrameAnalyticsAuditor dfaAuditor, + InferenceAuditor inferenceAuditor + ) { + super( + ResetAuditorAction.NAME, + clusterService, + transportService, + actionFilters, + ResetAuditorAction.NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.anomalyDetectionAuditor = anomalyDetectionAuditor; + this.dfaAuditor = dfaAuditor; + this.inferenceAuditor = inferenceAuditor; + } + + @Override + protected 
ResetAuditorAction.Response newResponse( + ResetAuditorAction.Request request, + List resetResponses, + List failures + ) { + return new ResetAuditorAction.Response(clusterService.getClusterName(), resetResponses, failures); + } + + @Override + protected ResetAuditorAction.NodeRequest newNodeRequest(ResetAuditorAction.Request request) { + return new ResetAuditorAction.NodeRequest(); + } + + @Override + protected ResetAuditorAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new ResetAuditorAction.Response.ResetResponse(in); + } + + @Override + protected ResetAuditorAction.Response.ResetResponse nodeOperation(ResetAuditorAction.NodeRequest request, Task task) { + anomalyDetectionAuditor.reset(); + dfaAuditor.reset(); + inferenceAuditor.reset(); + return new ResetAuditorAction.Response.ResetResponse(clusterService.localNode(), true); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java index 99b03c2725411..2c9949fac70d4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java @@ -16,13 +16,11 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessageFactory; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import 
org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; @@ -33,7 +31,6 @@ abstract class AbstractMlAuditor extends AbstractAuditor { private static final Logger logger = LogManager.getLogger(AbstractMlAuditor.class); - private volatile boolean isResetMode; protected AbstractMlAuditor( Client client, @@ -50,34 +47,6 @@ protected AbstractMlAuditor( indexNameExpressionResolver, clusterService.threadPool().generic() ); - clusterService.addListener(event -> { - if (event.metadataChanged()) { - setResetMode(MlMetadata.getMlMetadata(event.state()).isResetMode()); - } - }); - } - - private void setResetMode(boolean value) { - isResetMode = value; - } - - @Override - protected void indexDoc(ToXContent toXContent) { - if (isResetMode) { - logger.trace("Skipped writing the audit message backlog as reset_mode is enabled"); - } else { - super.indexDoc(toXContent); - } - } - - @Override - protected void writeBacklog() { - if (isResetMode) { - logger.trace("Skipped writing the audit message backlog as reset_mode is enabled"); - clearBacklog(); - } else { - super.writeBacklog(); - } } @Override diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index d992f1b028a3c..08e756842a109 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -329,6 +329,7 @@ public class Constants { "cluster:internal/xpack/inference", "cluster:internal/xpack/inference/rerankwindowsize/get", "cluster:internal/xpack/inference/unified", + 
"cluster:internal/xpack/ml/auditor/reset", "cluster:internal/xpack/ml/coordinatedinference", "cluster:internal/xpack/ml/datafeed/isolate", "cluster:internal/xpack/ml/datafeed/running_state", From d5f76618cbe382e5e90ffe0e6836018a2262da66 Mon Sep 17 00:00:00 2001 From: Jan Kuipers <148754765+jan-elastic@users.noreply.github.com> Date: Mon, 20 Oct 2025 17:34:46 +0200 Subject: [PATCH 2/2] backport: - [ML] Add Auditor reset internal action (#136363) - Fix ML tests failing with "no shards available" (#136800) --- muted-tests.yml | 33 -------------- ...tion.java => ResetMlComponentsAction.java} | 6 +-- .../ml/integration/DeleteExpiredDataIT.java | 2 - .../xpack/ml/MachineLearning.java | 27 +++++------ ...sportGetDataFrameAnalyticsStatsAction.java | 3 +- ... => TransportResetMlComponentsAction.java} | 45 ++++++++++--------- .../inference/TrainedModelStatsService.java | 4 ++ 7 files changed, 48 insertions(+), 72 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/{ResetAuditorAction.java => ResetMlComponentsAction.java} (94%) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/{TransportResetAuditorAction.java => TransportResetMlComponentsAction.java} (55%) diff --git a/muted-tests.yml b/muted-tests.yml index 3d471bd6a24db..e5461e7c8499d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -146,18 +146,12 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=snapshot/10_basic/Create a source only snapshot and then restore it} issue: https://github.com/elastic/elasticsearch/issues/122755 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_crud/Test get stats given multiple analytics} - issue: https://github.com/elastic/elasticsearch/issues/123034 - class: org.elasticsearch.indices.recovery.IndexRecoveryIT method: testSourceThrottling issue: https://github.com/elastic/elasticsearch/issues/123680 - class: 
org.elasticsearch.smoketest.MlWithSecurityIT method: test {yaml=ml/3rd_party_deployment/Test start deployment fails while model download in progress} issue: https://github.com/elastic/elasticsearch/issues/120814 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable is missing} - issue: https://github.com/elastic/elasticsearch/issues/124168 - class: org.elasticsearch.smoketest.MlWithSecurityIT method: test {yaml=ml/3rd_party_deployment/Test start and stop multiple deployments} issue: https://github.com/elastic/elasticsearch/issues/124315 @@ -170,15 +164,6 @@ tests: - class: org.elasticsearch.packaging.test.BootstrapCheckTests method: test10Install issue: https://github.com/elastic/elasticsearch/issues/124957 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_crud/Test get stats on newly created config} - issue: https://github.com/elastic/elasticsearch/issues/121726 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_cat_apis/Test cat data frame analytics all jobs with header and column selection} - issue: https://github.com/elastic/elasticsearch/issues/125641 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_cat_apis/Test cat data frame analytics single job with header} - issue: https://github.com/elastic/elasticsearch/issues/125642 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform} issue: https://github.com/elastic/elasticsearch/issues/120720 @@ -188,9 +173,6 @@ tests: - class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests method: testRecreateTemplateWhenDeleted issue: https://github.com/elastic/elasticsearch/issues/123232 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - 
method: test {p0=ml/start_data_frame_analytics/Test start given dest index is not empty} - issue: https://github.com/elastic/elasticsearch/issues/125909 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_stats/Test get transform stats with timeout} issue: https://github.com/elastic/elasticsearch/issues/125975 @@ -206,15 +188,6 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_stats/Test get transform stats} issue: https://github.com/elastic/elasticsearch/issues/126270 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable cardinality is too low} - issue: https://github.com/elastic/elasticsearch/issues/126299 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable cardinality is too low} - issue: https://github.com/elastic/elasticsearch/issues/123200 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/trained_model_cat_apis/Test cat trained models} - issue: https://github.com/elastic/elasticsearch/issues/125750 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test start/stop only starts/stops specified transform} issue: https://github.com/elastic/elasticsearch/issues/126466 @@ -251,9 +224,6 @@ tests: - class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests method: testStdinWithMultipleValues issue: https://github.com/elastic/elasticsearch/issues/126882 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/data_frame_analytics_cat_apis/Test cat data frame analytics all jobs with header} - issue: https://github.com/elastic/elasticsearch/issues/127625 - class: org.elasticsearch.xpack.ccr.action.ShardFollowTaskReplicationTests method: 
testChangeFollowerHistoryUUID issue: https://github.com/elastic/elasticsearch/issues/127680 @@ -336,9 +306,6 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test171AdditionalCliOptionsAreForwarded issue: https://github.com/elastic/elasticsearch/issues/120925 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/delete_expired_data/Test delete expired data with body parameters} - issue: https://github.com/elastic/elasticsearch/issues/131364 - class: org.elasticsearch.packaging.test.DockerTests method: test070BindMountCustomPathConfAndJvmOptions issue: https://github.com/elastic/elasticsearch/issues/131366 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java similarity index 94% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java index dd5cbaf29bc1f..3ecda2674e36f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java @@ -22,12 +22,12 @@ import java.util.List; import java.util.Objects; -public class ResetAuditorAction extends ActionType { +public class ResetMlComponentsAction extends ActionType { - public static final ResetAuditorAction INSTANCE = new ResetAuditorAction(); + public static final ResetMlComponentsAction INSTANCE = new ResetMlComponentsAction(); public static final String NAME = "cluster:internal/xpack/ml/auditor/reset"; - private ResetAuditorAction() { + private ResetMlComponentsAction() { super(NAME); } diff --git 
a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 50b9597bb0326..fa176f2ce92b3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -105,7 +105,6 @@ public void testDeleteExpiredData_GivenNothingToDelete() throws Exception { client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataNoThrottle() throws Exception { testExpiredDeletion(null, 10010); } @@ -152,7 +151,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti ); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 704b0d7634db4..8dda503c01982 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -160,8 +160,8 @@ import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelVocabularyAction; -import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; import 
org.elasticsearch.xpack.core.ml.action.ResetJobAction; +import org.elasticsearch.xpack.core.ml.action.ResetMlComponentsAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; @@ -271,8 +271,8 @@ import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelVocabularyAction; -import org.elasticsearch.xpack.ml.action.TransportResetAuditorAction; import org.elasticsearch.xpack.ml.action.TransportResetJobAction; +import org.elasticsearch.xpack.ml.action.TransportResetMlComponentsAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction; import org.elasticsearch.xpack.ml.action.TransportSetUpgradeModeAction; @@ -805,7 +805,7 @@ public void loadExtensions(ExtensionLoader loader) { private final SetOnce learningToRankService = new SetOnce<>(); private final SetOnce mlAutoscalingDeciderService = new SetOnce<>(); private final SetOnce deploymentManager = new SetOnce<>(); - private final SetOnce trainedModelAllocationClusterServiceSetOnce = new SetOnce<>(); + private final SetOnce trainedModelAllocationClusterService = new SetOnce<>(); private final SetOnce machineLearningExtension = new SetOnce<>(); @@ -1315,7 +1315,7 @@ public Collection createComponents(PluginServices services) { clusterService, threadPool ); - trainedModelAllocationClusterServiceSetOnce.set( + trainedModelAllocationClusterService.set( new TrainedModelAssignmentClusterService( settings, clusterService, @@ -1391,7 +1391,8 @@ public Collection createComponents(PluginServices services) { trainedModelCacheMetadataService, trainedModelProvider, trainedModelAssignmentService, - 
trainedModelAllocationClusterServiceSetOnce.get(), + trainedModelAllocationClusterService.get(), + trainedModelStatsService, deploymentManager.get(), nodeAvailabilityZoneMapper, new MachineLearningExtensionHolder(machineLearningExtension.get()), @@ -1564,7 +1565,7 @@ public List getActions() { actionHandlers.add(new ActionHandler(MlMemoryAction.INSTANCE, TransportMlMemoryAction.class)); actionHandlers.add(new ActionHandler(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)); actionHandlers.add(new ActionHandler(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class)); - actionHandlers.add(new ActionHandler(ResetAuditorAction.INSTANCE, TransportResetAuditorAction.class)); + actionHandlers.add(new ActionHandler(ResetMlComponentsAction.INSTANCE, TransportResetMlComponentsAction.class)); // Included in this section as it's used by MlMemoryAction actionHandlers.add(new ActionHandler(TrainedModelCacheInfoAction.INSTANCE, TransportTrainedModelCacheInfoAction.class)); actionHandlers.add(new ActionHandler(GetMlAutoscalingStats.INSTANCE, TransportGetMlAutoscalingStats.class)); @@ -2176,17 +2177,17 @@ public void cleanUpFeature( }); ActionListener resetAuditors = ActionListener.wrap(success -> { - // reset the auditors as aliases used may be removed + // reset components, such as the auditors and the trained model stats queue client.execute( - ResetAuditorAction.INSTANCE, - ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ResetMlComponentsAction.INSTANCE, + ResetMlComponentsAction.Request.RESET_AUDITOR_REQUEST, ActionListener.wrap(ignored -> unsetResetModeListener.onResponse(success), unsetResetModeListener::onFailure) ); }, failure -> { logger.error("failed to reset machine learning", failure); client.execute( - ResetAuditorAction.INSTANCE, - ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ResetMlComponentsAction.INSTANCE, + ResetMlComponentsAction.Request.RESET_AUDITOR_REQUEST, ActionListener.wrap(ignored ->
unsetResetModeListener.onFailure(failure), unsetResetModeListener::onFailure) ); }); @@ -2321,11 +2322,11 @@ public void cleanUpFeature( ); client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, delegate); }).delegateFailureAndWrap((delegate, acknowledgedResponse) -> { - if (trainedModelAllocationClusterServiceSetOnce.get() == null || machineLearningExtension.get().isNlpEnabled() == false) { + if (trainedModelAllocationClusterService.get() == null || machineLearningExtension.get().isNlpEnabled() == false) { delegate.onResponse(AcknowledgedResponse.TRUE); return; } - trainedModelAllocationClusterServiceSetOnce.get().removeAllModelAssignments(delegate); + trainedModelAllocationClusterService.get().removeAllModelAssignments(delegate); }); // validate no pipelines are using machine learning models diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 7d918ebeaf5a1..fdd64822a7a02 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -278,7 +279,7 @@ private void searchStats(DataFrameAnalyticsConfig config, TaskId parentTaskId, A () -> format( "[%s] Item failure encountered during multi search for request [indices=%s, source=%s]: %s", config.getId(), - itemRequest.indices(), + Arrays.toString(itemRequest.indices()), itemRequest.source(), itemResponse.getFailureMessage() ), diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java similarity index 55% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java index a8f0daca2274d..748de68b8c46c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java @@ -17,7 +17,8 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; +import org.elasticsearch.xpack.core.ml.action.ResetMlComponentsAction; +import org.elasticsearch.xpack.ml.inference.TrainedModelStatsService; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; @@ -25,64 +26,68 @@ import java.io.IOException; import java.util.List; -public class TransportResetAuditorAction extends TransportNodesAction< - ResetAuditorAction.Request, - ResetAuditorAction.Response, - ResetAuditorAction.NodeRequest, - ResetAuditorAction.Response.ResetResponse, +public class TransportResetMlComponentsAction extends TransportNodesAction< + ResetMlComponentsAction.Request, + ResetMlComponentsAction.Response, + ResetMlComponentsAction.NodeRequest, + ResetMlComponentsAction.Response.ResetResponse, Void> { private final AnomalyDetectionAuditor anomalyDetectionAuditor; private final DataFrameAnalyticsAuditor dfaAuditor; private final InferenceAuditor inferenceAuditor; + private final 
TrainedModelStatsService trainedModelStatsService; @Inject - public TransportResetAuditorAction( + public TransportResetMlComponentsAction( ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, AnomalyDetectionAuditor anomalyDetectionAuditor, DataFrameAnalyticsAuditor dfaAuditor, - InferenceAuditor inferenceAuditor + InferenceAuditor inferenceAuditor, + TrainedModelStatsService trainedModelStatsService ) { super( - ResetAuditorAction.NAME, + ResetMlComponentsAction.NAME, clusterService, transportService, actionFilters, - ResetAuditorAction.NodeRequest::new, + ResetMlComponentsAction.NodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); this.anomalyDetectionAuditor = anomalyDetectionAuditor; this.dfaAuditor = dfaAuditor; this.inferenceAuditor = inferenceAuditor; + this.trainedModelStatsService = trainedModelStatsService; } @Override - protected ResetAuditorAction.Response newResponse( - ResetAuditorAction.Request request, - List resetResponses, + protected ResetMlComponentsAction.Response newResponse( + ResetMlComponentsAction.Request request, + List resetResponses, List failures ) { - return new ResetAuditorAction.Response(clusterService.getClusterName(), resetResponses, failures); + return new ResetMlComponentsAction.Response(clusterService.getClusterName(), resetResponses, failures); } @Override - protected ResetAuditorAction.NodeRequest newNodeRequest(ResetAuditorAction.Request request) { - return new ResetAuditorAction.NodeRequest(); + protected ResetMlComponentsAction.NodeRequest newNodeRequest(ResetMlComponentsAction.Request request) { + return new ResetMlComponentsAction.NodeRequest(); } @Override - protected ResetAuditorAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { - return new ResetAuditorAction.Response.ResetResponse(in); + protected ResetMlComponentsAction.Response.ResetResponse newNodeResponse(StreamInput in, 
DiscoveryNode node) throws IOException { + return new ResetMlComponentsAction.Response.ResetResponse(in); } @Override - protected ResetAuditorAction.Response.ResetResponse nodeOperation(ResetAuditorAction.NodeRequest request, Task task) { + protected ResetMlComponentsAction.Response.ResetResponse nodeOperation(ResetMlComponentsAction.NodeRequest request, Task task) { anomalyDetectionAuditor.reset(); dfaAuditor.reset(); inferenceAuditor.reset(); - return new ResetAuditorAction.Response.ResetResponse(clusterService.localNode(), true); + trainedModelStatsService.clearQueue(); + return new ResetMlComponentsAction.Response.ResetResponse(clusterService.localNode(), true); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index 4fc606f1513c2..e8f7fde64640d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -295,4 +295,8 @@ static UpdateRequest buildUpdateRequest(InferenceStats stats) { } return null; } + + public void clearQueue() { + statsQueue.clear(); + } }