Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.TestFeatureService;
Expand All @@ -36,6 +38,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -81,6 +84,8 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
private static RestClient remoteClusterClient;
private static DataLocation dataLocation = null;

private static final Logger LOGGER = LogManager.getLogger(MultiClusterSpecIT.class);

@ParametersFactory(argumentFormatting = "csv-spec:%2$s.%3$s")
public static List<Object[]> readScriptSpec() throws Exception {
List<URL> urls = classpathResources("/*.csv-spec");
Expand All @@ -96,7 +101,16 @@ public MultiClusterSpecIT(
CsvTestCase testCase,
String instructions
) {
super(fileName, groupName, testName, lineNumber, convertToRemoteIndices(testCase), instructions);
super(
fileName,
groupName,
testName,
lineNumber,
testCase.requiredCapabilities.contains(SUBQUERY_IN_FROM_COMMAND.capabilityName())
? convertSubqueryToRemoteIndices(testCase)
: convertToRemoteIndices(testCase),
instructions
);
}

// TODO: think how to handle this better
Expand Down Expand Up @@ -168,12 +182,6 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("LOOKUP JOIN after SORT not yet supported in CCS", testName.contains("OnTheCoordinator"));

assumeFalse("FORK not yet supported with CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName()));

// And convertToRemoteIndices does not generate correct queries with subqueries in the FROM command yet
assumeFalse(
"Subqueries in FROM command not yet supported in CCS",
testCase.requiredCapabilities.contains(SUBQUERY_IN_FROM_COMMAND.capabilityName())
);
}

private TestFeatureService remoteFeaturesService() throws IOException {
Expand Down Expand Up @@ -399,6 +407,120 @@ private static String unquote(String index, int numOfQuotes) {
return index.substring(numOfQuotes, index.length() - numOfQuotes);
}

/**
* Convert index patterns and subqueries in FROM commands to use remote indices for a given test case.
*/
private static CsvSpecReader.CsvTestCase convertSubqueryToRemoteIndices(CsvSpecReader.CsvTestCase testCase) {
if (dataLocation == null) {
dataLocation = randomFrom(DataLocation.values());
}
String query = testCase.query;
testCase.query = convertSubqueryToRemoteIndices(query);
return testCase;
}

/**
* Convert index patterns and subqueries in FROM commands to use remote indices.
*/
private static String convertSubqueryToRemoteIndices(String testQuery) {
String query = testQuery;
// find the main from command, ignoring pipes inside subqueries
List<String> mainFromCommandAndTheRest = splitIgnoringParentheses(query, "|");
String mainFrom = mainFromCommandAndTheRest.get(0).strip();
List<String> theRest = mainFromCommandAndTheRest.size() > 1
? mainFromCommandAndTheRest.subList(1, mainFromCommandAndTheRest.size())
: List.of();
// check for metadata in the main from command
List<String> mainFromCommandWithMetadata = splitIgnoringParentheses(mainFrom, "metadata");
mainFrom = mainFromCommandWithMetadata.get(0).strip();
// if there is metadata, we need to add it back later
String metadata = mainFromCommandWithMetadata.size() > 1 ? " metadata " + mainFromCommandWithMetadata.get(1) : "";
// the main from command could be a comma separated list of index patterns, and subqueries
List<String> indexPatternsAndSubqueries = splitIgnoringParentheses(mainFrom, ",");
List<String> transformed = new ArrayList<>();
for (String indexPatternOrSubquery : indexPatternsAndSubqueries) {
// remove the from keyword if it's there
indexPatternOrSubquery = indexPatternOrSubquery.strip();
if (indexPatternOrSubquery.toLowerCase(Locale.ROOT).startsWith("from ")) {
indexPatternOrSubquery = indexPatternOrSubquery.strip().substring(5);
}
// substitute the index patterns or subquery with remote index patterns
if (isSubquery(indexPatternOrSubquery)) {
// it's a subquery, we need to process it recursively
String subquery = indexPatternOrSubquery.substring(1, indexPatternOrSubquery.length() - 1);
String transformedSubquery = convertSubqueryToRemoteIndices(subquery);
transformed.add("(" + transformedSubquery + ")");
} else {
// It's an index pattern, we need to convert it to remote index pattern.
// indexPatternOrSubquery could be a comma separated list of indices, we need to process each index separately
List<String> indexPatterns = splitIgnoringParentheses(indexPatternOrSubquery, ",");
String remoteIndex = indexPatterns.stream()
.map(String::strip)
// Data is loaded on one of the clusters not both, remote only may not guarantee data is found on the remote cluster.
.map(index -> unquoteAndRequoteAsRemote(index, false))
.collect(Collectors.joining(","));
transformed.add(remoteIndex);
}
}
// rebuild from command from transformed index patterns and subqueries
String transformedFrom = "from " + String.join(", ", transformed) + metadata;
// rebuild the whole query
testQuery = transformedFrom + (theRest.isEmpty() ? "" : " | " + String.join(" | ", theRest));

LOGGER.trace("Transform query: \nFROM: {}\nTO: {}", query, testQuery);
return testQuery;
}

/**
* Checks if the given string is a subquery (enclosed in parentheses).
*/
private static boolean isSubquery(String indexPatternOrSubquery) {
String trimmed = indexPatternOrSubquery.strip();
return trimmed.startsWith("(") && trimmed.endsWith(")");
}

/**
* Splits the input string by the given delimiter, ignoring delimiters inside parentheses.
*/
public static List<String> splitIgnoringParentheses(String input, String delimiter) {
List<String> results = new ArrayList<>();
if (input == null || input.isEmpty()) return results;

int depth = 0; // parentheses nesting
int lastSplit = 0;
int delimiterLength = delimiter.length();

for (int i = 0; i <= input.length() - delimiterLength; i++) {
char c = input.charAt(i);

if (c == '(') {
depth++;
} else if (c == ')') {
if (depth > 0) depth--;
}

// check delimiter only outside parentheses
if (depth == 0) {
boolean match;
if (delimiter.length() == 1) {
match = c == delimiter.charAt(0);
} else {
match = input.regionMatches(true, i, delimiter, 0, delimiterLength);
}

if (match) {
results.add(input.substring(lastSplit, i).trim());
lastSplit = i + delimiterLength;
i += delimiterLength - 1; // skip the delimiter
}
}
}
// add remaining part
results.add(input.substring(lastSplit).trim());

return results;
}

@Override
protected boolean enableRoundingDoubleValuesOnAsserting() {
return true;
Expand Down
Loading