Skip to content

Commit b990987

Browse files
adriangb and gabotechs authored
Add PhysicalOptimizerRule::optimize_plan to allow passing more context into optimizer rules (#18739)
Co-authored-by: Gabriel Musat Mestre <gabriel.musatmestre@datadoghq.com>
1 parent 9f725d9 commit b990987

33 files changed: +446 / -180 lines changed

datafusion/core/benches/push_down_filter.rs

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,10 +20,10 @@ use arrow::datatypes::{DataType, Field, Schema};
2020
use bytes::{BufMut, BytesMut};
2121
use criterion::{criterion_group, criterion_main, Criterion};
2222
use datafusion::config::ConfigOptions;
23-
use datafusion::prelude::{ParquetReadOptions, SessionContext};
23+
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
2424
use datafusion_execution::object_store::ObjectStoreUrl;
2525
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
26-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
26+
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
2727
use datafusion_physical_plan::ExecutionPlan;
2828
use object_store::memory::InMemory;
2929
use object_store::path::Path;
@@ -106,11 +106,13 @@ fn bench_push_down_filter(c: &mut Criterion) {
106106
config.execution.parquet.pushdown_filters = true;
107107
let plan = BenchmarkPlan { plan, config };
108108
let optimizer = FilterPushdown::new();
109+
let session_config = SessionConfig::from(plan.config.clone());
110+
let optimizer_context = OptimizerContext::new(session_config);
109111

110112
c.bench_function("push_down_filter", |b| {
111113
b.iter(|| {
112114
optimizer
113-
.optimize(Arc::clone(&plan.plan), &plan.config)
115+
.optimize_plan(Arc::clone(&plan.plan), &optimizer_context)
114116
.unwrap();
115117
});
116118
});

datafusion/core/src/physical_planner.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,7 @@ use datafusion_physical_expr::expressions::Literal;
9292
use datafusion_physical_expr::{
9393
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
9494
};
95-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
95+
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
9696
use datafusion_physical_plan::empty::EmptyExec;
9797
use datafusion_physical_plan::execution_plan::InvariantLevel;
9898
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
@@ -2271,11 +2271,14 @@ impl DefaultPhysicalPlanner {
22712271
// to verify that the plan fulfills the base requirements.
22722272
InvariantChecker(InvariantLevel::Always).check(&plan)?;
22732273

2274+
// Create optimizer context from session state
2275+
let optimizer_context = OptimizerContext::new(session_state.config().clone());
2276+
22742277
let mut new_plan = Arc::clone(&plan);
22752278
for optimizer in optimizers {
22762279
let before_schema = new_plan.schema();
22772280
new_plan = optimizer
2278-
.optimize(new_plan, session_state.config_options())
2281+
.optimize_plan(new_plan, &optimizer_context)
22792282
.map_err(|e| {
22802283
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
22812284
})?;

datafusion/core/tests/execution/coop.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ use datafusion_physical_expr::Partitioning;
4040
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4141
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
4242
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
43-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
43+
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
4444
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
4545
use datafusion_physical_plan::coop::make_cooperative;
4646
use datafusion_physical_plan::filter::FilterExec;
@@ -810,8 +810,8 @@ async fn query_yields(
810810
task_ctx: Arc<TaskContext>,
811811
) -> Result<(), Box<dyn Error>> {
812812
// Run plan through EnsureCooperative
813-
let optimized =
814-
EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;
813+
let optimizer_context = OptimizerContext::new(task_ctx.session_config().clone());
814+
let optimized = EnsureCooperative::new().optimize_plan(plan, &optimizer_context)?;
815815

816816
// Get the stream
817817
let stream = physical_plan::execute_stream(optimized, task_ctx)?;

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ use datafusion_expr::{col, lit, Expr};
4040
use datafusion::datasource::physical_plan::FileScanConfig;
4141
use datafusion_common::config::ConfigOptions;
4242
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
43-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
43+
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
4444
use datafusion_physical_plan::filter::FilterExec;
4545
use datafusion_physical_plan::ExecutionPlan;
4646
use tempfile::tempdir;
@@ -84,8 +84,10 @@ async fn check_stats_precision_with_filter_pushdown() {
8484
Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
8585
as Arc<dyn ExecutionPlan>;
8686

87+
let session_config = SessionConfig::from(options.clone());
88+
let optimizer_context = OptimizerContext::new(session_config);
8789
let optimized_exec = FilterPushdown::new()
88-
.optimize(filtered_exec, &options)
90+
.optimize_plan(filtered_exec, &optimizer_context)
8991
.unwrap();
9092

9193
assert!(

datafusion/core/tests/physical_optimizer/aggregate_statistics.rs

Lines changed: 14 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -25,12 +25,13 @@ use arrow::record_batch::RecordBatch;
2525
use datafusion::datasource::memory::MemorySourceConfig;
2626
use datafusion::datasource::source::DataSourceExec;
2727
use datafusion_common::cast::as_int64_array;
28-
use datafusion_common::config::ConfigOptions;
2928
use datafusion_common::Result;
29+
use datafusion_execution::config::SessionConfig;
3030
use datafusion_execution::TaskContext;
3131
use datafusion_expr::Operator;
3232
use datafusion_physical_expr::expressions::{self, cast};
3333
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
34+
use datafusion_physical_optimizer::OptimizerContext;
3435
use datafusion_physical_optimizer::PhysicalOptimizerRule;
3536
use datafusion_physical_plan::aggregates::AggregateExec;
3637
use datafusion_physical_plan::aggregates::AggregateMode;
@@ -67,8 +68,10 @@ async fn assert_count_optim_success(
6768
let task_ctx = Arc::new(TaskContext::default());
6869
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);
6970

70-
let config = ConfigOptions::new();
71-
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?;
71+
let session_config = SessionConfig::new();
72+
let optimizer_context = OptimizerContext::new(session_config.clone());
73+
let optimized = AggregateStatistics::new()
74+
.optimize_plan(Arc::clone(&plan), &optimizer_context)?;
7275

7376
// A ProjectionExec is a sign that the count optimization was applied
7477
assert!(optimized.as_any().is::<ProjectionExec>());
@@ -264,8 +267,10 @@ async fn test_count_inexact_stat() -> Result<()> {
264267
Arc::clone(&schema),
265268
)?;
266269

267-
let conf = ConfigOptions::new();
268-
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
270+
let session_config = SessionConfig::new();
271+
let optimizer_context = OptimizerContext::new(session_config.clone());
272+
let optimized = AggregateStatistics::new()
273+
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;
269274

270275
// check that the original ExecutionPlan was not replaced
271276
assert!(optimized.as_any().is::<AggregateExec>());
@@ -308,8 +313,10 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> {
308313
Arc::clone(&schema),
309314
)?;
310315

311-
let conf = ConfigOptions::new();
312-
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
316+
let session_config = SessionConfig::new();
317+
let optimizer_context = OptimizerContext::new(session_config.clone());
318+
let optimized = AggregateStatistics::new()
319+
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;
313320

314321
// check that the original ExecutionPlan was not replaced
315322
assert!(optimized.as_any().is::<AggregateExec>());

datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -26,15 +26,15 @@ use std::sync::Arc;
2626
use crate::physical_optimizer::test_utils::parquet_exec;
2727

2828
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
29-
use datafusion_common::config::ConfigOptions;
29+
use datafusion_execution::config::SessionConfig;
3030
use datafusion_functions_aggregate::count::count_udaf;
3131
use datafusion_functions_aggregate::sum::sum_udaf;
3232
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
3333
use datafusion_physical_expr::expressions::{col, lit};
3434
use datafusion_physical_expr::Partitioning;
3535
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3636
use datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
37-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
37+
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
3838
use datafusion_physical_plan::aggregates::{
3939
AggregateExec, AggregateMode, PhysicalGroupBy,
4040
};
@@ -47,8 +47,9 @@ macro_rules! assert_optimized {
4747
($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
4848
// run optimizer
4949
let optimizer = CombinePartialFinalAggregate {};
50-
let config = ConfigOptions::new();
51-
let optimized = optimizer.optimize($PLAN, &config)?;
50+
let session_config = SessionConfig::new();
51+
let optimizer_context = OptimizerContext::new(session_config.clone());
52+
let optimized = optimizer.optimize_plan($PLAN, &optimizer_context)?;
5253
// Now format correctly
5354
let plan = displayable(optimized.as_ref()).indent(true).to_string();
5455
let actual_lines = plan.trim();

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
5252
use datafusion_physical_optimizer::enforce_distribution::*;
5353
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
5454
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
55-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
55+
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
5656
use datafusion_physical_plan::aggregates::{
5757
AggregateExec, AggregateMode, PhysicalGroupBy,
5858
};
@@ -489,7 +489,9 @@ impl TestConfig {
489489
) -> Result<Arc<dyn ExecutionPlan>> {
490490
// Add the ancillary output requirements operator at the start:
491491
let optimizer = OutputRequirements::new_add_mode();
492-
let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
492+
let session_config = SessionConfig::from(self.config.clone());
493+
let optimizer_context = OptimizerContext::new(session_config.clone());
494+
let mut optimized = optimizer.optimize_plan(plan.clone(), &optimizer_context)?;
493495

494496
// This file has 2 rules that use tree node, apply these rules to original plan consecutively
495497
// After these operations tree nodes should be in a consistent state.
@@ -525,21 +527,25 @@ impl TestConfig {
525527
}
526528

527529
for run in optimizers_to_run {
530+
let session_config = SessionConfig::from(self.config.clone());
531+
let optimizer_context = OptimizerContext::new(session_config.clone());
528532
optimized = match run {
529533
Run::Distribution => {
530534
let optimizer = EnforceDistribution::new();
531-
optimizer.optimize(optimized, &self.config)?
535+
optimizer.optimize_plan(optimized, &optimizer_context)?
532536
}
533537
Run::Sorting => {
534538
let optimizer = EnforceSorting::new();
535-
optimizer.optimize(optimized, &self.config)?
539+
optimizer.optimize_plan(optimized, &optimizer_context)?
536540
}
537541
};
538542
}
539543

540544
// Remove the ancillary output requirements operator when done:
541545
let optimizer = OutputRequirements::new_remove_mode();
542-
let optimized = optimizer.optimize(optimized, &self.config)?;
546+
let session_config = SessionConfig::from(self.config.clone());
547+
let optimizer_context = OptimizerContext::new(session_config.clone());
548+
let optimized = optimizer.optimize_plan(optimized, &optimizer_context)?;
543549

544550
Ok(optimized)
545551
}
@@ -3372,7 +3378,10 @@ SortRequiredExec: [a@0 ASC]
33723378
config.execution.target_partitions = 10;
33733379
config.optimizer.enable_round_robin_repartition = true;
33743380
config.optimizer.prefer_existing_sort = false;
3375-
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
3381+
let session_config = SessionConfig::from(config);
3382+
let optimizer_context = OptimizerContext::new(session_config.clone());
3383+
let dist_plan =
3384+
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
33763385
// Since at the start of the rule ordering requirement is not satisfied
33773386
// EnforceDistribution rule doesn't satisfy this requirement either.
33783387
assert_plan!(dist_plan, @r"
@@ -3408,7 +3417,10 @@ SortRequiredExec: [a@0 ASC]
34083417
config.execution.target_partitions = 10;
34093418
config.optimizer.enable_round_robin_repartition = true;
34103419
config.optimizer.prefer_existing_sort = false;
3411-
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
3420+
let session_config = SessionConfig::from(config);
3421+
let optimizer_context = OptimizerContext::new(session_config.clone());
3422+
let dist_plan =
3423+
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
34123424
// Since at the start of the rule ordering requirement is satisfied
34133425
// EnforceDistribution rule satisfy this requirement also.
34143426
assert_plan!(dist_plan, @r"

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
5656
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
5757
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
5858
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
59-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
59+
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
6060
use datafusion::prelude::*;
6161
use arrow::array::{Int32Array, RecordBatch};
6262
use arrow::datatypes::{Field};
@@ -175,8 +175,10 @@ impl EnforceSortingTest {
175175
let input_plan_string = displayable(self.plan.as_ref()).indent(true).to_string();
176176

177177
// Run the actual optimizer
178+
let session_config = SessionConfig::from(config);
179+
let optimizer_context = OptimizerContext::new(session_config.clone());
178180
let optimized_physical_plan = EnforceSorting::new()
179-
.optimize(Arc::clone(&self.plan), &config)
181+
.optimize_plan(Arc::clone(&self.plan), &optimizer_context)
180182
.expect("enforce_sorting failed");
181183

182184
// Get string representation of the plan
@@ -2363,23 +2365,26 @@ async fn test_commutativity() -> Result<()> {
23632365
"#);
23642366

23652367
let config = ConfigOptions::new();
2368+
let session_config = SessionConfig::from(config);
2369+
let optimizer_context = OptimizerContext::new(session_config.clone());
23662370
let rules = vec![
23672371
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
23682372
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
23692373
];
23702374
let mut first_plan = orig_plan.clone();
23712375
for rule in rules {
2372-
first_plan = rule.optimize(first_plan, &config)?;
2376+
first_plan = rule.optimize_plan(first_plan, &optimizer_context)?;
23732377
}
23742378

2379+
let optimizer_context2 = OptimizerContext::new(session_config.clone());
23752380
let rules = vec![
23762381
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
23772382
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
23782383
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
23792384
];
23802385
let mut second_plan = orig_plan.clone();
23812386
for rule in rules {
2382-
second_plan = rule.optimize(second_plan, &config)?;
2387+
second_plan = rule.optimize_plan(second_plan, &optimizer_context2)?;
23832388
}
23842389

23852390
assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan));

0 commit comments

Comments
 (0)