
Commit 3efed0c

feat: improve runtime filter [Part 1] (#18893)
* feat: propagate runtime filters through equivalence classes
* make lint
* refactor: improve profile of runtime filter
* feat: scan wait for runtime filter build ready
* fix
* remove builder.contain_sink_processor
* fix
* fix
* fix
* fix
* add TransformRuntimeFilterWait
* add log
* add log
* fix
* enable bloom
* determine at runtime whether to apply the bloom runtime filter
* fix
* fix
* fix
* rm log
* improve
* fix
* update test
* update test
* update test
* update test
* update test
* update test
* update test
* update test
* update test
* fix comment and rm unused code
* fix
* try fix
* fix merge
* fix merge
* fix
* fix comment
* fix
* update test
* update test
* update test
* update test
* update test
* update test
* update test
* update test
* fix
* fix

---------

Co-authored-by: Winter Zhang <coswde@gmail.com>
1 parent a00b55a commit 3efed0c

File tree: 162 files changed (+3321 / -1134 lines changed)


src/query/catalog/src/runtime_filter_info.rs

Lines changed: 87 additions & 18 deletions
@@ -14,6 +14,9 @@
 
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
 
 use databend_common_base::base::tokio::sync::watch;
 use databend_common_base::base::tokio::sync::watch::Receiver;
@@ -23,42 +26,108 @@ use xorf::BinaryFuse16;
 
 #[derive(Clone, Default)]
 pub struct RuntimeFilterInfo {
-    pub inlist: Vec<Expr<String>>,
-    pub min_max: Vec<Expr<String>>,
-    pub bloom: Vec<(String, BinaryFuse16)>,
+    pub filters: Vec<RuntimeFilterEntry>,
 }
 
 impl Debug for RuntimeFilterInfo {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "RuntimeFilterInfo {{ inlist: {}, min_max: {}, bloom: {:?} }}",
-            self.inlist
+            "RuntimeFilterInfo {{ filters: [{}] }}",
+            self.filters
                 .iter()
-                .map(|e| e.sql_display())
+                .map(|entry| format!("#{}(probe:{})", entry.id, entry.probe_expr.sql_display()))
                 .collect::<Vec<String>>()
-                .join(","),
-            self.min_max
-                .iter()
-                .map(|e| e.sql_display())
-                .collect::<Vec<String>>()
-                .join(","),
-            self.bloom
-                .iter()
-                .map(|(name, _)| name)
-                .collect::<Vec<&String>>()
+                .join(",")
         )
     }
 }
 
 impl RuntimeFilterInfo {
     pub fn is_empty(&self) -> bool {
-        self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty()
+        self.filters.is_empty()
     }
 
     pub fn is_blooms_empty(&self) -> bool {
-        self.bloom.is_empty()
+        self.filters.iter().all(|entry| entry.bloom.is_none())
+    }
+}
+
+#[derive(Clone)]
+pub struct RuntimeFilterEntry {
+    pub id: usize,
+    pub probe_expr: Expr<String>,
+    pub bloom: Option<RuntimeFilterBloom>,
+    pub inlist: Option<Expr<String>>,
+    pub min_max: Option<Expr<String>>,
+    pub stats: Arc<RuntimeFilterStats>,
+    pub build_rows: usize,
+    pub build_table_rows: Option<u64>,
+    pub enabled: bool,
+}
+
+#[derive(Clone)]
+pub struct RuntimeFilterBloom {
+    pub column_name: String,
+    pub filter: BinaryFuse16,
+}
+
+#[derive(Default)]
+pub struct RuntimeFilterStats {
+    bloom_time_ns: AtomicU64,
+    bloom_rows_filtered: AtomicU64,
+    inlist_min_max_time_ns: AtomicU64,
+    min_max_rows_filtered: AtomicU64,
+    min_max_partitions_pruned: AtomicU64,
+}
+
+impl RuntimeFilterStats {
+    pub fn new() -> Self {
+        Self::default()
     }
+
+    pub fn record_bloom(&self, time_ns: u64, rows_filtered: u64) {
+        self.bloom_time_ns.fetch_add(time_ns, Ordering::Relaxed);
+        self.bloom_rows_filtered
+            .fetch_add(rows_filtered, Ordering::Relaxed);
+    }
+
+    pub fn record_inlist_min_max(&self, time_ns: u64, rows_filtered: u64, partitions_pruned: u64) {
+        self.inlist_min_max_time_ns
+            .fetch_add(time_ns, Ordering::Relaxed);
+        self.min_max_rows_filtered
+            .fetch_add(rows_filtered, Ordering::Relaxed);
+        self.min_max_partitions_pruned
+            .fetch_add(partitions_pruned, Ordering::Relaxed);
+    }
+
+    pub fn snapshot(&self) -> RuntimeFilterStatsSnapshot {
+        RuntimeFilterStatsSnapshot {
+            bloom_time_ns: self.bloom_time_ns.load(Ordering::Relaxed),
+            bloom_rows_filtered: self.bloom_rows_filtered.load(Ordering::Relaxed),
+            inlist_min_max_time_ns: self.inlist_min_max_time_ns.load(Ordering::Relaxed),
+            min_max_rows_filtered: self.min_max_rows_filtered.load(Ordering::Relaxed),
+            min_max_partitions_pruned: self.min_max_partitions_pruned.load(Ordering::Relaxed),
        }
+    }
+}
+
+#[derive(Default, Clone, Debug)]
+pub struct RuntimeFilterStatsSnapshot {
+    pub bloom_time_ns: u64,
+    pub bloom_rows_filtered: u64,
+    pub inlist_min_max_time_ns: u64,
+    pub min_max_rows_filtered: u64,
+    pub min_max_partitions_pruned: u64,
+}
+
+#[derive(Clone, Debug)]
+pub struct RuntimeFilterReport {
+    pub filter_id: usize,
+    pub has_bloom: bool,
+    pub has_inlist: bool,
+    pub has_min_max: bool,
+    pub stats: RuntimeFilterStatsSnapshot,
 }
 
 pub struct RuntimeFilterReady {
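
The new RuntimeFilterStats is shared behind an Arc so probe-side work can bump the counters lock-free (Relaxed atomics) while the report path reads a snapshot later. A minimal usage sketch, assuming the crate path databend_common_catalog::runtime_filter_info introduced above; the helper functions themselves are hypothetical and not part of this commit:

// Illustrative only: record bloom-probe work into the shared stats,
// then turn the counters into a RuntimeFilterReport for EXPLAIN ANALYZE.
use std::sync::Arc;
use std::time::Instant;

use databend_common_catalog::runtime_filter_info::RuntimeFilterReport;
use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;

// Hypothetical probe-side hook: `input_rows` enter, `kept_rows` survive the bloom filter.
fn record_bloom_probe(stats: &Arc<RuntimeFilterStats>, input_rows: u64, kept_rows: u64) {
    let start = Instant::now();
    // ... apply the BinaryFuse16 filter to the key column here ...
    stats.record_bloom(start.elapsed().as_nanos() as u64, input_rows - kept_rows);
}

// Hypothetical report assembly: in real code the has_* flags would come from the RuntimeFilterEntry.
fn make_report(filter_id: usize, stats: &Arc<RuntimeFilterStats>) -> RuntimeFilterReport {
    RuntimeFilterReport {
        filter_id,
        has_bloom: true,
        has_inlist: false,
        has_min_max: true,
        stats: stats.snapshot(),
    }
}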

src/query/catalog/src/table_context.rs

Lines changed: 9 additions & 4 deletions
@@ -78,8 +78,10 @@ use crate::plan::PartInfoPtr;
 use crate::plan::PartStatistics;
 use crate::plan::Partitions;
 use crate::query_kind::QueryKind;
+use crate::runtime_filter_info::RuntimeFilterEntry;
 use crate::runtime_filter_info::RuntimeFilterInfo;
 use crate::runtime_filter_info::RuntimeFilterReady;
+use crate::runtime_filter_info::RuntimeFilterReport;
 use crate::session_type::SessionType;
 use crate::statistics::data_cache_statistics::DataCacheMetrics;
 use crate::table::Table;
@@ -357,22 +359,25 @@
 
     fn get_runtime_filter_ready(&self, table_index: usize) -> Vec<Arc<RuntimeFilterReady>>;
 
-    fn set_wait_runtime_filter(&self, table_index: usize, need_to_wait: bool);
-
-    fn get_wait_runtime_filter(&self, table_index: usize) -> bool;
-
     fn clear_runtime_filter(&self);
+    fn assert_no_runtime_filter_state(&self) -> Result<()> {
+        unimplemented!()
+    }
 
     fn set_merge_into_join(&self, join: MergeIntoJoin);
 
     fn get_merge_into_join(&self) -> MergeIntoJoin;
 
+    fn get_runtime_filters(&self, id: usize) -> Vec<RuntimeFilterEntry>;
+
     fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>;
 
     fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
 
     fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
 
+    fn runtime_filter_reports(&self) -> HashMap<usize, Vec<RuntimeFilterReport>>;
+
     fn has_bloom_runtime_filters(&self, id: usize) -> bool;
     fn txn_mgr(&self) -> TxnManagerRef;
     fn get_table_meta_timestamps(
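
The trait now hands out whole RuntimeFilterEntry values per scan in addition to the per-kind accessors, plus per-query reports for EXPLAIN. A rough consumer sketch, assuming the trait lives at databend_common_catalog::table_context::TableContext (as in the rest of the crate) and that `id` is the probe-side scan id; the helper below is illustrative, not part of the commit:

// Illustrative only: collect the min/max expressions of filters that are still enabled.
use databend_common_catalog::table_context::TableContext;
use databend_common_expression::Expr;

fn enabled_min_max_filters(ctx: &dyn TableContext, scan_id: usize) -> Vec<Expr<String>> {
    ctx.get_runtime_filters(scan_id)
        .into_iter()
        .filter(|entry| entry.enabled)      // filters can be disabled at runtime
        .filter_map(|entry| entry.min_max)  // Option<Expr<String>> per entry
        .collect()
}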

src/query/service/src/interpreters/interpreter_explain.rs

Lines changed: 13 additions & 2 deletions
@@ -180,6 +180,7 @@ impl Interpreter for ExplainInterpreter {
             profs: HashMap::new(),
             metadata: &metadata,
             scan_id_to_runtime_filters: HashMap::new(),
+            runtime_filter_reports: HashMap::new(),
         };
 
         let formatter = plan.formatter()?;
@@ -483,13 +484,16 @@ impl ExplainInterpreter {
         plan.set_pruning_stats(&mut pruned_partitions_stats);
         }
 
+        let runtime_filter_reports = self.ctx.runtime_filter_reports();
+
         let result = match self.partial {
             true => {
                 let metadata = metadata.read();
                 let mut context = FormatContext {
                     profs: query_profiles.clone(),
                     metadata: &metadata,
                     scan_id_to_runtime_filters: HashMap::new(),
+                    runtime_filter_reports: runtime_filter_reports.clone(),
                 };
 
                 let formatter = plan.formatter()?;
@@ -498,8 +502,15 @@
             }
             false => {
                 let metadata = metadata.read();
-                plan.format(&metadata, query_profiles.clone())?
-                    .format_pretty()?
+                let mut context = FormatContext {
+                    profs: query_profiles.clone(),
+                    metadata: &metadata,
+                    scan_id_to_runtime_filters: HashMap::new(),
+                    runtime_filter_reports: runtime_filter_reports.clone(),
+                };
+                let formatter = plan.formatter()?;
+                let format_node = formatter.format(&mut context)?;
+                format_node.format_pretty()?
             }
         };
 

src/query/service/src/physical_plans/format/common.rs

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,7 @@ use databend_common_ast::ast::FormatTreeNode;
 use databend_common_base::base::format_byte_size;
 use databend_common_base::runtime::profile::get_statistics_desc;
 use databend_common_catalog::plan::PartStatistics;
+use databend_common_catalog::runtime_filter_info::RuntimeFilterReport;
 use databend_common_expression::DataSchemaRef;
 use databend_common_sql::executor::physical_plans::AggregateFunctionDesc;
 use databend_common_sql::IndexType;
@@ -32,6 +33,7 @@ pub struct FormatContext<'a> {
     pub metadata: &'a Metadata,
     pub profs: HashMap<u32, PlanProfile>,
     pub scan_id_to_runtime_filters: HashMap<IndexType, Vec<PhysicalRuntimeFilter>>,
+    pub runtime_filter_reports: HashMap<IndexType, Vec<RuntimeFilterReport>>,
 }
 
 pub fn pretty_display_agg_desc(desc: &AggregateFunctionDesc, metadata: &Metadata) -> String {

src/query/service/src/physical_plans/format/format_hash_join.rs

Lines changed: 30 additions & 6 deletions
@@ -43,11 +43,14 @@ impl<'a> PhysicalFormat for HashJoinFormatter<'a> {
 
     #[recursive::recursive]
     fn format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
+        // Register runtime filters for all probe targets
         for rf in self.inner.runtime_filter.filters.iter() {
-            ctx.scan_id_to_runtime_filters
-                .entry(rf.scan_id)
-                .or_default()
-                .push(rf.clone());
+            for (_probe_key, scan_id) in &rf.probe_targets {
+                ctx.scan_id_to_runtime_filters
+                    .entry(*scan_id)
+                    .or_default()
+                    .push(rf.clone());
+            }
         }
 
         let build_keys = self
@@ -83,11 +86,32 @@
 
         let mut build_runtime_filters = vec![];
         for rf in self.inner.runtime_filter.filters.iter() {
+            // Format all probe targets (sorted for stable explain output)
+            let mut probe_targets = rf
+                .probe_targets
+                .iter()
+                .map(|(probe_key, scan_id)| {
+                    (
+                        probe_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
+                        *scan_id,
+                    )
+                })
+                .collect::<Vec<_>>();
+
+            // Sort by scan_id first, then by probe key string
+            probe_targets.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
+
+            let probe_targets_str = probe_targets
+                .into_iter()
+                .map(|(probe_key_str, scan_id)| format!("{}@scan{}", probe_key_str, scan_id))
+                .collect::<Vec<_>>()
+                .join(", ");
+
             let mut s = format!(
-                "filter id:{}, build key:{}, probe key:{}, filter type:",
+                "filter id:{}, build key:{}, probe targets:[{}], filter type:",
                 rf.id,
                 rf.build_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
-                rf.probe_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
+                probe_targets_str,
            );
             if rf.enable_bloom_runtime_filter {
                 s += "bloom,";

src/query/service/src/physical_plans/format/format_table_scan.rs

Lines changed: 1 addition & 0 deletions
@@ -105,6 +105,7 @@ impl<'a> PhysicalFormat for TableScanFormatter<'a> {
 
         let mut children = vec![
             FormatTreeNode::new(format!("table: {table_name}")),
+            FormatTreeNode::new(format!("scan id: {}", self.inner.scan_id)),
             FormatTreeNode::new(format!(
                 "output columns: [{}]",
                 format_output_columns(self.inner.output_schema()?, ctx.metadata, false)
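
The table-scan formatter now emits the scan id right after the table name, which makes the probe targets printed by the hash-join formatter traceable back to a concrete scan node. A hypothetical fragment of the EXPLAIN tree (table and column names invented):

TableScan
├── table: default.default.t1
├── scan id: 0
├── output columns: [a (#0)]
└── ...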

src/query/service/src/physical_plans/mod.rs

Lines changed: 3 additions & 4 deletions
@@ -35,7 +35,6 @@ mod physical_expression_scan;
 mod physical_filter;
 mod physical_hash_join;
 mod physical_join;
-mod physical_join_filter;
 mod physical_limit;
 mod physical_multi_table_insert;
 mod physical_mutation;
@@ -59,6 +58,7 @@ mod physical_udf;
 mod physical_union_all;
 mod physical_window;
 mod physical_window_partition;
+mod runtime_filter;
 
 pub use physical_add_stream_column::AddStreamColumn;
 pub use physical_aggregate_expand::AggregateExpand;
@@ -82,9 +82,6 @@ pub use physical_exchange_sink::ExchangeSink;
 pub use physical_exchange_source::ExchangeSource;
 pub use physical_filter::Filter;
 pub use physical_hash_join::HashJoin;
-pub use physical_join_filter::JoinRuntimeFilter;
-pub use physical_join_filter::PhysicalRuntimeFilter;
-pub use physical_join_filter::PhysicalRuntimeFilters;
 pub use physical_limit::Limit;
 pub use physical_materialized_cte::*;
 pub use physical_multi_table_insert::*;
@@ -110,6 +107,8 @@ pub use physical_udf::UdfFunctionDesc;
 pub use physical_union_all::UnionAll;
 pub use physical_window::*;
 pub use physical_window_partition::*;
+pub use runtime_filter::PhysicalRuntimeFilter;
+pub use runtime_filter::PhysicalRuntimeFilters;
 
 pub mod explain;
 mod format;

src/query/service/src/physical_plans/physical_aggregate_partial.rs

Lines changed: 0 additions & 1 deletion
@@ -167,7 +167,6 @@ impl IPhysicalPlan for AggregatePartial {
     }
 
     fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
-        builder.contain_sink_processor = true;
         self.input.build_pipeline(builder)?;
 
         let max_block_size = builder.settings.get_max_block_size()?;
