1515// specific language governing permissions and limitations
1616// under the License.
1717
18- //! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
19- //! requirements are met by inserting necessary [[ RepartitionExec]] and [[SortExec]].
20- //!
18+ //! EnforceDistribution optimizer rule inspects the physical plan with respect
19+ //! to distribution requirements and adds [ RepartitionExec]s to satisfy them
20+ //! when necessary.
2121use crate :: config:: ConfigOptions ;
2222use crate :: error:: Result ;
23- use crate :: physical_optimizer:: utils:: { add_sort_above_child, ordering_satisfy} ;
2423use crate :: physical_optimizer:: PhysicalOptimizerRule ;
2524use crate :: physical_plan:: aggregates:: { AggregateExec , AggregateMode , PhysicalGroupBy } ;
2625use crate :: physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
@@ -46,25 +45,25 @@ use datafusion_physical_expr::{
4645use std:: collections:: HashMap ;
4746use std:: sync:: Arc ;
4847
49- /// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met
50- /// in the strictest way. It might add additional [[ RepartitionExec] ] to the plan tree
48+ /// The EnforceDistribution rule ensures that distribution requirements are met
49+ /// in the strictest way. It might add additional [RepartitionExec] to the plan tree
5150/// and give a non-optimal plan, but it can avoid the possible data skew in joins.
5251///
5352/// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by
5453/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
5554///
5655/// This rule only chooses the exactly match and satisfies the Distribution(a, b, c) by a HashPartition(a, b, c).
5756#[ derive( Default ) ]
58- pub struct BasicEnforcement { }
57+ pub struct EnforceDistribution { }
5958
60- impl BasicEnforcement {
59+ impl EnforceDistribution {
6160 #[ allow( missing_docs) ]
6261 pub fn new ( ) -> Self {
6362 Self { }
6463 }
6564}
6665
67- impl PhysicalOptimizerRule for BasicEnforcement {
66+ impl PhysicalOptimizerRule for EnforceDistribution {
6867 fn optimize (
6968 & self ,
7069 plan : Arc < dyn ExecutionPlan > ,
@@ -81,24 +80,21 @@ impl PhysicalOptimizerRule for BasicEnforcement {
8180 } else {
8281 plan
8382 } ;
84- // Distribution and Ordering enforcement need to be applied bottom-up.
83+ // Distribution enforcement needs to be applied bottom-up.
8584 new_plan. transform_up ( & {
8685 |plan| {
8786 let adjusted = if !top_down_join_key_reordering {
8887 reorder_join_keys_to_inputs ( plan) ?
8988 } else {
9089 plan
9190 } ;
92- Ok ( Some ( ensure_distribution_and_ordering (
93- adjusted,
94- target_partitions,
95- ) ?) )
91+ Ok ( Some ( ensure_distribution ( adjusted, target_partitions) ?) )
9692 }
9793 } )
9894 }
9995
10096 fn name ( & self ) -> & str {
101- "BasicEnforcement "
97+ "EnforceDistribution "
10298 }
10399
104100 fn schema_check ( & self ) -> bool {
@@ -829,10 +825,11 @@ fn new_join_conditions(
829825 new_join_on
830826}
831827
832- /// Within this function, it checks whether we need to add additional plan operators
833- /// of data exchanging and data ordering to satisfy the required distribution and ordering.
834- /// And we should avoid to manually add plan operators of data exchanging and data ordering in other places
835- fn ensure_distribution_and_ordering (
828+ /// This function checks whether we need to add additional data exchange
829+ /// operators to satisfy distribution requirements. Since this function
830+ /// takes care of such requirements, we should avoid manually adding data
831+ /// exchange operators in other places.
832+ fn ensure_distribution (
836833 plan : Arc < dyn crate :: physical_plan:: ExecutionPlan > ,
837834 target_partitions : usize ,
838835) -> Result < Arc < dyn crate :: physical_plan:: ExecutionPlan > > {
@@ -841,13 +838,11 @@ fn ensure_distribution_and_ordering(
841838 }
842839
843840 let required_input_distributions = plan. required_input_distribution ( ) ;
844- let required_input_orderings = plan. required_input_ordering ( ) ;
845841 let children: Vec < Arc < dyn ExecutionPlan > > = plan. children ( ) ;
846842 assert_eq ! ( children. len( ) , required_input_distributions. len( ) ) ;
847- assert_eq ! ( children. len( ) , required_input_orderings. len( ) ) ;
848843
849844 // Add RepartitionExec to guarantee output partitioning
850- let children = children
845+ let new_children : Result < Vec < Arc < dyn ExecutionPlan > > > = children
851846 . into_iter ( )
852847 . zip ( required_input_distributions. into_iter ( ) )
853848 . map ( |( child, required) | {
@@ -870,24 +865,8 @@ fn ensure_distribution_and_ordering(
870865 } ;
871866 new_child
872867 }
873- } ) ;
874-
875- // Add local SortExec to guarantee output ordering within each partition
876- let new_children: Result < Vec < Arc < dyn ExecutionPlan > > > = children
877- . zip ( required_input_orderings. into_iter ( ) )
878- . map ( |( child_result, required) | {
879- let child = child_result?;
880- if ordering_satisfy ( child. output_ordering ( ) , required, || {
881- child. equivalence_properties ( )
882- } ) {
883- Ok ( child)
884- } else {
885- let sort_expr = required. unwrap ( ) . to_vec ( ) ;
886- add_sort_above_child ( & child, sort_expr)
887- }
888868 } )
889869 . collect ( ) ;
890-
891870 with_new_children_if_necessary ( plan, new_children?)
892871}
893872
@@ -979,6 +958,7 @@ mod tests {
979958 use super :: * ;
980959 use crate :: datasource:: listing:: PartitionedFile ;
981960 use crate :: datasource:: object_store:: ObjectStoreUrl ;
961+ use crate :: physical_optimizer:: sort_enforcement:: EnforceSorting ;
982962 use crate :: physical_plan:: aggregates:: {
983963 AggregateExec , AggregateMode , PhysicalGroupBy ,
984964 } ;
@@ -1136,8 +1116,15 @@ mod tests {
11361116 config. execution. target_partitions = 10 ;
11371117
11381118 // run optimizer
1139- let optimizer = BasicEnforcement { } ;
1119+ let optimizer = EnforceDistribution { } ;
11401120 let optimized = optimizer. optimize( $PLAN, & config) ?;
1121+ // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
1122+ // because they were written prior to the separation of `BasicEnforcement` into
1123+ // `EnforceSorting` and `EnfoceDistribution`.
1124+ // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create
1125+ // new tests for the cascade.
1126+ let optimizer = EnforceSorting { } ;
1127+ let optimized = optimizer. optimize( optimized, & config) ?;
11411128
11421129 // Now format correctly
11431130 let plan = displayable( optimized. as_ref( ) ) . indent( ) . to_string( ) ;
@@ -1656,7 +1643,7 @@ mod tests {
16561643 Column :: new_with_schema( "c1" , & right. schema( ) ) . unwrap( ) ,
16571644 ) ,
16581645 ] ;
1659- let bottom_left_join = ensure_distribution_and_ordering (
1646+ let bottom_left_join = ensure_distribution (
16601647 hash_join_exec ( left. clone ( ) , right. clone ( ) , & join_on, & JoinType :: Inner ) ,
16611648 10 ,
16621649 ) ?;
@@ -1686,7 +1673,7 @@ mod tests {
16861673 Column :: new_with_schema( "a1" , & right. schema( ) ) . unwrap( ) ,
16871674 ) ,
16881675 ] ;
1689- let bottom_right_join = ensure_distribution_and_ordering (
1676+ let bottom_right_join = ensure_distribution (
16901677 hash_join_exec ( left, right. clone ( ) , & join_on, & JoinType :: Inner ) ,
16911678 10 ,
16921679 ) ?;
@@ -1775,7 +1762,7 @@ mod tests {
17751762 Column :: new_with_schema( "b1" , & right. schema( ) ) . unwrap( ) ,
17761763 ) ,
17771764 ] ;
1778- let bottom_left_join = ensure_distribution_and_ordering (
1765+ let bottom_left_join = ensure_distribution (
17791766 hash_join_exec ( left. clone ( ) , right. clone ( ) , & join_on, & JoinType :: Inner ) ,
17801767 10 ,
17811768 ) ?;
@@ -1805,7 +1792,7 @@ mod tests {
18051792 Column :: new_with_schema( "a1" , & right. schema( ) ) . unwrap( ) ,
18061793 ) ,
18071794 ] ;
1808- let bottom_right_join = ensure_distribution_and_ordering (
1795+ let bottom_right_join = ensure_distribution (
18091796 hash_join_exec ( left, right. clone ( ) , & join_on, & JoinType :: Inner ) ,
18101797 10 ,
18111798 ) ?;
0 commit comments