From dfaed0baed48ebf1051944e337bb6aa86e28ffa6 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 29 Oct 2025 14:03:35 +0100 Subject: [PATCH] Add composite aggregation Uses the latest composite PR in Tantivy --- quickwit/Cargo.lock | 18 ++-- quickwit/Cargo.toml | 2 +- quickwit/quickwit-query/src/aggregations.rs | 90 ++++++++++++++++++- .../aggregations/0001-aggregations.yaml | 84 +++++++++++++++++ 4 files changed, 183 insertions(+), 11 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index f5cd19ec89e..0d2107ff5c9 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5682,7 +5682,7 @@ checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" [[package]] name = "ownedbytes" version = "0.9.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "stable_deref_trait", ] @@ -9570,7 +9570,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "tantivy" version = "0.25.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "aho-corasick", "arc-swap", @@ -9625,7 +9625,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.9.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "bitpacking", ] @@ -9633,7 +9633,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "downcast-rs", "fastdivide", @@ -9648,7 +9648,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.10.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "async-trait", "byteorder", @@ -9671,7 +9671,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.25.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "fnv", "nom 7.1.3", @@ -9683,7 +9683,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "futures-util", "itertools 0.14.0", @@ -9696,7 +9696,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "murmurhash32", "rand_distr", @@ -9706,7 +9706,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 89e1429d9ab..e7ed41793c3 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -346,7 +346,7 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "dabcaa5", default-features = false, features = [ +tantivy = { git = "https://github.com/SekoiaLab/tantivy/", rev = "bbdf83e", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", diff --git a/quickwit/quickwit-query/src/aggregations.rs b/quickwit/quickwit-query/src/aggregations.rs index eca275ad265..5318d7884c6 100644 --- a/quickwit/quickwit-query/src/aggregations.rs +++ b/quickwit/quickwit-query/src/aggregations.rs @@ -18,7 +18,8 @@ use tantivy::aggregation::Key as TantivyKey; use tantivy::aggregation::agg_result::{ AggregationResult as TantivyAggregationResult, AggregationResults as TantivyAggregationResults, BucketEntries as TantivyBucketEntries, BucketEntry as TantivyBucketEntry, - BucketResult as TantivyBucketResult, MetricResult as TantivyMetricResult, + BucketResult as TantivyBucketResult, CompositeBucketEntry as TantivyCompositeBucketEntry, + CompositeKey as TantivyCompositeKey, MetricResult as TantivyMetricResult, RangeBucketEntry as TantivyRangeBucketEntry, }; use tantivy::aggregation::metric::{ @@ -169,6 +170,13 @@ pub enum BucketResult { /// The upper bound error for the doc count of each term. doc_count_error_upper_bound: Option, }, + /// This is the composite aggregation result + Composite { + /// The buckets + buckets: Vec, + /// The key to start after when paginating + after_key: FxHashMap, + }, } impl From for BucketResult { @@ -189,6 +197,10 @@ impl From for BucketResult { sum_other_doc_count, doc_count_error_upper_bound, }, + TantivyBucketResult::Composite { buckets, after_key } => BucketResult::Composite { + buckets: buckets.into_iter().map(Into::into).collect(), + after_key: after_key.into_iter().map(|(k, v)| (k, v.into())).collect(), + }, } } } @@ -211,6 +223,10 @@ impl From for TantivyBucketResult { sum_other_doc_count, doc_count_error_upper_bound, }, + BucketResult::Composite { buckets, after_key } => TantivyBucketResult::Composite { + buckets: buckets.into_iter().map(Into::into).collect(), + after_key: after_key.into_iter().map(|(k, v)| (k, v.into())).collect(), + }, } } } @@ -410,3 +426,75 @@ impl From for TantivyPercentilesMetricResult { TantivyPercentilesMetricResult { values } } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum CompositeKey { + /// Boolean key + Bool(bool), + /// String key + Str(String), + /// `i64` key + I64(i64), + /// `u64` key + U64(u64), + /// `f64` key + F64(f64), + /// Null key + Null, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CompositeBucketEntry { + /// The identifier of the bucket. + pub key: FxHashMap, + /// Number of documents in the bucket. + pub doc_count: u64, + /// Sub-aggregations in this bucket. + pub sub_aggregation: AggregationResults, +} + +impl From for CompositeKey { + fn from(value: TantivyCompositeKey) -> CompositeKey { + match value { + TantivyCompositeKey::Bool(b) => CompositeKey::Bool(b), + TantivyCompositeKey::Str(s) => CompositeKey::Str(s), + TantivyCompositeKey::I64(i) => CompositeKey::I64(i), + TantivyCompositeKey::U64(u) => CompositeKey::U64(u), + TantivyCompositeKey::F64(f) => CompositeKey::F64(f), + TantivyCompositeKey::Null => CompositeKey::Null, + } + } +} + +impl From for TantivyCompositeKey { + fn from(value: CompositeKey) -> TantivyCompositeKey { + match value { + CompositeKey::Bool(b) => TantivyCompositeKey::Bool(b), + CompositeKey::Str(s) => TantivyCompositeKey::Str(s), + CompositeKey::I64(i) => TantivyCompositeKey::I64(i), + CompositeKey::U64(u) => TantivyCompositeKey::U64(u), + CompositeKey::F64(f) => TantivyCompositeKey::F64(f), + CompositeKey::Null => TantivyCompositeKey::Null, + } + } +} + +impl From for CompositeBucketEntry { + fn from(value: TantivyCompositeBucketEntry) -> CompositeBucketEntry { + CompositeBucketEntry { + key: value.key.into_iter().map(|(k, v)| (k, v.into())).collect(), + doc_count: value.doc_count, + sub_aggregation: value.sub_aggregation.into(), + } + } +} + +impl From for TantivyCompositeBucketEntry { + fn from(value: CompositeBucketEntry) -> TantivyCompositeBucketEntry { + TantivyCompositeBucketEntry { + key: value.key.into_iter().map(|(k, v)| (k, v.into())).collect(), + doc_count: value.doc_count, + sub_aggregation: value.sub_aggregation.into(), + } + } +} diff --git a/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml b/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml index f81c2215f40..63daf92db06 100644 --- a/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml +++ b/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml @@ -375,6 +375,7 @@ expected: aggregations: response_stats: sum_of_squares: 55300.0 +--- # Test term aggs number precision method: [GET] engines: @@ -393,3 +394,86 @@ expected: buckets: - doc_count: 1 key: 1769070189829214200 +--- +# Test composite aggregation +method: [GET] +engines: + - quickwit +endpoint: _elastic/aggregations/_search +json: + size: 0 + aggs: + host_name_composite: + composite: + size: 5 + sources: + - host: + terms: + field: "host" + missing_bucket: true + - name: + terms: + field: "name" + - response: + histogram: + field: "response" + interval: 50 +expected: + aggregations: + host_name_composite: + buckets: + - key: { "host": null, "name": "Bernhard", "response": 100.0 } + doc_count: 1 + - key: { "host": null, "name": "Fritz", "response": 0.0 } + doc_count: 2 + - key: { "host": "192.168.0.1", "name": "Fred", "response": 100.0 } + doc_count: 1 + - key: { "host": "192.168.0.1", "name": "Fritz", "response": 0.0 } + doc_count: 1 + - key: { "host": "192.168.0.10", "name": "Albert", "response": 100.0 } + doc_count: 1 + after_key: + host: "192.168.0.10" + name: "Albert" + response: 100.0 + +--- +# Test composite aggregation paging +method: [GET] +engines: + - quickwit +endpoint: _elastic/aggregations/_search +json: + size: 0 + aggs: + host_name_composite: + composite: + size: 5 + sources: + - host: + terms: + field: "host" + missing_bucket: true + - name: + terms: + field: "name" + - response: + histogram: + field: "response" + interval: 50 + after: + host: "192.168.0.10" + name: "Albert" + response: 100.0 +expected: + aggregations: + host_name_composite: + buckets: + - key: { "host": "192.168.0.10", "name": "Holger", "response": 0.0 } + doc_count: 1 + # Horst is missing because his response field is missing + - key: { "host": "192.168.0.10", "name": "Werner", "response": 0.0 } + doc_count: 1 + - key: { "host": "192.168.0.11", "name": "Manfred", "response": 100.0 } + doc_count: 1 +--- \ No newline at end of file