Skip to content
Draft

Testing #18891

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions datafusion/catalog-listing/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ pub struct ListingOptions {
/// Group files to avoid that the number of partitions exceeds
/// this limit
pub target_partitions: usize,
/// Preserve the partition column value boundaries when forming file groups.
/// When true, files that share the same partition column values are kept in
/// the same execution partition, allowing downstream operators to rely on
/// those columns being constant per partition.
pub preserve_partition_values: bool,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
///
/// DataFusion may take advantage of this ordering to omit sorts
Expand Down Expand Up @@ -77,6 +82,7 @@ impl ListingOptions {
table_partition_cols: vec![],
collect_stat: false,
target_partitions: 1,
preserve_partition_values: false,
file_sort_order: vec![],
}
}
Expand All @@ -89,6 +95,12 @@ impl ListingOptions {
pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
self = self.with_target_partitions(config.target_partitions());
self = self.with_collect_stat(config.collect_statistics());
if !self.table_partition_cols.is_empty() {
self.preserve_partition_values = config
.options()
.execution
.listing_table_preserve_partition_values;
}
self
}

Expand Down Expand Up @@ -239,6 +251,14 @@ impl ListingOptions {
self
}

/// Control whether files that share the same partition column values should
/// remain in the same execution partition. Defaults to `false`, but is
/// automatically enabled when partition columns are provided.
pub fn with_preserve_partition_values(mut self, preserve: bool) -> Self {
self.preserve_partition_values = preserve;
self
}

/// Set file sort order on [`ListingOptions`] and returns self.
///
/// ```
Expand Down
69 changes: 44 additions & 25 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::ExecutionPlan;
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use log::debug;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::HashMap;
Expand Down Expand Up @@ -457,32 +458,42 @@ impl TableProvider for ListingTable {
}

let output_ordering = self.try_create_output_ordering(state.execution_props())?;
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
)
if !self.options.preserve_partition_values {
// Split file groups by statistics to optimize sorted scans
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
)
})
})
})
.flatten()
{
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
.flatten()
{
Some(Err(e)) => {
log::debug!("failed to split file groups by statistics: {e}")
}
}
None => {} // no ordering required
};
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
}
}
None => {} // no ordering required
};
} else {
debug!(
"Skipping statistics-based file splitting because preserve_partition_values is enabled. \
File groups will respect partition boundaries instead of sort statistics."
);
}

let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
Expand All @@ -508,6 +519,9 @@ impl TableProvider for ListingTable {
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_preserve_partition_values(
self.options.preserve_partition_values,
)
.build(),
)
.await?;
Expand Down Expand Up @@ -653,7 +667,12 @@ impl ListingTable {
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;

let file_groups = file_group.split_files(self.options.target_partitions);
let target_partitions = self.options.target_partitions.max(1);
let file_groups = if self.options.preserve_partition_values {
file_group.split_by_partition_values()
} else {
file_group.split_files(target_partitions)
};
let (mut file_groups, mut stats) = compute_all_files_statistics(
file_groups,
self.schema(),
Expand Down
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,14 @@ config_namespace! {
/// inferred and will be represented in the table schema).
pub listing_table_factory_infer_partitions: bool, default = true

/// Should a `ListingTable` created through the `ListingTableFactory` keep files that share
/// the same partition column values in the same execution partition when `PARTITIONED BY`
/// columns are declared. Defaults to false to avoid overhead on non-partitioned tables.
/// Optmization is beest with moderate partition counts (10-100 partitions) and large partitions
/// (>500K rows). With many small partitions (500+), task scheduling overhead may outweigh shuffle
/// savings.
pub listing_table_preserve_partition_values: bool, default = false

/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true

Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,8 @@ name = "dataframe"
[[bench]]
harness = false
name = "spm"

[[bench]]
harness = false
name = "hive_partitioned"
required-features = ["parquet"]
82 changes: 81 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,13 @@ impl TableProviderFactory for ListingTableFactory {
};

options = options.with_table_partition_cols(table_partition_cols);

if !cmd.table_partition_cols.is_empty() {
let preserve = session_state
.config_options()
.execution
.listing_table_preserve_partition_values;
options = options.with_preserve_partition_values(preserve);
}
options
.validate_partitions(session_state, &table_path)
.await?;
Expand Down Expand Up @@ -216,6 +222,7 @@ mod tests {
datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
};

use arrow::datatypes::{Field, Schema};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{Constraints, DFSchema, TableReference};

Expand Down Expand Up @@ -519,4 +526,77 @@ mod tests {
let listing_options = listing_table.options();
assert!(listing_options.table_partition_cols.is_empty());
}

fn partitioned_table_cmd(
location: &Path,
schema: Arc<DFSchema>,
) -> CreateExternalTable {
CreateExternalTable {
name: TableReference::bare("fact"),
location: location.to_str().unwrap().to_string(),
file_type: "parquet".to_string(),
schema,
table_partition_cols: vec!["bucket".to_string()],
if_not_exists: false,
or_replace: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options: HashMap::new(),
constraints: Constraints::default(),
column_defaults: HashMap::new(),
}
}

fn value_and_partition_schema() -> Arc<DFSchema> {
let arrow_schema = Arc::new(Schema::new(vec![
Field::new("value", DataType::Int64, true),
Field::new("bucket", DataType::Int64, true),
]));
Arc::new(arrow_schema.to_dfschema().unwrap())
}

#[tokio::test]
async fn test_preserve_partition_values_disabled_by_default() {
let dir = tempfile::tempdir().unwrap();
let factory = ListingTableFactory::new();
let context = SessionContext::new();
let state = context.state();
let cmd = partitioned_table_cmd(dir.path(), value_and_partition_schema());

let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
.as_any()
.downcast_ref::<ListingTable>()
.unwrap();
assert!(!listing_table.options().preserve_partition_values);
}

#[tokio::test]
async fn test_preserve_partition_values_can_be_enabled() {
let dir = tempfile::tempdir().unwrap();
let factory = ListingTableFactory::new();
let mut config = SessionConfig::new();
config
.options_mut()
.execution
.listing_table_preserve_partition_values = true;
let context = SessionContext::new_with_config(config);
let state = context.state();
assert!(
state
.config_options()
.execution
.listing_table_preserve_partition_values
);
let cmd = partitioned_table_cmd(dir.path(), value_and_partition_schema());

let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
.as_any()
.downcast_ref::<ListingTable>()
.unwrap();
assert!(listing_table.options().preserve_partition_values);
}
}
58 changes: 52 additions & 6 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use datafusion_expr::{
WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
};
Expand Down Expand Up @@ -799,18 +799,64 @@ impl DefaultPhysicalPlanner {
Arc::clone(&physical_input_schema),
)?);

let can_repartition = !groups.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();
let grouping_input_exprs = groups.input_exprs();
let output_partitioning = initial_aggr.properties().output_partitioning();

// Check if the data is partitioned on the grouping columns.
// If all partition columns are present in the GROUP BY, we can avoid repartitioning.
let key_partition_on_group =
if groups.is_single() && !groups.expr().is_empty() {
if let Partitioning::KeyPartitioned(ref partition_exprs, _) =
output_partitioning
{
// All partition columns must be present in the GROUP BY
partition_exprs.iter().all(|part_expr| {
let Some(part_col) =
part_expr.as_any().downcast_ref::<Column>()
else {
return false;
};

grouping_input_exprs.iter().any(|group_expr| {
group_expr
.as_any()
.downcast_ref::<Column>()
.map_or(false, |group_col| {
group_col.name() == part_col.name()
})
})
})
} else {
false
}
} else {
false
};

// Some aggregators may be modified during initialization for
// optimization purposes. For example, a FIRST_VALUE may turn
// into a LAST_VALUE with the reverse ordering requirement.
// To reflect such changes to subsequent stages, use the updated
// `AggregateFunctionExpr`/`PhysicalSortExpr` objects.
let updated_aggregates = initial_aggr.aggr_expr().to_vec();

let next_partition_mode = if can_repartition {
let has_ordered_aggregate = updated_aggregates
.iter()
.any(|agg| !agg.order_bys().is_empty());
let requires_single_partition =
groups.expr().is_empty() || has_ordered_aggregate;

// Determine if we can use parallel final aggregation:
// 1. The scan is already KeyPartitioned on the grouping keys with >1 partitions
// 2. We are allowed to insert a hash repartition before the final stage
let child_partition_count = output_partitioning.partition_count();
let key_partition_supports_parallel =
key_partition_on_group && child_partition_count > 1;
let hash_repartition_enabled = !groups.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();
let use_partitioned_final = !requires_single_partition
&& (key_partition_supports_parallel || hash_repartition_enabled);
let next_partition_mode = if use_partitioned_final {
// construct a second aggregation with 'AggregateMode::FinalPartitioned'
AggregateMode::FinalPartitioned
} else {
Expand Down
Loading
Loading