Skip to content

Commit af9d723

Browse files
added parallelization on key partitioning
1 parent 5bcded3 commit af9d723

File tree

2 files changed

+12
-15
lines changed

2 files changed

+12
-15
lines changed

datafusion/sqllogictest/test_files/partitioned_aggregation.slt

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,44 +31,41 @@ CREATE TABLE staging (
3131
(3, 30),
3232
(4, 40);
3333

34-
# Use COPY to create deterministic filenames (like parquet_sorted_statistics.slt)
35-
# This avoids random filenames that change on every test run
3634
query I
3735
COPY (SELECT * FROM staging WHERE part_col = 1)
38-
TO 'test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=1/0.parquet'
36+
TO 'test_files/scratch/partitioned_aggregation/part_col=1/0.parquet'
3937
STORED AS PARQUET;
4038
----
4139
3
4240

4341
query I
4442
COPY (SELECT * FROM staging WHERE part_col = 2)
45-
TO 'test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=2/1.parquet'
43+
TO 'test_files/scratch/partitioned_aggregation/part_col=2/1.parquet'
4644
STORED AS PARQUET;
4745
----
4846
2
4947

5048
query I
5149
COPY (SELECT * FROM staging WHERE part_col = 3)
52-
TO 'test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=3/2.parquet'
50+
TO 'test_files/scratch/partitioned_aggregation/part_col=3/2.parquet'
5351
STORED AS PARQUET;
5452
----
5553
1
5654

5755
query I
5856
COPY (SELECT * FROM staging WHERE part_col = 4)
59-
TO 'test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=4/3.parquet'
57+
TO 'test_files/scratch/partitioned_aggregation/part_col=4/3.parquet'
6058
STORED AS PARQUET;
6159
----
6260
1
6361

64-
# Use a new location to avoid dirty state from previous runs
6562
statement ok
6663
CREATE EXTERNAL TABLE partitioned_table_optim (
6764
part_col INT,
6865
val INT
6966
)
7067
STORED AS PARQUET
71-
LOCATION 'test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/'
68+
LOCATION 'test_files/scratch/partitioned_aggregation/'
7269
PARTITIONED BY (part_col);
7370

7471
# 1. Verify plan when grouping by partition column
@@ -87,7 +84,7 @@ physical_plan
8784
02)--SortExec: expr=[part_col@0 ASC NULLS LAST], preserve_partitioning=[true]
8885
03)----ProjectionExec: expr=[part_col@0 as part_col, count(Int64(1))@1 as count(*)]
8986
04)------AggregateExec: mode=SinglePartitioned, gby=[part_col@0 as part_col], aggr=[count(Int64(1))]
90-
05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet
87+
05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet
9188

9289
# Verify results
9390
query II
@@ -113,7 +110,7 @@ physical_plan
113110
02)--SortExec: expr=[val@1 ASC NULLS LAST], preserve_partitioning=[true]
114111
03)----ProjectionExec: expr=[part_col@0 as part_col, val@1 as val, count(Int64(1))@2 as count(*)]
115112
04)------AggregateExec: mode=SinglePartitioned, gby=[part_col@1 as part_col, val@0 as val], aggr=[count(Int64(1))], ordering_mode=PartiallySorted([0])
116-
05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=4/3.parquet]]}, projection=[val, part_col], file_type=parquet
113+
05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[val, part_col], file_type=parquet
117114

118115
# 3. Verify fallback when grouping by non-partition column
119116
# Should use FinalPartitioned and Repartition (not SinglePartitioned)
@@ -133,7 +130,7 @@ physical_plan
133130
05)--------CoalesceBatchesExec: target_batch_size=8192
134131
06)----------RepartitionExec: partitioning=Hash([val@0], 4), input_partitions=4
135132
07)------------AggregateExec: mode=Partial, gby=[val@0 as val], aggr=[count(Int64(1))]
136-
08)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=4/3.parquet]]}, projection=[val], file_type=parquet
133+
08)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[val], file_type=parquet
137134

138135
# 4. Note: Even with preserve_partition_values=false, when grouping by partition column,
139136
# the optimization still applies because files are naturally grouped by partition directories.
@@ -149,7 +146,7 @@ CREATE EXTERNAL TABLE partitioned_table_disabled (
149146
val INT
150147
)
151148
STORED AS PARQUET
152-
LOCATION 'test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/'
149+
LOCATION 'test_files/scratch/partitioned_aggregation/'
153150
PARTITIONED BY (part_col);
154151

155152
query TT
@@ -165,7 +162,7 @@ physical_plan
165162
02)--SortExec: expr=[part_col@0 ASC NULLS LAST], preserve_partitioning=[true]
166163
03)----ProjectionExec: expr=[part_col@0 as part_col, count(Int64(1))@1 as count(*)]
167164
04)------AggregateExec: mode=SinglePartitioned, gby=[part_col@0 as part_col], aggr=[count(Int64(1))]
168-
05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/test_partitioned_table_optim/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet
165+
05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=1/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=2/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=3/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/partitioned_aggregation/part_col=4/3.parquet]]}, projection=[part_col], file_type=parquet
169166

170167
# Cleanup
171168
statement ok
@@ -175,4 +172,4 @@ statement ok
175172
DROP TABLE partitioned_table_disabled;
176173

177174
statement ok
178-
DROP TABLE staging;
175+
DROP TABLE staging;

parquet-testing

Submodule parquet-testing updated 291 files

0 commit comments

Comments
 (0)