
Commit 14f34f6

fix: preserve byte-size statistics in AggregateExec (#18885)
Previously, AggregateExec dropped total_byte_size statistics (returning Precision::Absent) across aggregation operations, preventing the optimizer from making informed decisions about memory allocation and execution strategies (join-side selection, dynamic filters). This commit implements proportional byte-size scaling based on row-count ratios:

- Added a calculate_scaled_byte_size helper with inline optimization
- Scales byte size for Final/FinalPartitioned without GROUP BY
- Scales byte size proportionally for all other aggregation modes
- Always returns Precision::Inexact for estimates (semantically correct)
- Returns Precision::Absent when input statistics are insufficient

Also added test coverage for edge cases (absent statistics, zero rows).

## Which issue does this PR close?

- Closes #18850

## Rationale for this change

Without byte-size statistics, the optimizer cannot estimate memory requirements for join-side selection, dynamic filter generation, and memory allocation decisions. This change preserves the statistics using proportional scaling (bytes_per_row × output_rows).

## What changes are included in this PR?

1. Modified `statistics_inner` to calculate a proportional byte size instead of returning `Precision::Absent`
2. Added the `calculate_scaled_byte_size` helper (inline-optimized, guards against division by zero)
3. Updated test assertions and added edge-case coverage

## Are these changes tested?

Yes:

- A new `test_aggregate_statistics_edge_cases` test covers edge-case scenarios
- Existing tests confirm that statistics propagate correctly through the aggregation pipeline

## Are there any user-facing changes?

No breaking changes. This is an internal optimization that may improve query planning and provide more accurate memory estimates in EXPLAIN output.

Co-authored-by: Daniël Heres <danielheres@gmail.com>
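The description above names a `calculate_scaled_byte_size` helper but does not reproduce its source. The snippet below is a minimal, hypothetical sketch of the described behavior, written against `datafusion_common::stats::Precision`; the parameter names, exact signature, and rounding used in the actual commit may differ.

```rust
use datafusion_common::stats::Precision;

/// Hypothetical sketch (not the code from the commit): estimate the output
/// byte size as bytes_per_row * output_rows.
#[inline]
fn calculate_scaled_byte_size(
    input_rows: &Precision<usize>,
    input_bytes: &Precision<usize>,
    output_rows: &Precision<usize>,
) -> Precision<usize> {
    // Unwrap the Precision values; with insufficient input statistics the
    // result stays Precision::Absent.
    let (rows, bytes, out_rows) = match (input_rows, input_bytes, output_rows) {
        (
            Precision::Exact(r) | Precision::Inexact(r),
            Precision::Exact(b) | Precision::Inexact(b),
            Precision::Exact(o) | Precision::Inexact(o),
        ) => (*r, *b, *o),
        _ => return Precision::Absent,
    };

    // Guard against division by zero when the input reports no rows.
    if rows == 0 {
        return Precision::Absent;
    }

    // The scaled value is only an estimate, so it is always reported as Inexact.
    let bytes_per_row = bytes as f64 / rows as f64;
    Precision::Inexact((bytes_per_row * out_rows as f64).ceil() as usize)
}
```

Applied with a mode-specific output row estimate (at most one row for Final/FinalPartitioned without GROUP BY, the estimated group count otherwise), a helper like this would yield the bytes_per_row × output_rows scaling described in the rationale.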
Parent: 7cbb443 · Commit: 14f34f6

File tree: 4 files changed (+182, -62 lines)

datafusion/core/tests/dataframe/mod.rs

Lines changed: 54 additions & 48 deletions
@@ -3325,30 +3325,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
  assert_snapshot!(
  pretty_format_batches(&sql_results).unwrap(),
  @r"
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | plan_type | plan |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | logical_plan | Projection: t1.a, t1.b |
- | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
- | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
- | | Left Join: t1.a = __scalar_sq_1.a |
- | | TableScan: t1 projection=[a, b] |
- | | SubqueryAlias: __scalar_sq_1 |
- | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true |
- | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] |
- | | TableScan: t2 projection=[a] |
- | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
- | | CoalesceBatchesExec: target_batch_size=8192 |
- | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
- | | DataSourceExec: partitions=1, partition_sizes=[1] |
- | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
- | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
- | | CoalesceBatchesExec: target_batch_size=8192 |
- | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
- | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
- | | DataSourceExec: partitions=1, partition_sizes=[1] |
- | | |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type | plan |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan | Projection: t1.a, t1.b |
+ | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
+ | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
+ | | Left Join: t1.a = __scalar_sq_1.a |
+ | | TableScan: t1 projection=[a, b] |
+ | | SubqueryAlias: __scalar_sq_1 |
+ | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true |
+ | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] |
+ | | TableScan: t2 projection=[a] |
+ | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
+ | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
+ | | ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true] |
+ | | CoalesceBatchesExec: target_batch_size=8192 |
+ | | HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+ | | CoalescePartitionsExec |
+ | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
+ | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
+ | | CoalesceBatchesExec: target_batch_size=8192 |
+ | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
+ | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
+ | | DataSourceExec: partitions=1, partition_sizes=[1] |
+ | | DataSourceExec: partitions=1, partition_sizes=[1] |
+ | | |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
  "
  );

@@ -3380,30 +3383,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
  assert_snapshot!(
  pretty_format_batches(&df_results).unwrap(),
  @r"
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | plan_type | plan |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
- | logical_plan | Projection: t1.a, t1.b |
- | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
- | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
- | | Left Join: t1.a = __scalar_sq_1.a |
- | | TableScan: t1 projection=[a, b] |
- | | SubqueryAlias: __scalar_sq_1 |
- | | Projection: count(*), t2.a, Boolean(true) AS __always_true |
- | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]] |
- | | TableScan: t2 projection=[a] |
- | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
- | | CoalesceBatchesExec: target_batch_size=8192 |
- | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
- | | DataSourceExec: partitions=1, partition_sizes=[1] |
- | | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
- | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
- | | CoalesceBatchesExec: target_batch_size=8192 |
- | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
- | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
- | | DataSourceExec: partitions=1, partition_sizes=[1] |
- | | |
- +---------------+-------------------------------------------------------------------------------------------------------------------------+
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type | plan |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan | Projection: t1.a, t1.b |
+ | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) |
+ | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true |
+ | | Left Join: t1.a = __scalar_sq_1.a |
+ | | TableScan: t1 projection=[a, b] |
+ | | SubqueryAlias: __scalar_sq_1 |
+ | | Projection: count(*), t2.a, Boolean(true) AS __always_true |
+ | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]] |
+ | | TableScan: t2 projection=[a] |
+ | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
+ | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
+ | | ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true] |
+ | | CoalesceBatchesExec: target_batch_size=8192 |
+ | | HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+ | | CoalescePartitionsExec |
+ | | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
+ | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
+ | | CoalesceBatchesExec: target_batch_size=8192 |
+ | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
+ | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
+ | | DataSourceExec: partitions=1, partition_sizes=[1] |
+ | | DataSourceExec: partitions=1, partition_sizes=[1] |
+ | | |
+ +---------------+------------------------------------------------------------------------------------------------------------------------------+
  "
  );

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 2 additions & 2 deletions
@@ -627,7 +627,7 @@ mod test {
 
     let expected_p0_statistics = Statistics {
         num_rows: Precision::Inexact(2),
-        total_byte_size: Precision::Absent,
+        total_byte_size: Precision::Inexact(110),
         column_statistics: vec![
             ColumnStatistics {
                 null_count: Precision::Absent,
@@ -645,7 +645,7 @@ mod test {
 
     let expected_p1_statistics = Statistics {
         num_rows: Precision::Inexact(2),
-        total_byte_size: Precision::Absent,
+        total_byte_size: Precision::Inexact(110),
         column_statistics: vec![
             ColumnStatistics {
                 null_count: Precision::Absent,
