Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions quickwit/quickwit-metastore/src/tests/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,9 +677,14 @@ pub async fn test_metastore_list_indexes<MetastoreToTest: MetastoreServiceExt +
let index_uri_4 = format!("ram:///indexes/{index_id_4}");
let index_config_4 = IndexConfig::for_test(&index_id_4, &index_uri_4);

let index_id_5 = format!("my-exact-index-{index_id_fragment}-5");
let index_uri_5 = format!("ram:///indexes/{index_id_5}");
let index_config_5 = IndexConfig::for_test(&index_id_5, &index_uri_5);

let index_id_patterns = vec![
format!("prefix-*-{index_id_fragment}-suffix-*"),
format!("prefix*{index_id_fragment}*suffix-*"),
format!("my-exact-index-{index_id_fragment}-5"),
];
let indexes_count = metastore
.list_indexes_metadata(ListIndexesMetadataRequest { index_id_patterns })
Expand Down Expand Up @@ -715,8 +720,17 @@ pub async fn test_metastore_list_indexes<MetastoreToTest: MetastoreServiceExt +
.unwrap()
.index_uid()
.clone();
let index_uid_5 = metastore
.create_index(CreateIndexRequest::try_from_index_config(&index_config_5).unwrap())
.await
.unwrap()
.index_uid()
.clone();

let index_id_patterns = vec![format!("prefix-*-{index_id_fragment}-suffix-*")];
let index_id_patterns = vec![
format!("prefix-*-{index_id_fragment}-suffix-*"),
format!("my-exact-index-{index_id_fragment}-5"),
];
let indexes_count = metastore
.list_indexes_metadata(ListIndexesMetadataRequest { index_id_patterns })
.await
Expand All @@ -725,12 +739,13 @@ pub async fn test_metastore_list_indexes<MetastoreToTest: MetastoreServiceExt +
.await
.unwrap()
.len();
assert_eq!(indexes_count, 2);
assert_eq!(indexes_count, 3);

cleanup_index(&mut metastore, index_uid_1).await;
cleanup_index(&mut metastore, index_uid_2).await;
cleanup_index(&mut metastore, index_uid_3).await;
cleanup_index(&mut metastore, index_uid_4).await;
cleanup_index(&mut metastore, index_uid_5).await;
}

pub async fn test_metastore_delete_index<
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ message SearchRequest {
optional PartialHit search_after = 16;

CountHits count_hits = 17;

// When an exact index ID is provided (not a pattern), the query fails only if
// that index is not found and this parameter is set to `false`.
bool ignore_missing_indexes = 18;
}

enum CountHits {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

174 changes: 172 additions & 2 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<
// request is simplified after initial query, and we cache the hit count, so we don't need
// to recompute it afterward.
count_hits: quickwit_proto::search::CountHits::Underestimate as i32,
ignore_missing_indexes: req.ignore_missing_indexes,
})
}

Expand Down Expand Up @@ -1156,7 +1157,12 @@ async fn plan_splits_for_root_search(
.deserialize_indexes_metadata()
.await?;

check_all_index_metadata_found(&indexes_metadata[..], &search_request.index_id_patterns[..])?;
if !search_request.ignore_missing_indexes {
check_all_index_metadata_found(
&indexes_metadata[..],
&search_request.index_id_patterns[..],
)?;
}

if indexes_metadata.is_empty() {
return Ok((Vec::new(), HashMap::default()));
Expand Down Expand Up @@ -1243,7 +1249,12 @@ pub async fn search_plan(
.deserialize_indexes_metadata()
.await?;

check_all_index_metadata_found(&indexes_metadata[..], &search_request.index_id_patterns[..])?;
if !search_request.ignore_missing_indexes {
check_all_index_metadata_found(
&indexes_metadata[..],
&search_request.index_id_patterns[..],
)?;
}
if indexes_metadata.is_empty() {
return Ok(SearchPlanResponse {
result: serde_json::to_string(&SearchPlanResponseRest {
Expand Down Expand Up @@ -3240,6 +3251,102 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_root_search_missing_index() -> anyhow::Result<()> {
let mut mock_metastore = MockMetastoreService::new();
let index_metadata = IndexMetadata::for_test("test-index1", "ram:///test-index");
let index_uid = index_metadata.index_uid.clone();
mock_metastore
.expect_list_indexes_metadata()
.returning(move |_index_ids_query| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone(),
]))
});
mock_metastore
.expect_list_splits()
.returning(move |_list_splits_request| {
let splits = vec![
MockSplitBuilder::new("split1")
.with_index_uid(&index_uid)
.build(),
];
let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
Ok(ServiceStream::from(vec![Ok(splits_response)]))
});
let mock_metastore_client = MetastoreServiceClient::from_mock(mock_metastore);
let mut mock_search_service = MockSearchService::new();
mock_search_service.expect_leaf_search().returning(
|_leaf_search_req: quickwit_proto::search::LeafSearchRequest| {
Ok(quickwit_proto::search::LeafSearchResponse {
num_hits: 3,
partial_hits: vec![
mock_partial_hit("split1", 3, 1),
mock_partial_hit("split1", 2, 2),
mock_partial_hit("split1", 1, 3),
],
failed_splits: Vec::new(),
num_attempted_splits: 1,
..Default::default()
})
},
);
mock_search_service.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);
let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);
let search_job_placer = SearchJobPlacer::new(searcher_pool);
let cluster_client = ClusterClient::new(search_job_placer.clone());

let searcher_context = SearcherContext::for_test();

// search with ignore_missing_indexes=true succeeds
let search_request = quickwit_proto::search::SearchRequest {
index_id_patterns: vec!["test-index1".to_string(), "test-index2".to_string()],
query_ast: qast_json_helper("test", &["body"]),
max_hits: 10,
ignore_missing_indexes: true,
..Default::default()
};
let search_response = root_search(
&searcher_context,
search_request,
mock_metastore_client.clone(),
&cluster_client,
)
.await
.unwrap();
assert_eq!(search_response.num_hits, 3);
assert_eq!(search_response.hits.len(), 3);

// search with ignore_missing_indexes=false fails
let search_request = quickwit_proto::search::SearchRequest {
index_id_patterns: vec!["test-index1".to_string(), "test-index2".to_string()],
query_ast: qast_json_helper("test", &["body"]),
max_hits: 10,
ignore_missing_indexes: false,
..Default::default()
};
let search_error = root_search(
&searcher_context,
search_request,
mock_metastore_client,
&cluster_client,
)
.await
.unwrap_err();
if let SearchError::IndexesNotFound { index_ids } = search_error {
assert_eq!(index_ids, vec!["test-index2".to_string()]);
} else {
panic!("unexpected error type: {search_error}");
}
Ok(())
}

#[tokio::test]
async fn test_root_search_multiple_splits_retry_on_other_node() -> anyhow::Result<()> {
let search_request = quickwit_proto::search::SearchRequest {
Expand Down Expand Up @@ -4112,6 +4219,69 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_search_plan_missing_index() -> anyhow::Result<()> {
let mut mock_metastore = MockMetastoreService::new();
let index_metadata = IndexMetadata::for_test("test-index1", "ram:///test-index");
let index_uid = index_metadata.index_uid.clone();
mock_metastore
.expect_list_indexes_metadata()
.returning(move |_index_ids_query| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone(),
]))
});
mock_metastore
.expect_list_splits()
.returning(move |_filter| {
let splits = vec![
MockSplitBuilder::new("split1")
.with_index_uid(&index_uid)
.build(),
MockSplitBuilder::new("split2")
.with_index_uid(&index_uid)
.build(),
];
let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
Ok(ServiceStream::from(vec![Ok(splits_response)]))
});
let mock_metastore_service = MetastoreServiceClient::from_mock(mock_metastore);

// plan with ignore_missing_indexes=true succeeds
search_plan(
quickwit_proto::search::SearchRequest {
index_id_patterns: vec!["test-index1".to_string(), "test-index2".to_string()],
query_ast: qast_json_helper("test-query", &["body"]),
max_hits: 10,
ignore_missing_indexes: true,
..Default::default()
},
mock_metastore_service.clone(),
)
.await
.unwrap();

// plan with ignore_missing_indexes=false fails
let search_error = search_plan(
quickwit_proto::search::SearchRequest {
index_id_patterns: vec!["test-index1".to_string(), "test-index2".to_string()],
query_ast: qast_json_helper("test-query", &["body"]),
max_hits: 10,
ignore_missing_indexes: false,
..Default::default()
},
mock_metastore_service.clone(),
)
.await
.unwrap_err();
if let SearchError::IndexesNotFound { index_ids } = search_error {
assert_eq!(index_ids, vec!["test-index2".to_string()]);
} else {
panic!("unexpected error type: {search_error}");
}
Ok(())
}

#[test]
fn test_extract_timestamp_range_from_ast() {
use std::ops::Bound;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::simple_list::{from_simple_list, to_simple_list};

// Multi search doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html

#[serde_as]
#[serde_with::skip_serializing_none]
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
Expand All @@ -50,6 +51,10 @@ pub struct MultiSearchQueryParams {
pub ignore_throttled: Option<bool>,
#[serde(default)]
pub ignore_unavailable: Option<bool>,
/// List of indexes to search.
#[serde_as(deserialize_as = "OneOrMany<_, PreferMany>")]
#[serde(default, rename = "index")]
pub indexes: Vec<String>,
#[serde(default)]
pub max_concurrent_searches: Option<u64>,
#[serde(default)]
Expand Down Expand Up @@ -90,8 +95,8 @@ pub struct MultiSearchHeader {
#[serde(default)]
pub ignore_unavailable: Option<bool>,
#[serde_as(deserialize_as = "OneOrMany<_, PreferMany>")]
#[serde(default)]
pub index: Vec<String>,
#[serde(default, rename = "index")]
pub indexes: Vec<String>,
#[serde(default)]
pub preference: Option<String>,
#[serde(default)]
Expand All @@ -100,6 +105,26 @@ pub struct MultiSearchHeader {
pub routing: Option<Vec<String>>,
}

impl MultiSearchHeader {
pub fn apply_query_param_defaults(&mut self, defaults: &MultiSearchQueryParams) {
if self.allow_no_indices.is_none() {
self.allow_no_indices = defaults.allow_no_indices;
}
if self.expand_wildcards.is_none() {
self.expand_wildcards = defaults.expand_wildcards.clone();
}
if self.ignore_unavailable.is_none() {
self.ignore_unavailable = defaults.ignore_unavailable;
}
if self.indexes.is_empty() {
self.indexes = defaults.indexes.clone();
}
if self.routing.is_none() {
self.routing = defaults.routing.clone();
}
}
}

#[derive(Serialize)]
pub struct MultiSearchResponse {
pub responses: Vec<MultiSearchSingleResponse>,
Expand Down
24 changes: 14 additions & 10 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ fn build_request_for_es_api(

let max_hits = search_params.size.or(search_body.size).unwrap_or(10);
let start_offset = search_params.from.or(search_body.from).unwrap_or(0);
let ignore_missing_indexes = search_params.ignore_unavailable.unwrap_or(false);
let count_hits = match search_params
.track_total_hits
.or(search_body.track_total_hits)
Expand Down Expand Up @@ -410,6 +411,7 @@ fn build_request_for_es_api(
scroll_ttl_secs,
search_after,
count_hits,
ignore_missing_indexes,
},
has_doc_id_field,
))
Expand Down Expand Up @@ -814,26 +816,28 @@ async fn es_compat_index_multi_search(
let mut payload_lines = str_lines(str_payload);

while let Some(line) = payload_lines.next() {
let request_header = serde_json::from_str::<MultiSearchHeader>(line).map_err(|err| {
SearchError::InvalidArgument(format!(
"failed to parse request header `{}...`: {}",
truncate_str(line, 20),
err
))
})?;
if request_header.index.is_empty() {
let mut request_header =
serde_json::from_str::<MultiSearchHeader>(line).map_err(|err| {
SearchError::InvalidArgument(format!(
"failed to parse request header `{}...`: {}",
truncate_str(line, 20),
err
))
})?;
request_header.apply_query_param_defaults(&multi_search_params);
if request_header.indexes.is_empty() {
return Err(ElasticsearchError::from(SearchError::InvalidArgument(
"`_msearch` request header must define at least one index".to_string(),
)));
}
for index in &request_header.index {
for index in &request_header.indexes {
validate_index_id_pattern(index, true).map_err(|err| {
SearchError::InvalidArgument(format!(
"request header contains an invalid index: {err}"
))
})?;
}
let index_ids_patterns = request_header.index.clone();
let index_ids_patterns = request_header.indexes.clone();
let search_body = payload_lines
.next()
.ok_or_else(|| {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ pub fn search_request_from_api_request(
scroll_ttl_secs: None,
search_after: None,
count_hits: search_request.count_all.into(),
ignore_missing_indexes: false,
};
Ok(search_request)
}
Expand Down
Loading