diff --git a/muted-tests.yml b/muted-tests.yml index 3d471bd6a24db..e5461e7c8499d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -146,18 +146,12 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=snapshot/10_basic/Create a source only snapshot and then restore it} issue: https://github.com/elastic/elasticsearch/issues/122755 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_crud/Test get stats given multiple analytics} - issue: https://github.com/elastic/elasticsearch/issues/123034 - class: org.elasticsearch.indices.recovery.IndexRecoveryIT method: testSourceThrottling issue: https://github.com/elastic/elasticsearch/issues/123680 - class: org.elasticsearch.smoketest.MlWithSecurityIT method: test {yaml=ml/3rd_party_deployment/Test start deployment fails while model download in progress} issue: https://github.com/elastic/elasticsearch/issues/120814 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable is missing} - issue: https://github.com/elastic/elasticsearch/issues/124168 - class: org.elasticsearch.smoketest.MlWithSecurityIT method: test {yaml=ml/3rd_party_deployment/Test start and stop multiple deployments} issue: https://github.com/elastic/elasticsearch/issues/124315 @@ -170,15 +164,6 @@ tests: - class: org.elasticsearch.packaging.test.BootstrapCheckTests method: test10Install issue: https://github.com/elastic/elasticsearch/issues/124957 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_crud/Test get stats on newly created config} - issue: https://github.com/elastic/elasticsearch/issues/121726 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_cat_apis/Test cat data frame analytics all jobs with header and column selection} - issue: https://github.com/elastic/elasticsearch/issues/125641 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_cat_apis/Test cat data frame analytics single job with header} - issue: https://github.com/elastic/elasticsearch/issues/125642 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform} issue: https://github.com/elastic/elasticsearch/issues/120720 @@ -188,9 +173,6 @@ tests: - class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests method: testRecreateTemplateWhenDeleted issue: https://github.com/elastic/elasticsearch/issues/123232 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/start_data_frame_analytics/Test start given dest index is not empty} - issue: https://github.com/elastic/elasticsearch/issues/125909 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_stats/Test get transform stats with timeout} issue: https://github.com/elastic/elasticsearch/issues/125975 @@ -206,15 +188,6 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_stats/Test get transform stats} issue: https://github.com/elastic/elasticsearch/issues/126270 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable cardinality is too low} - issue: https://github.com/elastic/elasticsearch/issues/126299 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable cardinality is too low} - issue: https://github.com/elastic/elasticsearch/issues/123200 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/trained_model_cat_apis/Test cat trained models} - issue: https://github.com/elastic/elasticsearch/issues/125750 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test start/stop only starts/stops specified transform} issue: https://github.com/elastic/elasticsearch/issues/126466 @@ -251,9 +224,6 @@ tests: - class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests method: testStdinWithMultipleValues issue: https://github.com/elastic/elasticsearch/issues/126882 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/data_frame_analytics_cat_apis/Test cat data frame analytics all jobs with header} - issue: https://github.com/elastic/elasticsearch/issues/127625 - class: org.elasticsearch.xpack.ccr.action.ShardFollowTaskReplicationTests method: testChangeFollowerHistoryUUID issue: https://github.com/elastic/elasticsearch/issues/127680 @@ -336,9 +306,6 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test171AdditionalCliOptionsAreForwarded issue: https://github.com/elastic/elasticsearch/issues/120925 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/delete_expired_data/Test delete expired data with body parameters} - issue: https://github.com/elastic/elasticsearch/issues/131364 - class: org.elasticsearch.packaging.test.DockerTests method: test070BindMountCustomPathConfAndJvmOptions issue: https://github.com/elastic/elasticsearch/issues/131366 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java new file mode 100644 index 0000000000000..3ecda2674e36f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.AbstractTransportRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class ResetMlComponentsAction extends ActionType { + + public static final ResetMlComponentsAction INSTANCE = new ResetMlComponentsAction(); + public static final String NAME = "cluster:internal/xpack/ml/auditor/reset"; + + private ResetMlComponentsAction() { + super(NAME); + } + + public static class Request extends BaseNodesRequest { + + public static Request RESET_AUDITOR_REQUEST = new Request(); + + private Request() { + super(new String[] { "ml:true" }); // Only ml nodes. See DiscoveryNodes::resolveNodes + } + } + + public static class NodeRequest extends AbstractTransportRequest { + + public NodeRequest(StreamInput in) throws IOException { + super(in); + } + + public NodeRequest() {} + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return Objects.hash(); + } + } + + public static class Response extends BaseNodesResponse { + + public Response(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + protected Response(StreamInput in) throws IOException { + super(in); + } + + public static class ResetResponse extends BaseNodeResponse { + private final boolean acknowledged; + + public ResetResponse(DiscoveryNode node, boolean acknowledged) { + super(node); + this.acknowledged = acknowledged; + } + + public ResetResponse(StreamInput in) throws IOException { + super(in, null); + acknowledged = in.readBoolean(); + } + + public ResetResponse(StreamInput in, DiscoveryNode node) throws IOException { + super(in, node); + acknowledged = in.readBoolean(); + } + + public boolean isAcknowledged() { + return acknowledged; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(acknowledged); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ResetResponse that = (ResetResponse) o; + return acknowledged == that.acknowledged; + } + + @Override + public int hashCode() { + return Objects.hashCode(acknowledged); + } + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(ResetResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeCollection(nodes); + } + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 50b9597bb0326..fa176f2ce92b3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -105,7 +105,6 @@ public void testDeleteExpiredData_GivenNothingToDelete() throws Exception { client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataNoThrottle() throws Exception { testExpiredDeletion(null, 10010); } @@ -152,7 +151,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti ); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 83adee27248be..8dda503c01982 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -162,6 +161,7 @@ import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelVocabularyAction; import org.elasticsearch.xpack.core.ml.action.ResetJobAction; +import org.elasticsearch.xpack.core.ml.action.ResetMlComponentsAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; @@ -272,6 +272,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelVocabularyAction; import org.elasticsearch.xpack.ml.action.TransportResetJobAction; +import org.elasticsearch.xpack.ml.action.TransportResetMlComponentsAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction; import org.elasticsearch.xpack.ml.action.TransportSetUpgradeModeAction; @@ -785,7 +786,6 @@ public void loadExtensions(ExtensionLoader loader) { public static final int MAX_LOW_PRIORITY_MODELS_PER_NODE = 100; private static final Logger logger = LogManager.getLogger(MachineLearning.class); - private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MachineLearning.class); private final Settings settings; private final boolean enabled; @@ -805,7 +805,7 @@ public void loadExtensions(ExtensionLoader loader) { private final SetOnce learningToRankService = new SetOnce<>(); private final SetOnce mlAutoscalingDeciderService = new SetOnce<>(); private final SetOnce deploymentManager = new SetOnce<>(); - private final SetOnce trainedModelAllocationClusterServiceSetOnce = new SetOnce<>(); + private final SetOnce trainedModelAllocationClusterService = new SetOnce<>(); private final SetOnce machineLearningExtension = new SetOnce<>(); @@ -1315,7 +1315,7 @@ public Collection createComponents(PluginServices services) { clusterService, threadPool ); - trainedModelAllocationClusterServiceSetOnce.set( + trainedModelAllocationClusterService.set( new TrainedModelAssignmentClusterService( settings, clusterService, @@ -1391,7 +1391,8 @@ public Collection createComponents(PluginServices services) { trainedModelCacheMetadataService, trainedModelProvider, trainedModelAssignmentService, - trainedModelAllocationClusterServiceSetOnce.get(), + trainedModelAllocationClusterService.get(), + trainedModelStatsService, deploymentManager.get(), nodeAvailabilityZoneMapper, new MachineLearningExtensionHolder(machineLearningExtension.get()), @@ -1564,6 +1565,7 @@ public List getActions() { actionHandlers.add(new ActionHandler(MlMemoryAction.INSTANCE, TransportMlMemoryAction.class)); actionHandlers.add(new ActionHandler(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)); actionHandlers.add(new ActionHandler(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class)); + actionHandlers.add(new ActionHandler(ResetMlComponentsAction.INSTANCE, TransportResetMlComponentsAction.class)); // Included in this section as it's used by MlMemoryAction actionHandlers.add(new ActionHandler(TrainedModelCacheInfoAction.INSTANCE, TransportTrainedModelCacheInfoAction.class)); actionHandlers.add(new ActionHandler(GetMlAutoscalingStats.INSTANCE, TransportGetMlAutoscalingStats.class)); @@ -2149,8 +2151,6 @@ public void cleanUpFeature( final Map results = new ConcurrentHashMap<>(); ActionListener unsetResetModeListener = ActionListener.wrap(success -> { - // reset the auditors as aliases used may be removed - resetAuditors(); client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(resetSuccess -> { finalListener.onResponse(success); @@ -2176,8 +2176,24 @@ public void cleanUpFeature( ); }); + ActionListener resetAuditors = ActionListener.wrap(success -> { + // reset components, such as the auditors the trained model stats queue + client.execute( + ResetMlComponentsAction.INSTANCE, + ResetMlComponentsAction.Request.RESET_AUDITOR_REQUEST, + ActionListener.wrap(ignored -> unsetResetModeListener.onResponse(success), unsetResetModeListener::onFailure) + ); + }, failure -> { + logger.error("failed to reset machine learning", failure); + client.execute( + ResetMlComponentsAction.INSTANCE, + ResetMlComponentsAction.Request.RESET_AUDITOR_REQUEST, + ActionListener.wrap(ignored -> unsetResetModeListener.onFailure(failure), unsetResetModeListener::onFailure) + ); + }); + // Stop all model deployments - ActionListener pipelineValidation = unsetResetModeListener.delegateFailureAndWrap( + ActionListener pipelineValidation = resetAuditors.delegateFailureAndWrap( (delegate, listTasksResponse) -> { listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices"); if (results.values().stream().allMatch(b -> b)) { @@ -2306,11 +2322,11 @@ public void cleanUpFeature( ); client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, delegate); }).delegateFailureAndWrap((delegate, acknowledgedResponse) -> { - if (trainedModelAllocationClusterServiceSetOnce.get() == null || machineLearningExtension.get().isNlpEnabled() == false) { + if (trainedModelAllocationClusterService.get() == null || machineLearningExtension.get().isNlpEnabled() == false) { delegate.onResponse(AcknowledgedResponse.TRUE); return; } - trainedModelAllocationClusterServiceSetOnce.get().removeAllModelAssignments(delegate); + trainedModelAllocationClusterService.get().removeAllModelAssignments(delegate); }); // validate no pipelines are using machine learning models @@ -2332,18 +2348,6 @@ public void cleanUpFeature( client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet); } - private void resetAuditors() { - if (anomalyDetectionAuditor.get() != null) { - anomalyDetectionAuditor.get().reset(); - } - if (dataFrameAnalyticsAuditor.get() != null) { - dataFrameAnalyticsAuditor.get().reset(); - } - if (inferenceAuditor.get() != null) { - inferenceAuditor.get().reset(); - } - } - @Override public BreakerSettings getCircuitBreaker(Settings settingsToUse) { return BreakerSettings.updateFromSettings( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 7d918ebeaf5a1..fdd64822a7a02 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -278,7 +279,7 @@ private void searchStats(DataFrameAnalyticsConfig config, TaskId parentTaskId, A () -> format( "[%s] Item failure encountered during multi search for request [indices=%s, source=%s]: %s", config.getId(), - itemRequest.indices(), + Arrays.toString(itemRequest.indices()), itemRequest.source(), itemResponse.getFailureMessage() ), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java new file mode 100644 index 0000000000000..748de68b8c46c --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.ResetMlComponentsAction; +import org.elasticsearch.xpack.ml.inference.TrainedModelStatsService; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; + +import java.io.IOException; +import java.util.List; + +public class TransportResetMlComponentsAction extends TransportNodesAction< + ResetMlComponentsAction.Request, + ResetMlComponentsAction.Response, + ResetMlComponentsAction.NodeRequest, + ResetMlComponentsAction.Response.ResetResponse, + Void> { + + private final AnomalyDetectionAuditor anomalyDetectionAuditor; + private final DataFrameAnalyticsAuditor dfaAuditor; + private final InferenceAuditor inferenceAuditor; + private final TrainedModelStatsService trainedModelStatsService; + + @Inject + public TransportResetMlComponentsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + AnomalyDetectionAuditor anomalyDetectionAuditor, + DataFrameAnalyticsAuditor dfaAuditor, + InferenceAuditor inferenceAuditor, + TrainedModelStatsService trainedModelStatsService + ) { + super( + ResetMlComponentsAction.NAME, + clusterService, + transportService, + actionFilters, + ResetMlComponentsAction.NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.anomalyDetectionAuditor = anomalyDetectionAuditor; + this.dfaAuditor = dfaAuditor; + this.inferenceAuditor = inferenceAuditor; + this.trainedModelStatsService = trainedModelStatsService; + } + + @Override + protected ResetMlComponentsAction.Response newResponse( + ResetMlComponentsAction.Request request, + List resetResponses, + List failures + ) { + return new ResetMlComponentsAction.Response(clusterService.getClusterName(), resetResponses, failures); + } + + @Override + protected ResetMlComponentsAction.NodeRequest newNodeRequest(ResetMlComponentsAction.Request request) { + return new ResetMlComponentsAction.NodeRequest(); + } + + @Override + protected ResetMlComponentsAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new ResetMlComponentsAction.Response.ResetResponse(in); + } + + @Override + protected ResetMlComponentsAction.Response.ResetResponse nodeOperation(ResetMlComponentsAction.NodeRequest request, Task task) { + anomalyDetectionAuditor.reset(); + dfaAuditor.reset(); + inferenceAuditor.reset(); + trainedModelStatsService.clearQueue(); + return new ResetMlComponentsAction.Response.ResetResponse(clusterService.localNode(), true); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index 4fc606f1513c2..e8f7fde64640d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -295,4 +295,8 @@ static UpdateRequest buildUpdateRequest(InferenceStats stats) { } return null; } + + public void clearQueue() { + statsQueue.clear(); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java index 99b03c2725411..2c9949fac70d4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java @@ -16,13 +16,11 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessageFactory; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; @@ -33,7 +31,6 @@ abstract class AbstractMlAuditor extends AbstractAuditor { private static final Logger logger = LogManager.getLogger(AbstractMlAuditor.class); - private volatile boolean isResetMode; protected AbstractMlAuditor( Client client, @@ -50,34 +47,6 @@ protected AbstractMlAuditor( indexNameExpressionResolver, clusterService.threadPool().generic() ); - clusterService.addListener(event -> { - if (event.metadataChanged()) { - setResetMode(MlMetadata.getMlMetadata(event.state()).isResetMode()); - } - }); - } - - private void setResetMode(boolean value) { - isResetMode = value; - } - - @Override - protected void indexDoc(ToXContent toXContent) { - if (isResetMode) { - logger.trace("Skipped writing the audit message backlog as reset_mode is enabled"); - } else { - super.indexDoc(toXContent); - } - } - - @Override - protected void writeBacklog() { - if (isResetMode) { - logger.trace("Skipped writing the audit message backlog as reset_mode is enabled"); - clearBacklog(); - } else { - super.writeBacklog(); - } } @Override diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index d992f1b028a3c..08e756842a109 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -329,6 +329,7 @@ public class Constants { "cluster:internal/xpack/inference", "cluster:internal/xpack/inference/rerankwindowsize/get", "cluster:internal/xpack/inference/unified", + "cluster:internal/xpack/ml/auditor/reset", "cluster:internal/xpack/ml/coordinatedinference", "cluster:internal/xpack/ml/datafeed/isolate", "cluster:internal/xpack/ml/datafeed/running_state",