Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.search;

import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.elasticsearch.client.Request;
Expand All @@ -16,7 +18,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
Expand All @@ -31,7 +32,7 @@
reason = "testing debug log output to identify race condition",
value = "org.elasticsearch.xpack.search.MutableSearchResponse:DEBUG,org.elasticsearch.xpack.search.AsyncSearchTask:DEBUG"
)
public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
public class AsyncSearchErrorTraceIT extends AsyncSearchIntegTestCase {

@Override
protected boolean addMockHttpTransport() {
Expand Down Expand Up @@ -76,10 +77,15 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
createAsyncRequest.addParameter("keep_on_completion", "true");
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
// check that the stack trace was not sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
Expand All @@ -103,11 +109,16 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
createAsyncRequest.addParameter("keep_on_completion", "true");
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
// check that the stack trace was sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
Expand All @@ -131,11 +142,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
createAsyncRequest.addParameter("keep_on_completion", "true");
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
// check that the stack trace was not sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
Expand Down Expand Up @@ -172,19 +188,24 @@ public void testDataNodeLogsStackTrace() throws Exception {
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
getAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
getAsyncRequest.addParameter("error_trace", "false");
} // else empty
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

mockLog.assertAllExpectationsMatched();
try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
getAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
getAsyncRequest.addParameter("error_trace", "false");
} // else empty
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

mockLog.assertAllExpectationsMatched();
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}
}

Expand All @@ -206,11 +227,16 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
createAsyncSearchRequest.addParameter("keep_on_completion", "true");
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
// check that the stack trace was not sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
Expand All @@ -234,11 +260,15 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
createAsyncSearchRequest.addParameter("keep_on_completion", "true");
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
// check that the stack trace was sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
Expand All @@ -250,6 +280,26 @@ private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws
return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false);
}

private void deleteAsyncSearchIfPresent(Map<String, Object> map) throws IOException {
String id = (String) map.get("id");
if (id == null) {
return;
}

// Make sure the .async-search system index is green before deleting it
try {
ensureGreen(".async-search");
} catch (Exception ignore) {
// the index may not exist
}

Response response = getRestClient().performRequest(new Request("DELETE", "/_async_search/" + id));
HttpEntity entity = response.getEntity();
if (entity != null) {
EntityUtils.consumeQuietly(entity);
}
}

private void awaitAsyncRequestDoneRunning(Request getAsyncRequest) throws Exception {
assertBusy(() -> {
Map<String, Object> getAsyncResponseEntity = performRequestAndGetResponseEntity(getAsyncRequest);
Expand Down