Skip to content

Commit f911e26

Browse files
authored
Fix empty intermediate aggregation results merge (#5930)
1 parent 2ba3e39 commit f911e26

File tree

1 file changed

+44
-21
lines changed

1 file changed

+44
-21
lines changed

quickwit/quickwit-search/src/collector.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -860,8 +860,10 @@ impl Collector for QuickwitCollector {
860860
}
861861
}
862862

863-
fn map_error(err: postcard::Error) -> TantivyError {
864-
TantivyError::InternalError(format!("merge result Postcard error: {err}"))
863+
fn map_error(error: postcard::Error) -> TantivyError {
864+
TantivyError::InternalError(format!(
865+
"failed to merge intermediate aggregation results: Postcard error: {error}"
866+
))
865867
}
866868

867869
/// Merges a set of Leaf Results.
@@ -883,24 +885,24 @@ fn merge_intermediate_aggregation_result<'a>(
883885
Some(serialized)
884886
}
885887
Some(QuickwitAggregations::TantivyAggregations(_)) => {
886-
let fruits: Vec<IntermediateAggregationResults> = intermediate_aggregation_results
887-
.map(|intermediate_aggregation_result| {
888-
postcard::from_bytes(intermediate_aggregation_result).map_err(map_error)
889-
})
890-
.collect::<Result<_, _>>()?;
891-
892-
let mut fruit_iter = fruits.into_iter();
893-
if let Some(first_fruit) = fruit_iter.next() {
894-
let mut merged_fruit = first_fruit;
895-
for fruit in fruit_iter {
896-
merged_fruit.merge_fruits(fruit)?;
897-
}
898-
let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?;
899-
900-
Some(serialized)
901-
} else {
902-
None
903-
}
888+
let merged_opt = intermediate_aggregation_results
889+
.map(|bytes| postcard::from_bytes(bytes).map_err(map_error))
890+
.try_fold::<_, _, Result<_, TantivyError>>(
891+
None,
892+
|acc: Option<IntermediateAggregationResults>, fruits_res| {
893+
let fruits = fruits_res?;
894+
match acc {
895+
Some(mut merged_fruits) => {
896+
merged_fruits.merge_fruits(fruits)?;
897+
Ok(Some(merged_fruits))
898+
}
899+
None => Ok(Some(fruits)),
900+
}
901+
},
902+
)?;
903+
let serialized =
904+
postcard::to_allocvec(&merged_opt.unwrap_or_default()).map_err(map_error)?;
905+
Some(serialized)
904906
}
905907
None => None,
906908
};
@@ -1293,10 +1295,13 @@ mod tests {
12931295
SortOrder, SortValue, SplitSearchError,
12941296
};
12951297
use tantivy::TantivyDocument;
1298+
use tantivy::aggregation::agg_req::Aggregations;
1299+
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
12961300
use tantivy::collector::Collector;
12971301

12981302
use super::{IncrementalCollector, make_merge_collector};
1299-
use crate::collector::top_k_partial_hits;
1303+
use crate::QuickwitAggregations;
1304+
use crate::collector::{merge_intermediate_aggregation_result, top_k_partial_hits};
13001305

13011306
#[test]
13021307
fn test_merge_partial_hits_no_tie() {
@@ -2002,4 +2007,22 @@ mod tests {
20022007
);
20032008
// TODO would be nice to test aggregation too.
20042009
}
2010+
2011+
#[test]
2012+
fn test_merge_empty_intermediate_aggregation_result() {
2013+
let merged = merge_intermediate_aggregation_result(&None, std::iter::empty()).unwrap();
2014+
assert!(merged.is_none());
2015+
2016+
let aggregations_json = r#"{
2017+
"avg_price": { "avg": { "field": "price" } }
2018+
}"#;
2019+
let ttv_aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap();
2020+
let qw_aggregations = QuickwitAggregations::TantivyAggregations(ttv_aggregations);
2021+
let serialized =
2022+
merge_intermediate_aggregation_result(&Some(qw_aggregations), std::iter::empty())
2023+
.unwrap()
2024+
.unwrap();
2025+
let _merged: IntermediateAggregationResults = postcard::from_bytes(&serialized).unwrap();
2026+
// Hopefully `_merged` is empty but the API does not allow us to assert that.
2027+
}
20052028
}

0 commit comments

Comments
 (0)