From 26bba5b69887f9c89664f1e0bf3c0bbfb57184d8 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 7 Nov 2025 15:50:01 +0800 Subject: [PATCH 1/5] asof --- src/query/service/src/physical_plans/mod.rs | 1 - .../src/physical_plans/physical_asof_join.rs | 425 ------------------ .../src/physical_plans/physical_hash_join.rs | 12 +- .../src/physical_plans/physical_join.rs | 106 +++-- .../src/physical_plans/physical_range_join.rs | 13 +- .../bind_table_reference/bind_asof_join.rs | 264 +++++++++++ .../binder/bind_table_reference/bind_join.rs | 70 +-- .../binder/bind_table_reference/mod.rs | 1 + src/query/sql/src/planner/binder/window.rs | 2 +- .../optimizers/hyper_dp/join_node.rs | 2 +- .../operator/decorrelate/flatten_plan.rs | 2 +- .../outer_join_to_inner_join.rs | 4 +- src/query/sql/src/planner/plans/join.rs | 4 +- .../join/asof/test_asof_join_double.test | 2 +- 14 files changed, 392 insertions(+), 516 deletions(-) delete mode 100644 src/query/service/src/physical_plans/physical_asof_join.rs create mode 100644 src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs diff --git a/src/query/service/src/physical_plans/mod.rs b/src/query/service/src/physical_plans/mod.rs index f8661a3f16632..fc064eda9d2fa 100644 --- a/src/query/service/src/physical_plans/mod.rs +++ b/src/query/service/src/physical_plans/mod.rs @@ -113,7 +113,6 @@ pub use physical_window_partition::*; pub mod explain; mod format; -mod physical_asof_join; mod physical_cte_consumer; mod physical_materialized_cte; mod physical_plan; diff --git a/src/query/service/src/physical_plans/physical_asof_join.rs b/src/query/service/src/physical_plans/physical_asof_join.rs deleted file mode 100644 index 15bc4a1c56434..0000000000000 --- a/src/query/service/src/physical_plans/physical_asof_join.rs +++ /dev/null @@ -1,425 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::types::DataType; -use databend_common_expression::types::NumberScalar; -use databend_common_expression::Scalar; -use databend_common_sql::binder::bind_window_function_info; -use databend_common_sql::binder::WindowFunctionInfo; -use databend_common_sql::binder::WindowOrderByInfo; -use databend_common_sql::optimizer::ir::RelExpr; -use databend_common_sql::optimizer::ir::SExpr; -use databend_common_sql::plans::BoundColumnRef; -use databend_common_sql::plans::ComparisonOp; -use databend_common_sql::plans::ConstantExpr; -use databend_common_sql::plans::FunctionCall; -use databend_common_sql::plans::Join; -use databend_common_sql::plans::LagLeadFunction; -use databend_common_sql::plans::ScalarItem; -use databend_common_sql::plans::WindowFunc; -use databend_common_sql::plans::WindowFuncFrame; -use databend_common_sql::plans::WindowFuncFrameBound; -use databend_common_sql::plans::WindowFuncFrameUnits; -use databend_common_sql::plans::WindowFuncType; -use databend_common_sql::plans::WindowOrderBy; -use databend_common_sql::ColumnBindingBuilder; -use databend_common_sql::ColumnEntry; -use databend_common_sql::ColumnSet; -use databend_common_sql::DerivedColumn; -use databend_common_sql::ScalarExpr; -use databend_common_sql::Visibility; - -use crate::physical_plans::physical_plan::PhysicalPlan; -use crate::physical_plans::physical_plan_builder::PhysicalPlanBuilder; - -impl PhysicalPlanBuilder { - pub async fn build_asof_join( - &mut self, - join: &Join, - s_expr: &SExpr, - required: (ColumnSet, ColumnSet), - mut range_conditions: Vec, - mut other_conditions: Vec, - ) -> Result { - let mut window_index: usize = 0; - - if range_conditions.is_empty() { - return Err(ErrorCode::Internal("Missing inequality condition!")); - } - if range_conditions.len() > 1 { - return Err(ErrorCode::Internal("Multiple inequalities condition!")); - } - let (window_func, right_column, left_type) = - self.bind_window_func(join, s_expr, &range_conditions, &mut other_conditions)?; - let window_plan = self.build_window_plan(&window_func, s_expr, &mut window_index)?; - self.add_range_condition( - &window_func, - window_index, - &mut range_conditions, - right_column, - left_type, - )?; - let mut ss_expr = s_expr.clone(); - ss_expr.children[0] = Arc::new(window_plan); - ss_expr.children[1] = Arc::new(s_expr.child(0)?.clone()); - let left_prop = RelExpr::with_s_expr(ss_expr.child(0)?).derive_relational_prop()?; - let right_prop = RelExpr::with_s_expr(ss_expr.child(1)?).derive_relational_prop()?; - let left_required = required.0.union(&left_prop.used_columns).cloned().collect(); - let right_required = required - .1 - .union(&right_prop.used_columns) - .cloned() - .collect(); - self.build_range_join( - join, - &ss_expr, - left_required, - right_required, - range_conditions, - other_conditions, - ) - .await - } - - fn add_range_condition( - &mut self, - window_func: &WindowFunc, - window_index: usize, - range_conditions: &mut Vec, - right_column: ScalarExpr, - left_type: DataType, - ) -> Result { - let mut folded_args: Vec = Vec::with_capacity(2); - let mut func_name = String::from("eq"); - // Generate a ColumnBinding for each argument of aggregates - let column = ColumnBindingBuilder::new( - window_func.display_name.clone(), - window_index, - Box::new(left_type), - Visibility::Visible, - ) - .build(); - folded_args.push(right_column.clone()); - folded_args.push( - BoundColumnRef { - span: right_column.span(), - column, - } - .into(), - ); - for condition in range_conditions.iter() { - if let ScalarExpr::FunctionCall(func) = condition { - match ComparisonOp::try_from_func_name(func.func_name.as_str()).unwrap() { - ComparisonOp::GTE => { - func_name = String::from("lt"); - } - ComparisonOp::GT => { - func_name = String::from("lte"); - } - ComparisonOp::LT => { - func_name = String::from("gte"); - } - ComparisonOp::LTE => { - func_name = String::from("gt"); - } - _ => unreachable!("must be range condition!"), - } - } - } - range_conditions.push( - FunctionCall { - span: range_conditions[0].span(), - params: vec![], - arguments: folded_args, - func_name, - } - .into(), - ); - Ok(true) - } - - fn bind_window_func( - &mut self, - join: &Join, - s_expr: &SExpr, - range_conditions: &[ScalarExpr], - other_conditions: &mut Vec, - ) -> Result<(WindowFunc, ScalarExpr, DataType)> { - let right_prop = RelExpr::with_s_expr(s_expr.child(0)?).derive_relational_prop()?; - let left_prop = RelExpr::with_s_expr(s_expr.child(1)?).derive_relational_prop()?; - - let mut right_column = range_conditions[0].clone(); - let mut left_column = range_conditions[0].clone(); - let mut order_items: Vec = Vec::with_capacity(range_conditions.len()); - let mut constant_default = ConstantExpr { - span: right_column.span(), - value: Scalar::Null, - }; - for condition in range_conditions.iter() { - if let ScalarExpr::FunctionCall(func) = condition { - if func.arguments.len() == 2 { - for arg in func.arguments.iter() { - if let ScalarExpr::BoundColumnRef(_) = arg { - let asc = - match ComparisonOp::try_from_func_name(func.func_name.as_str()) - .unwrap() - { - ComparisonOp::GT | ComparisonOp::GTE => Ok(Some(true)), - ComparisonOp::LT | ComparisonOp::LTE => Ok(Some(false)), - _ => Err(ErrorCode::Internal("must be range condition!")), - }?; - if arg.used_columns().is_subset(&left_prop.output_columns) { - left_column = arg.clone(); - constant_default.span = left_column.span(); - constant_default.value = left_column - .data_type()? - .remove_nullable() - .infinity() - .unwrap(); - if let Some(false) = asc { - constant_default.value = left_column - .data_type()? - .remove_nullable() - .ninfinity() - .unwrap(); - } - order_items.push(WindowOrderBy { - expr: arg.clone(), - asc, - nulls_first: Some(true), - }); - } - if arg.used_columns().is_subset(&right_prop.output_columns) { - right_column = arg.clone(); - } - } else { - return Err(ErrorCode::Internal( - "Cannot downcast Scalar to BoundColumnRef", - )); - } - } - } - } - } - - let mut partition_items: Vec = Vec::with_capacity(join.equi_conditions.len()); - let mut other_args: Vec = Vec::with_capacity(2); - for condition in join.equi_conditions.iter() { - if matches!(condition.right, ScalarExpr::BoundColumnRef(_)) - && matches!(condition.left, ScalarExpr::BoundColumnRef(_)) - { - partition_items.push(condition.right.clone()); - other_args.clear(); - other_args.push(condition.left.clone()); - other_args.push(condition.right.clone()); - other_conditions.push( - FunctionCall { - span: range_conditions[0].span(), - params: vec![], - arguments: other_args.clone(), - func_name: String::from("eq"), - } - .into(), - ); - } else { - return Err(ErrorCode::Internal( - "Cannot downcast Scalar to BoundColumnRef", - )); - } - } - let func_type = WindowFuncType::LagLead(LagLeadFunction { - is_lag: false, - arg: Box::new(left_column.clone()), - offset: 1, - default: Some(Box::new(constant_default.into())), - return_type: Box::new(left_column.data_type()?.clone()), - }); - let window_func = WindowFunc { - span: range_conditions[0].span(), - display_name: func_type.func_name(), - partition_by: partition_items, - func: func_type, - order_by: order_items, - frame: WindowFuncFrame { - units: WindowFuncFrameUnits::Rows, - start_bound: WindowFuncFrameBound::Following(Some(Scalar::Number( - NumberScalar::UInt64(1), - ))), - end_bound: WindowFuncFrameBound::Following(Some(Scalar::Number( - NumberScalar::UInt64(1), - ))), - }, - }; - Ok((window_func, right_column, left_column.data_type()?.clone())) - } - - fn build_window_plan( - &mut self, - window: &WindowFunc, - s_expr: &SExpr, - window_index: &mut usize, - ) -> Result { - let mut window_args = vec![]; - let window_func_name = window.func.func_name(); - let func = match &window.func { - WindowFuncType::LagLead(ll) => { - let (new_arg, new_default) = - self.replace_lag_lead_args(&mut window_args, &window_func_name, ll)?; - - WindowFuncType::LagLead(LagLeadFunction { - is_lag: ll.is_lag, - arg: Box::new(new_arg), - offset: ll.offset, - default: new_default, - return_type: ll.return_type.clone(), - }) - } - func => func.clone(), - }; - - // resolve partition by - let mut partition_by_items = vec![]; - for (i, part) in window.partition_by.iter().enumerate() { - let part = part.clone(); - let name = format!("{window_func_name}_part_{i}"); - let replaced_part = self.replace_expr(&name, &part)?; - partition_by_items.push(ScalarItem { - index: replaced_part.column.index, - scalar: part, - }); - } - - // resolve order by - let mut order_by_items = vec![]; - for (i, order) in window.order_by.iter().enumerate() { - let order_expr = order.expr.clone(); - let name = format!("{window_func_name}_order_{i}"); - let replaced_order = self.replace_expr(&name, &order_expr)?; - order_by_items.push(WindowOrderByInfo { - order_by_item: ScalarItem { - index: replaced_order.column.index, - scalar: order_expr, - }, - asc: order.asc, - nulls_first: order.nulls_first, - }); - } - - let index = self.metadata.write().add_derived_column( - window.display_name.clone(), - window.func.return_type(), - None, - ); - - *window_index = index; - - let window_info = WindowFunctionInfo { - display_name: window.display_name.clone(), - span: window.span, - index, - partition_by_items, - func, - arguments: window_args, - order_by_items, - frame: window.frame.clone(), - }; - bind_window_function_info(&self.ctx, &window_info, s_expr.child(1)?.clone()) - } - - fn replace_lag_lead_args( - &mut self, - window_args: &mut Vec, - window_func_name: &String, - f: &LagLeadFunction, - ) -> Result<(ScalarExpr, Option>)> { - let arg = (*f.arg).clone(); - let name = format!("{window_func_name}_arg"); - let replaced_arg = self.replace_expr(&name, &arg)?; - window_args.push(ScalarItem { - scalar: arg, - index: replaced_arg.column.index, - }); - let new_default = match &f.default { - None => None, - Some(d) => { - let d = (**d).clone(); - let name = format!("{window_func_name}_default_value"); - let replaced_default = self.replace_expr(&name, &d)?; - window_args.push(ScalarItem { - scalar: d, - index: replaced_default.column.index, - }); - Some(Box::new(replaced_default.into())) - } - }; - - Ok((replaced_arg.into(), new_default)) - } - - fn replace_expr(&self, name: &str, arg: &ScalarExpr) -> Result { - if let ScalarExpr::BoundColumnRef(col) = &arg { - Ok(col.clone()) - } else { - for entry in self.metadata.read().columns() { - if let ColumnEntry::DerivedColumn(DerivedColumn { - scalar_expr, - alias, - column_index, - data_type, - .. - }) = entry - { - if scalar_expr.as_ref() == Some(arg) { - // Generate a ColumnBinding for each argument of aggregates - let column = ColumnBindingBuilder::new( - alias.to_string(), - *column_index, - Box::new(data_type.clone()), - Visibility::Visible, - ) - .build(); - - return Ok(BoundColumnRef { - span: arg.span(), - column, - }); - } - } - } - let ty = arg.data_type()?; - let index = self.metadata.write().add_derived_column( - name.to_string(), - ty.clone(), - Some(arg.clone()), - ); - - // Generate a ColumnBinding for each argument of aggregates - let column = ColumnBindingBuilder::new( - name.to_string(), - index, - Box::new(ty), - Visibility::Visible, - ) - .build(); - Ok(BoundColumnRef { - span: arg.span(), - column, - }) - } - } -} diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 64d15be1f7743..5221b251efee1 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -505,8 +505,8 @@ impl PhysicalPlanBuilder { left_required: ColumnSet, right_required: ColumnSet, ) -> Result<(PhysicalPlan, PhysicalPlan)> { - let probe_side = self.build(s_expr.child(0)?, left_required).await?; - let build_side = self.build(s_expr.child(1)?, right_required).await?; + let probe_side = self.build(s_expr.left_child(), left_required).await?; + let build_side = self.build(s_expr.right_child(), right_required).await?; Ok((probe_side, build_side)) } @@ -537,7 +537,7 @@ impl PhysicalPlanBuilder { /// * `Result` - The prepared schema for the build side pub fn prepare_build_schema( &self, - join_type: &JoinType, + join_type: JoinType, build_side: &PhysicalPlan, ) -> Result { match join_type { @@ -577,7 +577,7 @@ impl PhysicalPlanBuilder { /// * `Result` - The prepared schema for the probe side pub fn prepare_probe_schema( &self, - join_type: &JoinType, + join_type: JoinType, probe_side: &PhysicalPlan, ) -> Result { match join_type { @@ -1252,8 +1252,8 @@ impl PhysicalPlanBuilder { self.unify_keys(&mut probe_side, &mut build_side)?; // Step 4: Prepare schemas for both sides - let build_schema = self.prepare_build_schema(&join.join_type, &build_side)?; - let probe_schema = self.prepare_probe_schema(&join.join_type, &probe_side)?; + let build_schema = self.prepare_build_schema(join.join_type, &build_side)?; + let probe_schema = self.prepare_probe_schema(join.join_type, &probe_side)?; // Step 5: Process join conditions let ( diff --git a/src/query/service/src/physical_plans/physical_join.rs b/src/query/service/src/physical_plans/physical_join.rs index 1297edca161b8..2adf8d491a30f 100644 --- a/src/query/service/src/physical_plans/physical_join.rs +++ b/src/query/service/src/physical_plans/physical_join.rs @@ -18,6 +18,7 @@ use databend_common_sql::binder::JoinPredicate; use databend_common_sql::optimizer::ir::RelExpr; use databend_common_sql::optimizer::ir::RelationalProperty; use databend_common_sql::optimizer::ir::SExpr; +use databend_common_sql::plans::FunctionCall; use databend_common_sql::plans::Join; use databend_common_sql::plans::JoinType; use databend_common_sql::ColumnSet; @@ -27,31 +28,26 @@ use crate::physical_plans::explain::PlanStatsInfo; use crate::physical_plans::physical_plan::PhysicalPlan; use crate::physical_plans::PhysicalPlanBuilder; -pub enum PhysicalJoinType { +enum PhysicalJoinType { Hash, // The first arg is range conditions, the second arg is other conditions RangeJoin(Vec, Vec), - AsofJoin(Vec, Vec), } // Choose physical join type by join conditions -pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result { - let check_asof = matches!( - join.join_type, - JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof - ); - +fn physical_join(join: &Join, s_expr: &SExpr) -> Result { if join.equi_conditions.is_empty() && join.join_type.is_any_join() { return Err(ErrorCode::SemanticError( "ANY JOIN only supports equality-based hash joins", )); } - if !join.equi_conditions.is_empty() && !check_asof { + + if !join.equi_conditions.is_empty() { // Contain equi condition, use hash join return Ok(PhysicalJoinType::Hash); } - if join.build_side_cache_info.is_some() && !check_asof { + if join.build_side_cache_info.is_some() { // There is a build side cache, use hash join. return Ok(PhysicalJoinType::Hash); } @@ -59,9 +55,8 @@ pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result { let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?); let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?); let right_stat_info = right_rel_expr.derive_cardinality()?; - if !check_asof - && (matches!(right_stat_info.statistics.precise_cardinality, Some(1)) - || right_stat_info.cardinality == 1.0) + if matches!(right_stat_info.statistics.precise_cardinality, Some(1)) + || right_stat_info.cardinality == 1.0 { // If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of RANGE JOIN. return Ok(PhysicalJoinType::Hash); @@ -86,12 +81,6 @@ pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result { other_conditions, )); } - if check_asof { - return Ok(PhysicalJoinType::AsofJoin( - range_conditions, - other_conditions, - )); - } // Leverage hash join to execute nested loop join Ok(PhysicalJoinType::Hash) } @@ -175,27 +164,72 @@ impl PhysicalPlanBuilder { // 2. Build physical plan. // Choose physical join type by join conditions - let physical_join = physical_join(join, s_expr)?; - match physical_join { - PhysicalJoinType::Hash => { - self.build_hash_join( - join, - s_expr, - required, - others_required, - left_required, - right_required, - stat_info, + if matches!( + join.join_type, + JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof + ) { + let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child()); + let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child()); + let left_prop = left_rel_expr.derive_relational_prop()?; + let right_prop = right_rel_expr.derive_relational_prop()?; + let mut range_conditions = vec![]; + let mut other_conditions = vec![]; + + for condition in join.equi_conditions.iter().cloned() { + other_conditions.push( + FunctionCall { + span: condition.left.span(), + func_name: "eq".to_string(), + params: vec![], + arguments: vec![condition.left, condition.right], + } + .into(), + ); + } + for condition in join.non_equi_conditions.iter() { + check_condition( + condition, + &left_prop, + &right_prop, + &mut range_conditions, + &mut other_conditions, ) - .await } - PhysicalJoinType::AsofJoin(range, other) => { - self.build_asof_join(join, s_expr, (left_required, right_required), range, other) + + self.build_range_join( + join.join_type, + s_expr, + left_required, + right_required, + range_conditions, + other_conditions, + ) + .await + } else { + match physical_join(join, s_expr)? { + PhysicalJoinType::Hash => { + self.build_hash_join( + join, + s_expr, + required, + others_required, + left_required, + right_required, + stat_info, + ) .await - } - PhysicalJoinType::RangeJoin(range, other) => { - self.build_range_join(join, s_expr, left_required, right_required, range, other) + } + PhysicalJoinType::RangeJoin(range, other) => { + self.build_range_join( + join.join_type, + s_expr, + left_required, + right_required, + range, + other, + ) .await + } } } } diff --git a/src/query/service/src/physical_plans/physical_range_join.rs b/src/query/service/src/physical_plans/physical_range_join.rs index 377f172c9fb55..d173b3086d0e3 100644 --- a/src/query/service/src/physical_plans/physical_range_join.rs +++ b/src/query/service/src/physical_plans/physical_range_join.rs @@ -30,7 +30,6 @@ use databend_common_sql::binder::JoinPredicate; use databend_common_sql::optimizer::ir::RelExpr; use databend_common_sql::optimizer::ir::RelationalProperty; use databend_common_sql::optimizer::ir::SExpr; -use databend_common_sql::plans::Join; use databend_common_sql::plans::JoinType; use databend_common_sql::ColumnSet; use databend_common_sql::ScalarExpr; @@ -203,15 +202,15 @@ pub struct RangeJoinCondition { impl PhysicalPlanBuilder { pub async fn build_range_join( &mut self, - join: &Join, + join_type: JoinType, s_expr: &SExpr, left_required: ColumnSet, right_required: ColumnSet, mut range_conditions: Vec, mut other_conditions: Vec, ) -> Result { - let left_prop = RelExpr::with_s_expr(s_expr.child(1)?).derive_relational_prop()?; - let right_prop = RelExpr::with_s_expr(s_expr.child(0)?).derive_relational_prop()?; + let left_prop = RelExpr::with_s_expr(s_expr.right_child()).derive_relational_prop()?; + let right_prop = RelExpr::with_s_expr(s_expr.left_child()).derive_relational_prop()?; debug_assert!(!range_conditions.is_empty()); @@ -230,8 +229,8 @@ impl PhysicalPlanBuilder { .build_join_sides(s_expr, left_required, right_required) .await?; - let left_schema = self.prepare_probe_schema(&join.join_type, &left_side)?; - let right_schema = self.prepare_build_schema(&join.join_type, &right_side)?; + let left_schema = self.prepare_probe_schema(join_type, &left_side)?; + let right_schema = self.prepare_build_schema(join_type, &right_side)?; let mut output_schema = Vec::clone(left_schema.fields()); output_schema.extend_from_slice(right_schema.fields()); @@ -265,7 +264,7 @@ impl PhysicalPlanBuilder { .iter() .map(|scalar| resolve_scalar(scalar, &merged_schema)) .collect::>()?, - join_type: join.join_type.clone(), + join_type: join_type.clone(), range_join_type, output_schema: Arc::new(DataSchema::new(output_schema)), stat_info: Some(self.build_plan_stat_info(s_expr)?), diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs new file mode 100644 index 0000000000000..fb57cb2862cd7 --- /dev/null +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs @@ -0,0 +1,264 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberScalar; +use databend_common_expression::Scalar; + +use crate::binder::bind_window_function_info; +use crate::binder::window::WindowRewriter; +use crate::binder::JoinPredicate; +use crate::binder::Visibility; +use crate::optimizer::ir::RelExpr; +use crate::optimizer::ir::RelationalProperty; +use crate::optimizer::ir::SExpr; +use crate::planner::binder::Binder; +use crate::plans::BoundColumnRef; +use crate::plans::ComparisonOp; +use crate::plans::ConstantExpr; +use crate::plans::FunctionCall; +use crate::plans::Join; +use crate::plans::JoinEquiCondition; +use crate::plans::LagLeadFunction; +use crate::plans::ScalarExpr; +use crate::plans::WindowFunc; +use crate::plans::WindowFuncFrame; +use crate::plans::WindowFuncFrameBound; +use crate::plans::WindowFuncFrameUnits; +use crate::plans::WindowFuncType; +use crate::plans::WindowOrderBy; +use crate::BindContext; +use crate::ColumnBindingBuilder; + +impl Binder { + pub(super) fn rewrite_asof(&self, mut join: Join, left: SExpr, right: SExpr) -> Result { + let left_prop = left.derive_relational_prop()?; + let right_prop = right.derive_relational_prop()?; + + let mut range_condition = None; + for condition in join.non_equi_conditions.iter() { + if check_range_condition(condition, &left_prop, &right_prop) { + if range_condition.is_none() { + range_condition = Some(condition); + } else { + return Err(ErrorCode::Internal("Multiple inequalities condition!")); + } + } + } + let range_condition = match range_condition { + Some(range_condition) => range_condition, + None => { + return Err(ErrorCode::Internal("Missing inequality condition!")); + } + }; + + let (window, right_column, left_type) = + self.bind_window_func(&join, &left, &right, range_condition)?; + + let mut bind_ctx = BindContext::new(); + let mut rewriter = WindowRewriter::new(&mut bind_ctx, self.metadata.clone()); + + let window_func = rewriter.replace_window_function(&window)?; + let window_info = bind_ctx + .windows + .get_window_info(&window_func.display_name) + .unwrap(); + + let folded_args = [ + right_column.clone(), + BoundColumnRef { + span: right_column.span(), + column: ColumnBindingBuilder::new( + window_func.display_name.clone(), + window_info.index, + Box::new(left_type), + Visibility::Visible, + ) + .build(), + } + .into(), + ]; + + for condition in join.equi_conditions.iter_mut() { + std::mem::swap(&mut condition.left, &mut condition.right) + } + + if let ScalarExpr::FunctionCall(func) = range_condition.clone() { + let func_name = + match ComparisonOp::try_from_func_name(func.func_name.as_str()).unwrap() { + ComparisonOp::GTE => "lt", + ComparisonOp::GT => "lte", + ComparisonOp::LT => "gte", + ComparisonOp::LTE => "gt", + _ => unreachable!("must be range condition!"), + } + .to_string(); + join.non_equi_conditions.push( + FunctionCall { + span: range_condition.span(), + params: vec![], + arguments: folded_args.to_vec(), + func_name, + } + .into(), + ); + } else { + let [left, right] = folded_args; + join.equi_conditions.push(JoinEquiCondition { + left, + right, + is_null_equal: false, + }); + }; + + let window_plan = bind_window_function_info(&self.ctx, window_info, right)?; + Ok(SExpr::create_binary( + Arc::new(join.into()), + Arc::new(window_plan), + Arc::new(left), + )) + } + + fn bind_window_func( + &self, + join: &Join, + left: &SExpr, + right: &SExpr, + range_condition: &ScalarExpr, + ) -> Result<(WindowFunc, ScalarExpr, DataType)> { + let right_prop = RelExpr::with_s_expr(left).derive_relational_prop()?; + let left_prop = RelExpr::with_s_expr(right).derive_relational_prop()?; + + let mut right_column = range_condition.clone(); + let mut left_column = range_condition.clone(); + let mut order_items: Vec = Vec::with_capacity(1); + let mut constant_default = ConstantExpr { + span: right_column.span(), + value: Scalar::Null, + }; + if let ScalarExpr::FunctionCall(func) = range_condition + && func.arguments.len() == 2 + { + for arg in func.arguments.iter() { + if let ScalarExpr::BoundColumnRef(_) = arg { + let asc = + match ComparisonOp::try_from_func_name(func.func_name.as_str()).unwrap() { + ComparisonOp::GT | ComparisonOp::GTE => Ok(Some(true)), + ComparisonOp::LT | ComparisonOp::LTE => Ok(Some(false)), + _ => Err(ErrorCode::Internal("must be range condition!")), + }?; + if arg.used_columns().is_subset(&left_prop.output_columns) { + left_column = arg.clone(); + constant_default.span = left_column.span(); + constant_default.value = left_column + .data_type()? + .remove_nullable() + .infinity() + .unwrap(); + if asc == Some(false) { + constant_default.value = left_column + .data_type()? + .remove_nullable() + .ninfinity() + .unwrap(); + } + order_items.push(WindowOrderBy { + expr: arg.clone(), + asc, + nulls_first: Some(true), + }); + } + if arg.used_columns().is_subset(&right_prop.output_columns) { + right_column = arg.clone(); + } + } else { + return Err(ErrorCode::Internal( + "Cannot downcast Scalar to BoundColumnRef", + )); + } + } + } + + let mut partition_items: Vec = Vec::with_capacity(join.equi_conditions.len()); + for condition in join.equi_conditions.iter() { + if matches!(condition.right, ScalarExpr::BoundColumnRef(_)) + && matches!(condition.left, ScalarExpr::BoundColumnRef(_)) + { + partition_items.push(condition.right.clone()); + } else { + return Err(ErrorCode::Internal( + "Cannot downcast Scalar to BoundColumnRef", + )); + } + } + let func_type = WindowFuncType::LagLead(LagLeadFunction { + is_lag: false, + arg: Box::new(left_column.clone()), + offset: 1, + default: Some(Box::new(constant_default.into())), + return_type: Box::new(left_column.data_type()?.clone()), + }); + + let window_func = WindowFunc { + span: range_condition.span(), + display_name: func_type.func_name(), + partition_by: partition_items, + func: func_type, + order_by: order_items, + frame: WindowFuncFrame { + units: WindowFuncFrameUnits::Rows, + start_bound: WindowFuncFrameBound::Following(Some(Scalar::Number( + NumberScalar::UInt64(1), + ))), + end_bound: WindowFuncFrameBound::Following(Some(Scalar::Number( + NumberScalar::UInt64(1), + ))), + }, + }; + Ok((window_func, right_column, left_column.data_type()?.clone())) + } +} + +fn check_range_condition( + expr: &ScalarExpr, + left_prop: &RelationalProperty, + right_prop: &RelationalProperty, +) -> bool { + let ScalarExpr::FunctionCall(FunctionCall { + func_name, + arguments, + .. + }) = expr + else { + return false; + }; + if !matches!(func_name.as_str(), "gt" | "lt" | "gte" | "lte") { + return false; + } + let [left, right] = arguments.as_slice() else { + unreachable!() + }; + + matches!( + ( + JoinPredicate::new(left, left_prop, right_prop), + JoinPredicate::new(right, left_prop, right_prop), + ), + (JoinPredicate::Left(_), JoinPredicate::Right(_)) + ) +} diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs index e4bd537f2b21f..5cddc074bd408 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs @@ -152,7 +152,7 @@ impl Binder { let join_type = join_type(&join.op); let s_expr = self.bind_join_with_type( - join_type.clone(), + join_type, join_conditions, left_child, right_child, @@ -161,7 +161,7 @@ impl Binder { )?; let mut bind_context = join_bind_context( - &join_type, + join_type, bind_context, left_context.clone(), right_context.clone(), @@ -212,19 +212,15 @@ impl Binder { let join_type = join_type(&join_op); let s_expr = self.bind_join_with_type( - join_type.clone(), + join_type, join_conditions, left_child, right_child, vec![], None, )?; - let bind_context = join_bind_context( - &join_type, - bind_context, - left_context.clone(), - right_context, - ); + let bind_context = + join_bind_context(join_type, bind_context, left_context.clone(), right_context); Ok((s_expr, bind_context)) } @@ -354,31 +350,31 @@ impl Binder { pub(crate) fn bind_join_with_type( &mut self, mut join_type: JoinType, - join_conditions: JoinConditions, + JoinConditions { + mut left_conditions, + mut right_conditions, + mut non_equi_conditions, + other_conditions, + }: JoinConditions, mut left_child: SExpr, mut right_child: SExpr, mut is_null_equal: Vec, build_side_cache_info: Option, ) -> Result { - let mut left_conditions = join_conditions.left_conditions; - let mut right_conditions = join_conditions.right_conditions; - let mut non_equi_conditions = join_conditions.non_equi_conditions; - let other_conditions = join_conditions.other_conditions; - - if join_type == JoinType::Cross - && (!left_conditions.is_empty() || !right_conditions.is_empty()) - { - return Err(ErrorCode::SemanticError( - "Join conditions should be empty in cross join", - )); - } - if matches!( - join_type, + match join_type { + JoinType::Cross if !left_conditions.is_empty() || !right_conditions.is_empty() => { + return Err(ErrorCode::SemanticError( + "Join conditions should be empty in cross join", + )); + } JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof - ) && non_equi_conditions.is_empty() - { - return Err(ErrorCode::SemanticError("Missing inequality condition!")); + if non_equi_conditions.is_empty() => + { + return Err(ErrorCode::SemanticError("Missing inequality condition!")); + } + _ => (), } + self.push_down_other_conditions( &join_type, &mut left_child, @@ -456,11 +452,19 @@ impl Binder { single_to_inner: None, build_side_cache_info, }; - Ok(SExpr::create_binary( - Arc::new(logical_join.into()), - Arc::new(left_child), - Arc::new(right_child), - )) + + if matches!( + logical_join.join_type, + JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof + ) { + self.rewrite_asof(logical_join, left_child, right_child) + } else { + Ok(SExpr::create_binary( + Arc::new(logical_join.into()), + Arc::new(left_child), + Arc::new(right_child), + )) + } } fn push_down_other_conditions( @@ -1038,7 +1042,7 @@ fn join_type(join_type: &JoinOperator) -> JoinType { } fn join_bind_context( - join_type: &JoinType, + join_type: JoinType, bind_context: BindContext, left_context: BindContext, right_context: BindContext, diff --git a/src/query/sql/src/planner/binder/bind_table_reference/mod.rs b/src/query/sql/src/planner/binder/bind_table_reference/mod.rs index 75a42e6f60ddf..98f8fdb344157 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/mod.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod bind; +mod bind_asof_join; mod bind_cte; mod bind_join; mod bind_location; diff --git a/src/query/sql/src/planner/binder/window.rs b/src/query/sql/src/planner/binder/window.rs index db6140b55129d..3ff0a9d715d0d 100644 --- a/src/query/sql/src/planner/binder/window.rs +++ b/src/query/sql/src/planner/binder/window.rs @@ -240,7 +240,7 @@ impl<'a> WindowRewriter<'a> { } } - fn replace_window_function(&mut self, window: &WindowFunc) -> Result { + pub fn replace_window_function(&mut self, window: &WindowFunc) -> Result { let mut replaced_partition_items: Vec = Vec::with_capacity(window.partition_by.len()); let mut replaced_order_by_items: Vec = diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/join_node.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/join_node.rs index adb26fc7b2320..0f08abd205ff7 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/join_node.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/join_node.rs @@ -88,7 +88,7 @@ impl JoinNode { vec![], ), non_equi_conditions: vec![], - join_type: self.join_type.clone(), + join_type: self.join_type, marker_index: None, from_correlated_subquery: false, need_hold_hash_table: false, diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs index 16a8878897b8f..54974ec93b5fa 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs @@ -441,7 +441,7 @@ impl SubqueryDecorrelatorOptimizer { vec![], ), non_equi_conditions, - join_type: join.join_type.clone(), + join_type: join.join_type, marker_index: join.marker_index, from_correlated_subquery: false, need_hold_hash_table: false, diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/outer_join_to_inner_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/outer_join_to_inner_join.rs index 2a3705d2a2f38..c955c3aa309d7 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/outer_join_to_inner_join.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/push_down_filter_join/outer_join_to_inner_join.rs @@ -100,7 +100,7 @@ pub fn outer_join_to_inner_join(s_expr: &SExpr, metadata: MetadataRef) -> Result } } - let original_join_type = join.join_type.clone(); + let original_join_type = join.join_type; join.join_type = eliminate_outer_join_type(join.join_type, can_filter_left_null, can_filter_right_null); if join.join_type == original_join_type { @@ -111,7 +111,7 @@ pub fn outer_join_to_inner_join(s_expr: &SExpr, metadata: MetadataRef) -> Result original_join_type, JoinType::LeftSingle | JoinType::RightSingle ) { - join.join_type = original_join_type.clone(); + join.join_type = original_join_type; join.single_to_inner = Some(original_join_type); } diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index fe4ada479b32a..a3a80c0e263a1 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -42,7 +42,7 @@ use crate::plans::ScalarExpr; use crate::ColumnSet; use crate::IndexType; -#[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub enum JoinType { Cross, Inner, @@ -89,7 +89,7 @@ impl JoinType { JoinType::RightMark => JoinType::LeftMark, JoinType::RightAsof => JoinType::LeftAsof, JoinType::LeftAsof => JoinType::RightAsof, - _ => self.clone(), + _ => *self, } } diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_double.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_double.test index 6df546a6db2a0..4c13a74d51312 100644 --- a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_double.test +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_double.test @@ -44,7 +44,7 @@ INSERT INTO probes VALUES ; # INNER Window version -query III nosort inner_equality +query IRI nosort inner_equality SELECT p.key, p.ts, e.value FROM probes p From 42751f5fef36bc5b3b9c5195c8d930292141c361 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 7 Nov 2025 16:43:35 +0800 Subject: [PATCH 2/5] refine --- .../src/physical_plans/physical_hash_join.rs | 10 ++-- .../src/physical_plans/physical_range_join.rs | 4 +- .../processors/transforms/hash_join/desc.rs | 4 +- .../hash_join/hash_join_build_state.rs | 2 +- .../hash_join/hash_join_probe_state.rs | 2 +- .../transforms/hash_join/hash_join_spiller.rs | 18 +++---- .../transforms/hash_join/hash_join_state.rs | 2 +- .../transforms/hash_join/spill_common.rs | 2 +- .../new_hash_join/grace/grace_join.rs | 4 +- .../transforms/range_join/range_join_state.rs | 2 +- .../push_down_filter_join_test.rs | 2 +- src/query/sql/src/planner/binder/aggregate.rs | 50 +++++++------------ .../planner/binder/bind_mutation/update.rs | 9 ++-- .../planner/binder/bind_query/bind_value.rs | 30 ++--------- .../binder/bind_table_reference/bind_join.rs | 9 ++-- .../bind_table_function.rs | 27 +++++----- src/query/sql/src/planner/binder/binder.rs | 10 ++-- src/query/sql/src/planner/binder/project.rs | 8 ++- .../sql/src/planner/binder/project_set.rs | 9 ++-- src/query/sql/src/planner/binder/select.rs | 1 - src/query/sql/src/planner/binder/sort.rs | 1 - src/query/sql/src/planner/binder/table.rs | 7 +-- src/query/sql/src/planner/binder/window.rs | 18 +++---- .../sql/src/planner/metadata/metadata.rs | 12 +---- .../operator/decorrelate/decorrelate.rs | 2 - .../operator/decorrelate/flatten_plan.rs | 16 +++--- .../decorrelate/subquery_decorrelator.rs | 11 ++-- .../operator/filter/pull_up_filter.rs | 1 - .../rule/agg_rules/rule_eager_aggregation.rs | 4 -- .../rule_merge_filter_into_mutation.rs | 9 ++-- .../semantic/async_function_rewriter.rs | 10 ++-- .../sql/src/planner/semantic/udf_rewriter.rs | 18 +++---- 32 files changed, 115 insertions(+), 199 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_hash_join.rs b/src/query/service/src/physical_plans/physical_hash_join.rs index 5221b251efee1..8750c31799b0e 100644 --- a/src/query/service/src/physical_plans/physical_hash_join.rs +++ b/src/query/service/src/physical_plans/physical_hash_join.rs @@ -240,14 +240,14 @@ impl IPhysicalPlan for HashJoin { probe_keys: self.probe_keys.clone(), is_null_equal: self.is_null_equal.clone(), non_equi_conditions: self.non_equi_conditions.clone(), - join_type: self.join_type.clone(), + join_type: self.join_type, marker_index: self.marker_index, from_correlated_subquery: self.from_correlated_subquery, probe_to_build: self.probe_to_build.clone(), output_schema: self.output_schema.clone(), need_hold_hash_table: self.need_hold_hash_table, stat_info: self.stat_info.clone(), - single_to_inner: self.single_to_inner.clone(), + single_to_inner: self.single_to_inner, build_side_cache_info: self.build_side_cache_info.clone(), runtime_filter: self.runtime_filter.clone(), broadcast_id: self.broadcast_id, @@ -445,7 +445,7 @@ impl HashJoin { build_input.clone(), probe_input.clone(), joined_output.clone(), - factory.create_hash_join(self.join_type.clone(), 0)?, + factory.create_hash_join(self.join_type, 0)?, stage_sync_barrier.clone(), self.projections.clone(), rf_desc.clone(), @@ -1210,7 +1210,7 @@ impl PhysicalPlanBuilder { probe_projections, build: build_side, probe: probe_side, - join_type: join.join_type.clone(), + join_type: join.join_type, build_keys: right_join_conditions, probe_keys: left_join_conditions, is_null_equal, @@ -1222,7 +1222,7 @@ impl PhysicalPlanBuilder { output_schema, need_hold_hash_table: join.need_hold_hash_table, stat_info: Some(stat_info), - single_to_inner: join.single_to_inner.clone(), + single_to_inner: join.single_to_inner, build_side_cache_info, runtime_filter, broadcast_id, diff --git a/src/query/service/src/physical_plans/physical_range_join.rs b/src/query/service/src/physical_plans/physical_range_join.rs index d173b3086d0e3..a20dda4b0698a 100644 --- a/src/query/service/src/physical_plans/physical_range_join.rs +++ b/src/query/service/src/physical_plans/physical_range_join.rs @@ -133,7 +133,7 @@ impl IPhysicalPlan for RangeJoin { right: right_child, conditions: self.conditions.clone(), other_conditions: self.other_conditions.clone(), - join_type: self.join_type.clone(), + join_type: self.join_type, range_join_type: self.range_join_type.clone(), output_schema: self.output_schema.clone(), stat_info: self.stat_info.clone(), @@ -264,7 +264,7 @@ impl PhysicalPlanBuilder { .iter() .map(|scalar| resolve_scalar(scalar, &merged_schema)) .collect::>()?, - join_type: join_type.clone(), + join_type, range_join_type, output_schema: Arc::new(DataSchema::new(output_schema)), stat_info: Some(self.build_plan_stat_info(s_expr)?), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs index ee191e992b35e..ad3d09f1308a3 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs @@ -118,7 +118,7 @@ impl HashJoinDesc { .collect(); Ok(HashJoinDesc { - join_type: join.join_type.clone(), + join_type: join.join_type, build_keys, probe_keys, is_null_equal: join.is_null_equal.clone(), @@ -128,7 +128,7 @@ impl HashJoinDesc { // marker_index: join.marker_index, }, from_correlated_subquery: join.from_correlated_subquery, - single_to_inner: join.single_to_inner.clone(), + single_to_inner: join.single_to_inner, runtime_filter: (&join.runtime_filter).into(), probe_to_build: join.probe_to_build.clone(), build_projection: join.build_projections.clone(), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index 6fe0ce8beca7e..cab6108169932 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -870,7 +870,7 @@ impl HashJoinBuildState { } pub(crate) fn join_type(&self) -> JoinType { - self.hash_join_state.hash_join_desc.join_type.clone() + self.hash_join_state.hash_join_desc.join_type } pub fn runtime_filter_desc(&self) -> &[RuntimeFilterDesc] { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index 7dfc2fc93a30b..1632c7dab8371 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -784,6 +784,6 @@ impl HashJoinProbeState { } pub(crate) fn join_type(&self) -> JoinType { - self.hash_join_state.hash_join_desc.join_type.clone() + self.hash_join_state.hash_join_desc.join_type } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index 6db28a7809670..957b2a780d72c 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -123,7 +123,7 @@ impl HashJoinSpiller { } for data_block in data_blocks { - let mut hashes = self.get_hashes(&data_block, &self.join_type)?; + let mut hashes = self.get_hashes(&data_block, self.join_type)?; for hash in hashes.iter_mut() { *hash = Self::get_partition_id(*hash, self.spill_partition_bits as u64); @@ -143,10 +143,10 @@ impl HashJoinSpiller { return Ok(()); } - let join_type = self.join_type.clone(); + let join_type = self.join_type; let data_block = DataBlock::concat(&data_blocks)?; let partition_data_blocks = - self.partition_data_block(&data_block, &join_type, self.spill_partition_bits)?; + self.partition_data_block(&data_block, join_type, self.spill_partition_bits)?; for (partition_id, data_block) in partition_data_blocks.into_iter().enumerate() { if !data_block.is_empty() { self.partition_buffer @@ -184,7 +184,7 @@ impl HashJoinSpiller { let mut partitions_writer = HashMap::::new(); for data_block in data_blocks { - let mut hashes = self.get_hashes(&data_block, &self.join_type)?; + let mut hashes = self.get_hashes(&data_block, self.join_type)?; for hash in hashes.iter_mut() { *hash = Self::get_partition_id(*hash, self.spill_partition_bits as u64); @@ -249,14 +249,14 @@ impl HashJoinSpiller { data_blocks: &[DataBlock], partition_need_to_spill: Option<&HashSet>, ) -> Result> { - let join_type = self.join_type.clone(); + let join_type = self.join_type; let mut unspilled_data_blocks = vec![]; let data_block = DataBlock::concat(data_blocks)?; let fetch_option = PartitionBufferFetchOption::PickPartitionWithThreshold(self.partition_threshold); for (partition_id, data_block) in self - .partition_data_block(&data_block, &join_type, self.spill_partition_bits)? + .partition_data_block(&data_block, join_type, self.spill_partition_bits)? .into_iter() .enumerate() { @@ -343,10 +343,10 @@ impl HashJoinSpiller { fn partition_data_block( &mut self, data_block: &DataBlock, - join_type: &JoinType, + join_type: JoinType, partition_bits: usize, ) -> Result> { - if join_type == &JoinType::Cross { + if join_type == JoinType::Cross { Ok(vec![data_block.clone()]) } else { let mut hashes = self.get_hashes(data_block, join_type)?; @@ -364,7 +364,7 @@ impl HashJoinSpiller { } // Get all hashes for build input data. - fn get_hashes(&self, data_block: &DataBlock, join_type: &JoinType) -> Result> { + fn get_hashes(&self, data_block: &DataBlock, join_type: JoinType) -> Result> { let mut hashes = Vec::with_capacity(data_block.num_rows()); get_hashes( &self.func_ctx, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 72b9ca5d0f16e..d696ca2304e7b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -236,7 +236,7 @@ impl HashJoinState { } pub fn join_type(&self) -> JoinType { - self.hash_join_desc.join_type.clone() + self.hash_join_desc.join_type } pub fn need_outer_scan(&self) -> bool { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/spill_common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/spill_common.rs index 9f300e8936f1d..4812fa400c695 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/spill_common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/spill_common.rs @@ -32,7 +32,7 @@ pub fn get_hashes( block: &DataBlock, keys: &[Expr], method: &HashMethodKind, - join_type: &JoinType, + join_type: JoinType, from_build: bool, is_null_equal: &[bool], hashes: &mut Vec, diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs index e5cb1932ffc58..7663631cf321c 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs @@ -277,7 +277,7 @@ impl GraceHashJoin { &data, &self.desc.build_keys, &self.hash_method_kind, - &self.desc.join_type, + self.desc.join_type, true, &self.desc.is_null_equal, &mut hashes, @@ -298,7 +298,7 @@ impl GraceHashJoin { &data, &self.desc.probe_keys, &self.hash_method_kind, - &self.desc.join_type, + self.desc.join_type, false, &self.desc.is_null_equal, &mut hashes, diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs index e270c1709154a..45b5acea21e5e 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs @@ -99,7 +99,7 @@ impl RangeJoinState { finished_tasks: AtomicU64::new(0), completed_pair: AtomicU64::new(0), ie_join_state, - join_type: range_join.join_type.clone(), + join_type: range_join.join_type, left_match: RwLock::new(MutableBitmap::new()), right_match: RwLock::new(MutableBitmap::new()), partition_count: AtomicU64::new(0), diff --git a/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/push_down_filter_join_test.rs b/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/push_down_filter_join_test.rs index 8f208b29421ab..b6ced07c4f0f3 100644 --- a/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/push_down_filter_join_test.rs +++ b/src/query/service/tests/it/sql/planner/optimizer/optimizers/rule/filter_rules/push_down_filter_join_test.rs @@ -252,7 +252,7 @@ fn run_join_filter_test(test_case: &JoinFilterTestCase, metadata: &MetadataRef) left_scan, right_scan, vec![join_condition], - test_case.join_type.clone(), + test_case.join_type, ); // Create filter diff --git a/src/query/sql/src/planner/binder/aggregate.rs b/src/query/sql/src/planner/binder/aggregate.rs index 33d4173ecd57b..5007300fa56b7 100644 --- a/src/query/sql/src/planner/binder/aggregate.rs +++ b/src/query/sql/src/planner/binder/aggregate.rs @@ -206,7 +206,6 @@ impl<'a> AggregateRewriter<'a> { let index = self.metadata.write().add_derived_column( aggregate.display_name.clone(), *aggregate.return_type.clone(), - Some(ScalarExpr::AggregateFunction(aggregate.clone())), ); let replaced_agg = AggregateFunction { @@ -243,11 +242,10 @@ impl<'a> AggregateRewriter<'a> { let replaced_args = self.replace_function_args(&udaf.arguments, &udaf.name)?; - let index = self.metadata.write().add_derived_column( - udaf.display_name.clone(), - *udaf.return_type.clone(), - Some(ScalarExpr::UDAFCall(udaf.clone())), - ); + let index = self + .metadata + .write() + .add_derived_column(udaf.display_name.clone(), *udaf.return_type.clone()); let replaced_udaf = UDAFCall { span: udaf.span, @@ -321,11 +319,10 @@ impl<'a> AggregateRewriter<'a> { }) } else { let data_type = expr.data_type()?; - let index = self.metadata.write().add_derived_column( - name.clone(), - data_type.clone(), - Some(expr.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(name.clone(), data_type.clone()); // Generate a ColumnBinding for each sort describe of aggregates let column_binding = ColumnBindingBuilder::new( @@ -401,11 +398,10 @@ impl<'a> AggregateRewriter<'a> { }); } - let index = self.metadata.write().add_derived_column( - name.clone(), - data_type.clone(), - Some(arg.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(name.clone(), data_type.clone()); // Generate a ColumnBinding for each argument of aggregates let column_binding = ColumnBindingBuilder::new( @@ -767,7 +763,6 @@ impl Binder { let dummy = self.create_derived_column_binding( format!("_dup_group_item_{i}"), item.scalar.data_type()?, - Some(item.scalar.clone()), ); dup_group_items.push((dummy.index, *dummy.data_type)); } @@ -776,7 +771,6 @@ impl Binder { let grouping_id_column = self.create_derived_column_binding( "_grouping_id".to_string(), DataType::Number(NumberDataType::UInt32), - None, ); let bound_grouping_id_col = BoundColumnRef { @@ -857,11 +851,7 @@ impl Binder { { column_ref.column.clone() } else { - self.create_derived_column_binding( - alias, - scalar.data_type()?, - Some(scalar.clone()), - ) + self.create_derived_column_binding(alias, scalar.data_type()?) }; bind_context.aggregate_info.group_items.push(ScalarItem { scalar: scalar.clone(), @@ -912,11 +902,9 @@ impl Binder { { *index } else { - self.metadata.write().add_derived_column( - group_item_name.clone(), - scalar_expr.data_type()?, - Some(scalar_expr.clone()), - ) + self.metadata + .write() + .add_derived_column(group_item_name.clone(), scalar_expr.data_type()?) }; bind_context.aggregate_info.group_items.push(ScalarItem { @@ -1073,11 +1061,7 @@ impl Binder { column.column_name = alias.clone(); column } else { - self.create_derived_column_binding( - alias.clone(), - scalar.data_type()?, - Some(scalar.clone()), - ) + self.create_derived_column_binding(alias.clone(), scalar.data_type()?) }; if scalar_column_index.is_none() { diff --git a/src/query/sql/src/planner/binder/bind_mutation/update.rs b/src/query/sql/src/planner/binder/bind_mutation/update.rs index 5df5e39045569..e18e303edc90b 100644 --- a/src/query/sql/src/planner/binder/bind_mutation/update.rs +++ b/src/query/sql/src/planner/binder/bind_mutation/update.rs @@ -257,11 +257,10 @@ impl Binder { let column = ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column }); let cast = column.unify_to_data_type(&data_type); - let index = self.metadata.write().add_derived_column( - column_name, - *data_type, - Some(cast.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(column_name, *data_type); (Some(cast), index) } else { (None, new) diff --git a/src/query/sql/src/planner/binder/bind_query/bind_value.rs b/src/query/sql/src/planner/binder/bind_query/bind_value.rs index de75c0073aa4e..68d8d620fa241 100644 --- a/src/query/sql/src/planner/binder/bind_query/bind_value.rs +++ b/src/query/sql/src/planner/binder/bind_query/bind_value.rs @@ -472,7 +472,6 @@ pub fn bind_values( bind_expression_scan( metadata, bind_context, - span, num_values, num_columns, column_scalars, @@ -497,7 +496,6 @@ pub fn bind_values( pub fn bind_expression_scan( metadata: MetadataRef, bind_context: &mut BindContext, - span: Span, num_values: usize, num_columns: usize, column_scalars: Vec>, @@ -551,14 +549,7 @@ pub fn bind_expression_scan( Visibility::Visible, ) .build(); - let _ = metadata.add_derived_column( - field.name().clone(), - field.data_type().clone(), - Some(ScalarExpr::BoundColumnRef(BoundColumnRef { - span, - column: column_binding.clone(), - })), - ); + let _ = metadata.add_derived_column(field.name().clone(), field.data_type().clone()); bind_context.add_column_binding(column_binding); column_indexes.push(index); } @@ -585,14 +576,7 @@ pub fn bind_expression_scan( ) .build(); - let _ = metadata.add_derived_column( - name, - data_type, - Some(ScalarExpr::BoundColumnRef(BoundColumnRef { - span, - column: new_column_binding.clone(), - })), - ); + let _ = metadata.add_derived_column(name, data_type); bind_context.add_column_binding(new_column_binding); column_indexes.push(new_column_index); @@ -690,14 +674,8 @@ pub fn bind_constant_scan( Visibility::Visible, ) .build(); - let _ = metadata.add_derived_column( - value_field.name().clone(), - value_field.data_type().clone(), - Some(ScalarExpr::BoundColumnRef(BoundColumnRef { - span, - column: column_binding.clone(), - })), - ); + let _ = metadata + .add_derived_column(value_field.name().clone(), value_field.data_type().clone()); bind_context.add_column_binding(column_binding); let field = DataField::new(&index.to_string(), value_field.data_type().clone()); diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs index 5cddc074bd408..e2917e0c812f8 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs @@ -259,11 +259,10 @@ impl Binder { } // If the column is not nullable, generate a new column wrap nullable. let target_type = column.data_type.wrap_nullable(); - let new_index = self.metadata.write().add_derived_column( - column.column_name.clone(), - target_type.clone(), - None, - ); + let new_index = self + .metadata + .write() + .add_derived_column(column.column_name.clone(), target_type.clone()); let old_scalar = ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column: column.clone(), diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs index 03f370a2ac8cf..31632e3773c35 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs @@ -215,11 +215,10 @@ impl Binder { argument: Box::new(input_expr), target_type: Box::new(return_type.clone()), }); - let index = self.metadata.write().add_derived_column( - return_name.clone(), - return_type.clone(), - Some(cast_expr.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(return_name.clone(), return_type.clone()); let output_binding = ColumnBindingBuilder::new( return_name, index, @@ -397,11 +396,10 @@ impl Binder { arguments: vec![scalar.clone()], }); let data_type = field_expr.data_type()?; - let index = self.metadata.write().add_derived_column( - field.clone(), - data_type.clone(), - Some(field_expr.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(field.clone(), data_type.clone()); let column_binding = ColumnBindingBuilder::new( field, @@ -500,11 +498,10 @@ impl Binder { } else { // Add result column to metadata let data_type = srf_result.data_type()?; - let index = self.metadata.write().add_derived_column( - srf.to_string(), - data_type.clone(), - Some(srf_result.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(srf.to_string(), data_type.clone()); ColumnBindingBuilder::new( srf.to_string(), index, diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 5e342fc72a094..79e888fa93e1b 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -953,13 +953,11 @@ impl Binder { &mut self, column_name: String, data_type: DataType, - scalar_expr: Option, ) -> ColumnBinding { - let index = self.metadata.write().add_derived_column( - column_name.clone(), - data_type.clone(), - scalar_expr, - ); + let index = self + .metadata + .write() + .add_derived_column(column_name.clone(), data_type.clone()); ColumnBindingBuilder::new(column_name, index, Box::new(data_type), Visibility::Visible) .build() } diff --git a/src/query/sql/src/planner/binder/project.rs b/src/query/sql/src/planner/binder/project.rs index 66ca80bad7d0b..e65de9b97c342 100644 --- a/src/query/sql/src/planner/binder/project.rs +++ b/src/query/sql/src/planner/binder/project.rs @@ -119,11 +119,9 @@ impl Binder { ScalarExpr::WindowFunction(win) => { find_replaced_window_function(window_info, win, &item.alias).unwrap() } - _ => self.create_derived_column_binding( - item.alias.clone(), - item.scalar.data_type()?, - Some(item.scalar.clone()), - ), + _ => { + self.create_derived_column_binding(item.alias.clone(), item.scalar.data_type()?) + } }; if is_grouping_sets_item { diff --git a/src/query/sql/src/planner/binder/project_set.rs b/src/query/sql/src/planner/binder/project_set.rs index ab2edc093f27a..3d8e757745c66 100644 --- a/src/query/sql/src/planner/binder/project_set.rs +++ b/src/query/sql/src/planner/binder/project_set.rs @@ -109,11 +109,10 @@ impl<'a> VisitorMut<'a> for SetReturningAnalyzer<'a> { return Ok(()); } - let index = self.metadata.write().add_derived_column( - srf_display_name.clone(), - replaced_expr.data_type()?, - Some(replaced_expr.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(srf_display_name.clone(), replaced_expr.data_type()?); // Add the srf to bind context, build ProjectSet plan later. self.bind_context.srf_info.srfs.push(ScalarItem { diff --git a/src/query/sql/src/planner/binder/select.rs b/src/query/sql/src/planner/binder/select.rs index 3523baa4c938e..176cb5620c70a 100644 --- a/src/query/sql/src/planner/binder/select.rs +++ b/src/query/sql/src/planner/binder/select.rs @@ -479,7 +479,6 @@ impl Binder { let column_binding = self.create_derived_column_binding( left_col.column_name.clone(), coercion_types[idx].clone(), - None, ); new_bind_context.add_column_binding(column_binding); } diff --git a/src/query/sql/src/planner/binder/sort.rs b/src/query/sql/src/planner/binder/sort.rs index 617aad8308f0b..22dbd6645b133 100644 --- a/src/query/sql/src/planner/binder/sort.rs +++ b/src/query/sql/src/planner/binder/sort.rs @@ -163,7 +163,6 @@ impl Binder { self.create_derived_column_binding( format!("{:#}", order.expr), rewrite_scalar.data_type()?, - Some(rewrite_scalar.clone()), ) }; let item = ScalarItem { diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 519b29c556278..037f70d0af34c 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -266,11 +266,8 @@ impl Binder { } _ => col.data_type.clone(), }; - let idx = metadata.add_derived_column( - col.column_name.clone(), - *expand_data_type.clone(), - None, - ); + let idx = + metadata.add_derived_column(col.column_name.clone(), *expand_data_type.clone()); new_bind_ctx.columns.push( ColumnBindingBuilder::new( col.column_name.clone(), diff --git a/src/query/sql/src/planner/binder/window.rs b/src/query/sql/src/planner/binder/window.rs index 3ff0a9d715d0d..d1e1c355f7372 100644 --- a/src/query/sql/src/planner/binder/window.rs +++ b/src/query/sql/src/planner/binder/window.rs @@ -367,11 +367,10 @@ impl<'a> WindowRewriter<'a> { }); } - let index = self.metadata.write().add_derived_column( - window.display_name.clone(), - window.func.return_type(), - Some(ScalarExpr::WindowFunction(window.clone())), - ); + let index = self + .metadata + .write() + .add_derived_column(window.display_name.clone(), window.func.return_type()); // create window info let window_info = WindowFunctionInfo { @@ -468,11 +467,10 @@ impl<'a> WindowRewriter<'a> { } let ty = arg.data_type()?; - let index = self.metadata.write().add_derived_column( - name.to_string(), - ty.clone(), - Some(arg.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(name.to_string(), ty.clone()); // Generate a ColumnBinding for each argument of aggregates let column = ColumnBindingBuilder::new( diff --git a/src/query/sql/src/planner/metadata/metadata.rs b/src/query/sql/src/planner/metadata/metadata.rs index 8221d6f7fee61..946e376180dc1 100644 --- a/src/query/sql/src/planner/metadata/metadata.rs +++ b/src/query/sql/src/planner/metadata/metadata.rs @@ -34,7 +34,6 @@ use databend_common_expression::TableField; use parking_lot::RwLock; use crate::optimizer::ir::SExpr; -use crate::ScalarExpr; /// Planner use [`usize`] as it's index type. /// @@ -259,18 +258,12 @@ impl Metadata { column_index } - pub fn add_derived_column( - &mut self, - alias: String, - data_type: DataType, - expr: Option, - ) -> IndexType { + pub fn add_derived_column(&mut self, alias: String, data_type: DataType) -> IndexType { let column_index = self.columns.len(); let column_entry = ColumnEntry::DerivedColumn(DerivedColumn { column_index, alias, data_type, - scalar_expr: expr, }); self.columns.push(column_entry); column_index @@ -676,9 +669,6 @@ pub struct DerivedColumn { pub column_index: IndexType, pub alias: String, pub data_type: DataType, - // if the derived column is generated by the scalar expr, save the `scalar_expr`. - // Currently, it's only used by WindowRewriter. - pub scalar_expr: Option, } #[derive(Clone, Debug)] diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/decorrelate.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/decorrelate.rs index b6d15700e1e75..f5115251e9efc 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/decorrelate.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/decorrelate.rs @@ -328,7 +328,6 @@ impl SubqueryDecorrelatorOptimizer { self.metadata.write().add_derived_column( "marker".to_string(), DataType::Nullable(Box::new(DataType::Boolean)), - None, ) }; let join_plan = Join { @@ -410,7 +409,6 @@ impl SubqueryDecorrelatorOptimizer { self.metadata.write().add_derived_column( "marker".to_string(), DataType::Nullable(Box::new(DataType::Boolean)), - None, ) }; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs index 54974ec93b5fa..29f5c4552763b 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs @@ -721,7 +721,7 @@ impl SubqueryDecorrelatorOptimizer { let column_entry = metadata.column(old); let name = column_entry.name(); let data_type = column_entry.data_type(); - let new = metadata.add_derived_column(name, data_type, None); + let new = metadata.add_derived_column(name, data_type); self.derived_columns.insert(old, new); new })); @@ -841,7 +841,7 @@ impl SubqueryDecorrelatorOptimizer { let column_entry = metadata.column(*mark); let name = column_entry.name(); let data_type = column_entry.data_type(); - let new_mark = metadata.add_derived_column(name, data_type, None); + let new_mark = metadata.add_derived_column(name, data_type); self.derived_columns.insert(*mark, new_mark); *mark = new_mark; } @@ -884,8 +884,7 @@ impl SubqueryDecorrelatorOptimizer { .map(|index| { let (value, field) = scan.value(index)?; let name = metadata.column(index).name(); - let derived_index = - metadata.add_derived_column(name, field.data_type().clone(), None); + let derived_index = metadata.add_derived_column(name, field.data_type().clone()); let field = DataField::new(&derived_index.to_string(), field.data_type().clone()); self.derived_columns.insert(index, derived_index); @@ -910,11 +909,8 @@ impl SubqueryDecorrelatorOptimizer { .copied() .map(|col| { let column_entry = metadata.column(col).clone(); - let derived_index = metadata.add_derived_column( - column_entry.name(), - column_entry.data_type(), - None, - ); + let derived_index = + metadata.add_derived_column(column_entry.name(), column_entry.data_type()); self.derived_columns.insert(col, derived_index); derived_index }) @@ -963,7 +959,7 @@ impl SubqueryDecorrelatorOptimizer { let name = column_entry.name(); let data_type = column_entry.data_type(); let old = index; - let index = metadata.add_derived_column(name, data_type, Some(scalar.clone())); + let index = metadata.add_derived_column(name, data_type); self.derived_columns.insert(old, index); Ok(ScalarItem { scalar, index }) } diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs index 87ead95a12077..50e9498237c0a 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs @@ -597,11 +597,10 @@ impl SubqueryDecorrelatorOptimizer { // `(SELECT COUNT(*) = 1 FROM t WHERE a > 1 LIMIT 1)`. let count_type = AggregateCountFunction::try_create("", vec![], vec![], vec![])? .return_type()?; - let count_func_index = self.metadata.write().add_derived_column( - "count(*)".to_string(), - count_type.clone(), - None, - ); + let count_func_index = self + .metadata + .write() + .add_derived_column("count(*)".to_string(), count_type.clone()); let agg = Aggregate { aggregate_functions: vec![ScalarItem { @@ -665,7 +664,6 @@ impl SubqueryDecorrelatorOptimizer { let column_index = self.metadata.write().add_derived_column( "_exists_scalar_subquery".to_string(), DataType::Boolean, - None, ); output_index = Some(column_index); let eval_scalar = EvalScalar { @@ -734,7 +732,6 @@ impl SubqueryDecorrelatorOptimizer { self.metadata.write().add_derived_column( "marker".to_string(), DataType::Nullable(Box::new(DataType::Boolean)), - None, ) }; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/filter/pull_up_filter.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/filter/pull_up_filter.rs index 9425d2be65bc3..ef30a55c6571d 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/filter/pull_up_filter.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/filter/pull_up_filter.rs @@ -187,7 +187,6 @@ impl PullUpFilterOptimizer { let new_index = metadata.write().add_derived_column( column.column.column_name.clone(), *column.column.data_type.clone(), - None, ); let new_column = column.clone(); items.push(ScalarItem { diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs index 0c246862c8cc8..83e8ab352a26d 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs @@ -1331,7 +1331,6 @@ fn add_eager_count(final_agg: &mut Aggregate, metadata: MetadataRef) -> (usize, let eager_count_index = metadata.write().add_derived_column( "count(*)".to_string(), DataType::Number(NumberDataType::UInt64), - None, ); if let ScalarExpr::AggregateFunction(agg) = &mut eager_count_aggregation_function.scalar { agg.func_name = "count".to_string(); @@ -1401,7 +1400,6 @@ fn decompose_avg( let count_index = metadata.write().add_derived_column( format!("avg_count_{}({}.{})", &func_name, table_name, column_name), count_aggregation_function.scalar.data_type()?, - None, ); // AVG => COUNT @@ -1428,7 +1426,6 @@ fn update_aggregate_and_eval( let new_index = metadata.write().add_derived_column( format!("_eager_final_{}", &func_name), final_aggregate_function.scalar.data_type()?, - None, ); // Modify final aggregate functions. @@ -1482,7 +1479,6 @@ fn create_eager_count_multiply_scalar_item( let new_index = metadata.write().add_derived_column( format!("{} * _eager_count", aggregate_function.display_name), aggregate_function.args[0].data_type()?, - None, ); let new_scalar = if let ScalarExpr::BoundColumnRef(column) = &aggregate_function.args[0] { diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_merge_filter_into_mutation.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_merge_filter_into_mutation.rs index 92f1083594cfd..7e0190073cc60 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_merge_filter_into_mutation.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_merge_filter_into_mutation.rs @@ -68,11 +68,10 @@ impl Rule for RuleMergeFilterIntoMutation { mutation.predicates = filter.predicates; if mutation.mutation_type == MutationType::Update { - let column_index = self.metadata.write().add_derived_column( - "_predicate".to_string(), - DataType::Boolean, - None, - ); + let column_index = self + .metadata + .write() + .add_derived_column("_predicate".to_string(), DataType::Boolean); mutation.predicate_column_index = Some(column_index); } diff --git a/src/query/sql/src/planner/semantic/async_function_rewriter.rs b/src/query/sql/src/planner/semantic/async_function_rewriter.rs index adc33a98edefd..74dccc7174529 100644 --- a/src/query/sql/src/planner/semantic/async_function_rewriter.rs +++ b/src/query/sql/src/planner/semantic/async_function_rewriter.rs @@ -172,11 +172,10 @@ impl<'a> VisitorMut<'a> for AsyncFunctionRewriter { column_ref.clone() } else { let name = format!("{}_arg_{}", &async_func.display_name, i); - let index = self.metadata.write().add_derived_column( - name.clone(), - arg.data_type()?, - Some(arg.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(name.clone(), arg.data_type()?); // Generate a ColumnBinding for each argument of async function let column = ColumnBindingBuilder::new( @@ -205,7 +204,6 @@ impl<'a> VisitorMut<'a> for AsyncFunctionRewriter { None => self.metadata.write().add_derived_column( async_func.display_name.clone(), (*async_func.return_type).clone(), - Some(async_func.clone().into()), ), }; diff --git a/src/query/sql/src/planner/semantic/udf_rewriter.rs b/src/query/sql/src/planner/semantic/udf_rewriter.rs index 969e0e83f6926..02999fc837950 100644 --- a/src/query/sql/src/planner/semantic/udf_rewriter.rs +++ b/src/query/sql/src/planner/semantic/udf_rewriter.rs @@ -180,11 +180,10 @@ impl<'a> VisitorMut<'a> for UdfRewriter { column_ref.clone() } else { let name = format!("{}_arg_{}", &udf.display_name, i); - let index = self.metadata.write().add_derived_column( - name.clone(), - arg.data_type()?, - Some(arg.clone()), - ); + let index = self + .metadata + .write() + .add_derived_column(name.clone(), arg.data_type()?); // Generate a ColumnBinding for each argument of udf function let column = ColumnBindingBuilder::new( @@ -212,11 +211,10 @@ impl<'a> VisitorMut<'a> for UdfRewriter { let index = match self.udf_functions_index_map.get(&udf.display_name) { Some(index) => *index, - None => self.metadata.write().add_derived_column( - udf.display_name.clone(), - (*udf.return_type).clone(), - Some(ScalarExpr::UDFCall(udf.clone())), - ), + None => self + .metadata + .write() + .add_derived_column(udf.display_name.clone(), (*udf.return_type).clone()), }; // Generate a ColumnBinding for the udf function From 08632709c38270764bd92cbdb49a6fe9cc8a0e45 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 7 Nov 2025 16:48:55 +0800 Subject: [PATCH 3/5] is_asof_join --- src/query/service/src/physical_plans/physical_join.rs | 5 +---- .../src/planner/binder/bind_table_reference/bind_join.rs | 5 +---- src/query/sql/src/planner/plans/join.rs | 7 +++++++ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_join.rs b/src/query/service/src/physical_plans/physical_join.rs index 2adf8d491a30f..30721858c3aa8 100644 --- a/src/query/service/src/physical_plans/physical_join.rs +++ b/src/query/service/src/physical_plans/physical_join.rs @@ -164,10 +164,7 @@ impl PhysicalPlanBuilder { // 2. Build physical plan. // Choose physical join type by join conditions - if matches!( - join.join_type, - JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof - ) { + if join.join_type.is_asof_join() { let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child()); let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child()); let left_prop = left_rel_expr.derive_relational_prop()?; diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs index e2917e0c812f8..b68d04d3c97c9 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs @@ -452,10 +452,7 @@ impl Binder { build_side_cache_info, }; - if matches!( - logical_join.join_type, - JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof - ) { + if logical_join.join_type.is_asof_join() { self.rewrite_asof(logical_join, left_child, right_child) } else { Ok(SExpr::create_binary( diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index a3a80c0e263a1..1fdb713b5a348 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -118,6 +118,13 @@ impl JoinType { JoinType::InnerAny | JoinType::LeftAny | JoinType::RightAny ) } + + pub fn is_asof_join(&self) -> bool { + matches!( + self, + JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof + ) + } } impl Display for JoinType { From 91def031819df27461176c30e07a8645676199a7 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 7 Nov 2025 18:43:08 +0800 Subject: [PATCH 4/5] refine --- .../src/physical_plans/physical_join.rs | 95 +++---- .../bind_table_reference/bind_asof_join.rs | 253 ++++++++---------- .../binder/bind_table_reference/bind_join.rs | 18 +- src/query/sql/src/planner/binder/select.rs | 67 ++--- src/query/sql/src/planner/dataframe.rs | 16 +- 5 files changed, 179 insertions(+), 270 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_join.rs b/src/query/service/src/physical_plans/physical_join.rs index 30721858c3aa8..8e75dcac8e2e1 100644 --- a/src/query/service/src/physical_plans/physical_join.rs +++ b/src/query/service/src/physical_plans/physical_join.rs @@ -64,17 +64,12 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result { let left_prop = left_rel_expr.derive_relational_prop()?; let right_prop = right_rel_expr.derive_relational_prop()?; - let mut range_conditions = vec![]; - let mut other_conditions = vec![]; - for condition in join.non_equi_conditions.iter() { - check_condition( - condition, - &left_prop, - &right_prop, - &mut range_conditions, - &mut other_conditions, - ) - } + let (range_conditions, other_conditions) = join + .non_equi_conditions + .iter() + .cloned() + .partition::, _>(|condition| is_range_condition(condition, &left_prop, &right_prop)); + if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) { return Ok(PhysicalJoinType::RangeJoin( range_conditions, @@ -85,38 +80,28 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result { Ok(PhysicalJoinType::Hash) } -fn check_condition( +fn is_range_condition( expr: &ScalarExpr, left_prop: &RelationalProperty, right_prop: &RelationalProperty, - range_conditions: &mut Vec, - other_conditions: &mut Vec, -) { - if let ScalarExpr::FunctionCall(func) = expr { - if func.arguments.len() != 2 - || !matches!(func.func_name.as_str(), "gt" | "lt" | "gte" | "lte") - { - other_conditions.push(expr.clone()); - return; - } - let mut left = false; - let mut right = false; - for arg in func.arguments.iter() { - let join_predicate = JoinPredicate::new(arg, left_prop, right_prop); - match join_predicate { - JoinPredicate::Left(_) => left = true, - JoinPredicate::Right(_) => right = true, - JoinPredicate::Both { .. } | JoinPredicate::Other(_) | JoinPredicate::ALL(_) => { - return; - } - } - } - if left && right { - range_conditions.push(expr.clone()); - return; - } +) -> bool { + let ScalarExpr::FunctionCall(func) = expr else { + return false; + }; + if !matches!(func.func_name.as_str(), "gt" | "lt" | "gte" | "lte") { + return false; } - other_conditions.push(expr.clone()); + let [left, right] = func.arguments.as_slice() else { + unreachable!() + }; + + matches!( + ( + JoinPredicate::new(left, left_prop, right_prop), + JoinPredicate::new(right, left_prop, right_prop), + ), + (JoinPredicate::Left(_), JoinPredicate::Right(_)) + ) } impl PhysicalPlanBuilder { @@ -165,33 +150,23 @@ impl PhysicalPlanBuilder { // 2. Build physical plan. // Choose physical join type by join conditions if join.join_type.is_asof_join() { - let left_rel_expr = RelExpr::with_s_expr(s_expr.left_child()); - let right_rel_expr = RelExpr::with_s_expr(s_expr.right_child()); - let left_prop = left_rel_expr.derive_relational_prop()?; - let right_prop = right_rel_expr.derive_relational_prop()?; - let mut range_conditions = vec![]; - let mut other_conditions = vec![]; - - for condition in join.equi_conditions.iter().cloned() { - other_conditions.push( + let left_prop = s_expr.left_child().derive_relational_prop()?; + let right_prop = s_expr.right_child().derive_relational_prop()?; + + let (range_conditions, other_conditions) = join + .non_equi_conditions + .iter() + .cloned() + .chain(join.equi_conditions.iter().cloned().map(|condition| { FunctionCall { span: condition.left.span(), func_name: "eq".to_string(), params: vec![], arguments: vec![condition.left, condition.right], } - .into(), - ); - } - for condition in join.non_equi_conditions.iter() { - check_condition( - condition, - &left_prop, - &right_prop, - &mut range_conditions, - &mut other_conditions, - ) - } + .into() + })) + .partition(|condition| is_range_condition(condition, &left_prop, &right_prop)); self.build_range_join( join.join_type, diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs index fb57cb2862cd7..550158e1f82d2 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_expression::types::DataType; use databend_common_expression::types::NumberScalar; use databend_common_expression::Scalar; @@ -24,16 +23,13 @@ use crate::binder::bind_window_function_info; use crate::binder::window::WindowRewriter; use crate::binder::JoinPredicate; use crate::binder::Visibility; -use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::RelationalProperty; use crate::optimizer::ir::SExpr; use crate::planner::binder::Binder; use crate::plans::BoundColumnRef; -use crate::plans::ComparisonOp; use crate::plans::ConstantExpr; use crate::plans::FunctionCall; use crate::plans::Join; -use crate::plans::JoinEquiCondition; use crate::plans::LagLeadFunction; use crate::plans::ScalarExpr; use crate::plans::WindowFunc; @@ -45,86 +41,83 @@ use crate::plans::WindowOrderBy; use crate::BindContext; use crate::ColumnBindingBuilder; +const GT: &str = "gt"; +const GTE: &str = "gte"; +const LT: &str = "lt"; +const LTE: &str = "lte"; + impl Binder { - pub(super) fn rewrite_asof(&self, mut join: Join, left: SExpr, right: SExpr) -> Result { + pub(super) fn rewrite_asof( + &self, + mut join: Join, + left: SExpr, + (right, right_context): (SExpr, &mut BindContext), + ) -> Result { let left_prop = left.derive_relational_prop()?; let right_prop = right.derive_relational_prop()?; let mut range_condition = None; for condition in join.non_equi_conditions.iter() { - if check_range_condition(condition, &left_prop, &right_prop) { - if range_condition.is_none() { - range_condition = Some(condition); - } else { + if let Some(func) = check_range_condition(condition, &left_prop, &right_prop) { + if range_condition.is_some() { return Err(ErrorCode::Internal("Multiple inequalities condition!")); } + range_condition = Some(func); } } - let range_condition = match range_condition { - Some(range_condition) => range_condition, - None => { - return Err(ErrorCode::Internal("Missing inequality condition!")); - } + let Some(range_func) = range_condition else { + return Err(ErrorCode::Internal("Missing inequality condition!")); }; - let (window, right_column, left_type) = - self.bind_window_func(&join, &left, &right, range_condition)?; + let (window_func, right_column) = + self.create_window_func(&join, &left_prop, &right_prop, range_func)?; - let mut bind_ctx = BindContext::new(); - let mut rewriter = WindowRewriter::new(&mut bind_ctx, self.metadata.clone()); + let mut rewriter = WindowRewriter::new(right_context, self.metadata.clone()); + let window_func = rewriter.replace_window_function(&window_func)?; - let window_func = rewriter.replace_window_function(&window)?; - let window_info = bind_ctx + let window_info = right_context .windows .get_window_info(&window_func.display_name) .unwrap(); - let folded_args = [ - right_column.clone(), + for condition in join.equi_conditions.iter_mut() { + std::mem::swap(&mut condition.left, &mut condition.right) + } + + let span = right_column.span(); + let arguments = [ + right_column, BoundColumnRef { - span: right_column.span(), + span, column: ColumnBindingBuilder::new( window_func.display_name.clone(), window_info.index, - Box::new(left_type), + Box::new(window_func.func.return_type()), Visibility::Visible, ) .build(), } .into(), - ]; - - for condition in join.equi_conditions.iter_mut() { - std::mem::swap(&mut condition.left, &mut condition.right) + ] + .to_vec(); + + let func_name = match range_func.func_name.as_str() { + GTE => LT, + GT => LTE, + LT => GTE, + LTE => GT, + _ => unreachable!(), } - - if let ScalarExpr::FunctionCall(func) = range_condition.clone() { - let func_name = - match ComparisonOp::try_from_func_name(func.func_name.as_str()).unwrap() { - ComparisonOp::GTE => "lt", - ComparisonOp::GT => "lte", - ComparisonOp::LT => "gte", - ComparisonOp::LTE => "gt", - _ => unreachable!("must be range condition!"), - } - .to_string(); - join.non_equi_conditions.push( - FunctionCall { - span: range_condition.span(), - params: vec![], - arguments: folded_args.to_vec(), - func_name, - } - .into(), - ); - } else { - let [left, right] = folded_args; - join.equi_conditions.push(JoinEquiCondition { - left, - right, - is_null_equal: false, - }); - }; + .to_string(); + join.non_equi_conditions.push( + FunctionCall { + span: range_func.span, + params: vec![], + arguments, + func_name, + } + .into(), + ); let window_plan = bind_window_function_info(&self.ctx, window_info, right)?; Ok(SExpr::create_binary( @@ -134,88 +127,78 @@ impl Binder { )) } - fn bind_window_func( + fn create_window_func( &self, join: &Join, - left: &SExpr, - right: &SExpr, - range_condition: &ScalarExpr, - ) -> Result<(WindowFunc, ScalarExpr, DataType)> { - let right_prop = RelExpr::with_s_expr(left).derive_relational_prop()?; - let left_prop = RelExpr::with_s_expr(right).derive_relational_prop()?; - - let mut right_column = range_condition.clone(); - let mut left_column = range_condition.clone(); - let mut order_items: Vec = Vec::with_capacity(1); - let mut constant_default = ConstantExpr { - span: right_column.span(), - value: Scalar::Null, - }; - if let ScalarExpr::FunctionCall(func) = range_condition - && func.arguments.len() == 2 - { - for arg in func.arguments.iter() { - if let ScalarExpr::BoundColumnRef(_) = arg { - let asc = - match ComparisonOp::try_from_func_name(func.func_name.as_str()).unwrap() { - ComparisonOp::GT | ComparisonOp::GTE => Ok(Some(true)), - ComparisonOp::LT | ComparisonOp::LTE => Ok(Some(false)), - _ => Err(ErrorCode::Internal("must be range condition!")), - }?; - if arg.used_columns().is_subset(&left_prop.output_columns) { - left_column = arg.clone(); - constant_default.span = left_column.span(); - constant_default.value = left_column - .data_type()? - .remove_nullable() - .infinity() - .unwrap(); - if asc == Some(false) { - constant_default.value = left_column - .data_type()? - .remove_nullable() - .ninfinity() - .unwrap(); - } - order_items.push(WindowOrderBy { - expr: arg.clone(), - asc, - nulls_first: Some(true), - }); - } - if arg.used_columns().is_subset(&right_prop.output_columns) { - right_column = arg.clone(); - } - } else { - return Err(ErrorCode::Internal( - "Cannot downcast Scalar to BoundColumnRef", - )); + left_prop: &RelationalProperty, + right_prop: &RelationalProperty, + range_func: &FunctionCall, + ) -> Result<(WindowFunc, ScalarExpr)> { + let (left_column, right_column) = { + match range_func.arguments.as_slice() { + [right, left] + if right.used_columns().is_subset(&right_prop.output_columns) + && left.used_columns().is_subset(&left_prop.output_columns) => + { + (right.clone(), left.clone()) } + [left, right] + if right.used_columns().is_subset(&right_prop.output_columns) + && left.used_columns().is_subset(&left_prop.output_columns) => + { + (right.clone(), left.clone()) + } + _ => unreachable!(), } - } + }; - let mut partition_items: Vec = Vec::with_capacity(join.equi_conditions.len()); - for condition in join.equi_conditions.iter() { - if matches!(condition.right, ScalarExpr::BoundColumnRef(_)) - && matches!(condition.left, ScalarExpr::BoundColumnRef(_)) - { - partition_items.push(condition.right.clone()); + let asc = match range_func.func_name.as_str() { + GT | GTE => true, + LT | LTE => false, + _ => unreachable!(), + }; + + let constant_default = { + let value = if asc { + left_column + .data_type()? + .remove_nullable() + .infinity() + .unwrap() } else { - return Err(ErrorCode::Internal( - "Cannot downcast Scalar to BoundColumnRef", - )); + left_column + .data_type()? + .remove_nullable() + .ninfinity() + .unwrap() + }; + ConstantExpr { + span: left_column.span(), + value, } + }; + + let order_items = vec![WindowOrderBy { + expr: left_column.clone(), + asc: Some(asc), + nulls_first: Some(true), + }]; + + let mut partition_items: Vec<_> = Vec::with_capacity(join.equi_conditions.len()); + for condition in join.equi_conditions.iter() { + partition_items.push(condition.right.clone()); } + let func_type = WindowFuncType::LagLead(LagLeadFunction { is_lag: false, - arg: Box::new(left_column.clone()), + return_type: Box::new(left_column.data_type()?.clone()), + arg: Box::new(left_column), offset: 1, default: Some(Box::new(constant_default.into())), - return_type: Box::new(left_column.data_type()?.clone()), }); let window_func = WindowFunc { - span: range_condition.span(), + span: range_func.span, display_name: func_type.func_name(), partition_by: partition_items, func: func_type, @@ -230,27 +213,22 @@ impl Binder { ))), }, }; - Ok((window_func, right_column, left_column.data_type()?.clone())) + Ok((window_func, right_column)) } } -fn check_range_condition( - expr: &ScalarExpr, +fn check_range_condition<'a>( + expr: &'a ScalarExpr, left_prop: &RelationalProperty, right_prop: &RelationalProperty, -) -> bool { - let ScalarExpr::FunctionCall(FunctionCall { - func_name, - arguments, - .. - }) = expr - else { - return false; +) -> Option<&'a FunctionCall> { + let ScalarExpr::FunctionCall(func) = expr else { + return None; }; - if !matches!(func_name.as_str(), "gt" | "lt" | "gte" | "lte") { - return false; + if !matches!(func.func_name.as_str(), GT | LT | GTE | LTE) { + return None; } - let [left, right] = arguments.as_slice() else { + let [left, right] = func.arguments.as_slice() else { unreachable!() }; @@ -261,4 +239,5 @@ fn check_range_condition( ), (JoinPredicate::Left(_), JoinPredicate::Right(_)) ) + .then_some(func) } diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs index b68d04d3c97c9..43bd47bb5e75f 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs @@ -154,8 +154,8 @@ impl Binder { let s_expr = self.bind_join_with_type( join_type, join_conditions, - left_child, - right_child, + (left_child, &mut left_context), + (right_child, &mut right_context), vec![], build_side_cache_info, )?; @@ -178,8 +178,8 @@ impl Binder { pub(crate) async fn bind_merge_into_join( &mut self, bind_context: &mut BindContext, - left_context: BindContext, - right_context: BindContext, + mut left_context: BindContext, + mut right_context: BindContext, left_child: SExpr, right_child: SExpr, join_op: JoinOperator, @@ -214,8 +214,8 @@ impl Binder { let s_expr = self.bind_join_with_type( join_type, join_conditions, - left_child, - right_child, + (left_child, &mut left_context), + (right_child, &mut right_context), vec![], None, )?; @@ -355,8 +355,8 @@ impl Binder { mut non_equi_conditions, other_conditions, }: JoinConditions, - mut left_child: SExpr, - mut right_child: SExpr, + (mut left_child, _): (SExpr, &mut BindContext), + (mut right_child, right_context): (SExpr, &mut BindContext), mut is_null_equal: Vec, build_side_cache_info: Option, ) -> Result { @@ -453,7 +453,7 @@ impl Binder { }; if logical_join.join_type.is_asof_join() { - self.rewrite_asof(logical_join, left_child, right_child) + self.rewrite_asof(logical_join, left_child, (right_child, right_context)) } else { Ok(SExpr::create_binary( Arc::new(logical_join.into()), diff --git a/src/query/sql/src/planner/binder/select.rs b/src/query/sql/src/planner/binder/select.rs index 176cb5620c70a..4e97aba60ae2a 100644 --- a/src/query/sql/src/planner/binder/select.rs +++ b/src/query/sql/src/planner/binder/select.rs @@ -161,23 +161,15 @@ impl Binder { SetOperator::Intersect if !all => { // Transfer Intersect to Semi join self.bind_intersect( - left.span(), - right.span(), - left_bind_context, - right_bind_context, - left_expr, - right_expr, + (left.span(),left_expr,left_bind_context), + (right.span(),right_expr,right_bind_context), ) } SetOperator::Except if !all => { // Transfer Except to Anti join self.bind_except( - left.span(), - right.span(), - left_bind_context, - right_bind_context, - left_expr, - right_expr, + (left.span(),left_expr,left_bind_context), + (right.span(),right_expr,right_bind_context), ) } SetOperator::Union => self.bind_union( @@ -302,53 +294,24 @@ impl Binder { pub fn bind_intersect( &mut self, - left_span: Span, - right_span: Span, - left_context: BindContext, - right_context: BindContext, - left_expr: SExpr, - right_expr: SExpr, + left: (Span, SExpr, BindContext), + right: (Span, SExpr, BindContext), ) -> Result<(SExpr, BindContext)> { - self.bind_intersect_or_except( - left_span, - right_span, - left_context, - right_context, - left_expr, - right_expr, - JoinType::LeftSemi, - ) + self.bind_intersect_or_except(left, right, JoinType::LeftSemi) } pub fn bind_except( &mut self, - left_span: Span, - right_span: Span, - left_context: BindContext, - right_context: BindContext, - left_expr: SExpr, - right_expr: SExpr, + left: (Span, SExpr, BindContext), + right: (Span, SExpr, BindContext), ) -> Result<(SExpr, BindContext)> { - self.bind_intersect_or_except( - left_span, - right_span, - left_context, - right_context, - left_expr, - right_expr, - JoinType::LeftAnti, - ) + self.bind_intersect_or_except(left, right, JoinType::LeftAnti) } - #[allow(clippy::too_many_arguments)] - pub fn bind_intersect_or_except( + fn bind_intersect_or_except( &mut self, - left_span: Span, - right_span: Span, - mut left_context: BindContext, - right_context: BindContext, - left_expr: SExpr, - right_expr: SExpr, + (left_span, left_expr, mut left_context): (Span, SExpr, BindContext), + (right_span, right_expr, mut right_context): (Span, SExpr, BindContext), join_type: JoinType, ) -> Result<(SExpr, BindContext)> { let mut left_conditions = Vec::with_capacity(left_context.columns.len()); @@ -384,8 +347,8 @@ impl Binder { let s_expr = self.bind_join_with_type( join_type, join_conditions, - left_expr, - right_expr, + (left_expr, &mut left_context), + (right_expr, &mut right_context), is_null_equal, None, )?; diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs index 40233a25d2a3e..1fd54cbef93e2 100644 --- a/src/query/sql/src/planner/dataframe.rs +++ b/src/query/sql/src/planner/dataframe.rs @@ -420,12 +420,8 @@ impl Dataframe { pub async fn except(mut self, dataframe: Dataframe) -> Result { let (s_expr, bind_context) = self.binder.bind_except( - None, - None, - self.bind_context, - dataframe.bind_context, - self.s_expr, - dataframe.s_expr, + (None, self.s_expr, self.bind_context), + (None, dataframe.s_expr, dataframe.bind_context), )?; self.s_expr = s_expr; self.bind_context = bind_context; @@ -434,12 +430,8 @@ impl Dataframe { pub async fn intersect(mut self, dataframe: Dataframe) -> Result { let (s_expr, bind_context) = self.binder.bind_intersect( - None, - None, - self.bind_context, - dataframe.bind_context, - self.s_expr, - dataframe.s_expr, + (None, self.s_expr, self.bind_context), + (None, dataframe.s_expr, dataframe.bind_context), )?; self.s_expr = s_expr; self.bind_context = bind_context; From a3849efe08dca2fac4a5b49f9c2f688ad6ae63d2 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 7 Nov 2025 18:43:08 +0800 Subject: [PATCH 5/5] fix --- .../src/physical_plans/physical_join.rs | 35 ++++--------------- .../bind_table_reference/bind_asof_join.rs | 33 +++++++++++------ .../binder/bind_table_reference/mod.rs | 1 + src/query/sql/src/planner/binder/mod.rs | 1 + 4 files changed, 31 insertions(+), 39 deletions(-) diff --git a/src/query/service/src/physical_plans/physical_join.rs b/src/query/service/src/physical_plans/physical_join.rs index 8e75dcac8e2e1..bc48246312e38 100644 --- a/src/query/service/src/physical_plans/physical_join.rs +++ b/src/query/service/src/physical_plans/physical_join.rs @@ -14,9 +14,8 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_sql::binder::JoinPredicate; +use databend_common_sql::binder::is_range_join_condition; use databend_common_sql::optimizer::ir::RelExpr; -use databend_common_sql::optimizer::ir::RelationalProperty; use databend_common_sql::optimizer::ir::SExpr; use databend_common_sql::plans::FunctionCall; use databend_common_sql::plans::Join; @@ -68,7 +67,9 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result { .non_equi_conditions .iter() .cloned() - .partition::, _>(|condition| is_range_condition(condition, &left_prop, &right_prop)); + .partition::, _>(|condition| { + is_range_join_condition(condition, &left_prop, &right_prop).is_some() + }); if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) { return Ok(PhysicalJoinType::RangeJoin( @@ -80,30 +81,6 @@ fn physical_join(join: &Join, s_expr: &SExpr) -> Result { Ok(PhysicalJoinType::Hash) } -fn is_range_condition( - expr: &ScalarExpr, - left_prop: &RelationalProperty, - right_prop: &RelationalProperty, -) -> bool { - let ScalarExpr::FunctionCall(func) = expr else { - return false; - }; - if !matches!(func.func_name.as_str(), "gt" | "lt" | "gte" | "lte") { - return false; - } - let [left, right] = func.arguments.as_slice() else { - unreachable!() - }; - - matches!( - ( - JoinPredicate::new(left, left_prop, right_prop), - JoinPredicate::new(right, left_prop, right_prop), - ), - (JoinPredicate::Left(_), JoinPredicate::Right(_)) - ) -} - impl PhysicalPlanBuilder { pub async fn build_join( &mut self, @@ -166,7 +143,9 @@ impl PhysicalPlanBuilder { } .into() })) - .partition(|condition| is_range_condition(condition, &left_prop, &right_prop)); + .partition(|condition| { + is_range_join_condition(condition, &left_prop, &right_prop).is_some() + }); self.build_range_join( join.join_type, diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs index 550158e1f82d2..0738c28d95b8c 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs @@ -58,7 +58,7 @@ impl Binder { let mut range_condition = None; for condition in join.non_equi_conditions.iter() { - if let Some(func) = check_range_condition(condition, &left_prop, &right_prop) { + if let Some(func) = is_range_join_condition(condition, &left_prop, &right_prop) { if range_condition.is_some() { return Err(ErrorCode::Internal("Multiple inequalities condition!")); } @@ -217,7 +217,7 @@ impl Binder { } } -fn check_range_condition<'a>( +pub fn is_range_join_condition<'a>( expr: &'a ScalarExpr, left_prop: &RelationalProperty, right_prop: &RelationalProperty, @@ -228,16 +228,27 @@ fn check_range_condition<'a>( if !matches!(func.func_name.as_str(), GT | LT | GTE | LTE) { return None; } - let [left, right] = func.arguments.as_slice() else { + let [a, b] = func.arguments.as_slice() else { unreachable!() }; - matches!( - ( - JoinPredicate::new(left, left_prop, right_prop), - JoinPredicate::new(right, left_prop, right_prop), - ), - (JoinPredicate::Left(_), JoinPredicate::Right(_)) - ) - .then_some(func) + match JoinPredicate::new(a, left_prop, right_prop) { + JoinPredicate::Left(_) + if matches!( + JoinPredicate::new(b, left_prop, right_prop), + JoinPredicate::Right(_) + ) => + { + Some(func) + } + JoinPredicate::Right(_) + if matches!( + JoinPredicate::new(b, left_prop, right_prop), + JoinPredicate::Left(_) + ) => + { + Some(func) + } + _ => None, + } } diff --git a/src/query/sql/src/planner/binder/bind_table_reference/mod.rs b/src/query/sql/src/planner/binder/bind_table_reference/mod.rs index 98f8fdb344157..38766d581616a 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/mod.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/mod.rs @@ -22,5 +22,6 @@ mod bind_subquery; mod bind_table; mod bind_table_function; +pub use bind_asof_join::is_range_join_condition; pub use bind_join::JoinConditions; pub use bind_table_function::parse_result_scan_args; diff --git a/src/query/sql/src/planner/binder/mod.rs b/src/query/sql/src/planner/binder/mod.rs index 2b96a0ecd239d..989571eab718c 100644 --- a/src/query/sql/src/planner/binder/mod.rs +++ b/src/query/sql/src/planner/binder/mod.rs @@ -68,6 +68,7 @@ pub use bind_mutation::target_probe; pub use bind_mutation::MutationStrategy; pub use bind_mutation::MutationType; pub use bind_query::bind_values; +pub use bind_table_reference::is_range_join_condition; pub use bind_table_reference::parse_result_scan_args; pub use binder::Binder; pub use builders::*;