Commit cd9cb4e

refactor(query): experimental aggregate final with recursive spill support (#18866)

* resolve conflict

  Signed-off-by: dqhl76 <dqhl76@gmail.com>

* refactor: add settings
* refactor: before repartition
* save
* save
* add back log
* remove useless
* extract some from final
* save
* save
* fix: partition id
* fix: compile
* save
* save
* refactor
* refactor
* save
* this commit add Debug trait for AggregateMeta, should revert before merge
* fix: reset aggregate status
* fix: fix dispatcher cannot finish caused hang
* clean, start spill and restore refactor
* check memory pressure
* feat: add support for recursive schedule
* feat: add support for recursive spill
* make lint
* partition stream
* Revert "this commit add Debug trait for AggregateMeta, should revert before merge"

  This reverts commit c9ebff3.

* chore: clean debug
* refactor: merge dispatcher with bucket scheduler
* add back aggregate
* add back spill restore
* recursive spill
* make build pass
* partition stream need finalize
* refine event, make it more clearly to understand
* fix: reset flag during new round begin
* debug
* debug
* make lint
* fix
* chore: add random_spill_percentage used for testing
* fix: spill flag not reset
* used for debug
* fix: skip if output datablock is empty (caused downstream processor merge join processor inf loop)
* fix: add blocking layer
* disable random_spill_percentage
* fix: try fix cluster aggregate hang
* fix: stackoverflow in ci
* improve performance

* Revert "chore: add random_spill_percentage used for testing"

  This reverts commit 0c45ea6

* disable to ready merge

---------

Signed-off-by: dqhl76 <dqhl76@gmail.com>
1 parent 2ac084a commit cd9cb4e

File tree

14 files changed: +1631 -55 lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -399,6 +399,7 @@ opendal = { version = "0.53.2", features = [
     "layers-fastrace",
     "layers-prometheus-client",
     "layers-async-backtrace",
+    "layers-blocking",
     "services-s3",
     "services-fs",
     "services-gcs",

src/query/service/src/physical_plans/physical_aggregate_final.rs

Lines changed: 4 additions & 0 deletions

@@ -148,6 +148,8 @@ impl IPhysicalPlan for AggregateFinal {
             .get_enable_experimental_aggregate_hashtable()?;
         let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?;
         let max_restore_worker = builder.settings.get_max_aggregate_restore_worker()?;
+        let experiment_aggregate_final =
+            builder.settings.get_enable_experiment_aggregate_final()?;

         let mut is_cluster_aggregate = false;
         if ExchangeSource::check_physical_plan(&self.input) {
@@ -199,6 +201,8 @@ impl IPhysicalPlan for AggregateFinal {
                 params.clone(),
                 max_restore_worker,
                 after_group_parallel,
+                experiment_aggregate_final,
+                builder.ctx.clone(),
             )
         }
     }

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 1 addition & 0 deletions

@@ -107,6 +107,7 @@ impl SerializedPayload {
     }
 }

+#[derive(Debug)]
 pub struct BucketSpilledPayload {
     pub bucket: isize,
     pub location: String,
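
Deriving Debug makes spilled-payload bookkeeping printable while tracing the spill/restore path. A small self-contained sketch of what that buys, with the struct abbreviated to the fields visible in this hunk (the real type carries more, and the location string below is made up):

// Abbreviated stand-in for the struct above; the real BucketSpilledPayload
// has additional fields (data range, column layout, etc.).
#[derive(Debug)]
struct BucketSpilledPayload {
    bucket: isize,
    location: String,
}

fn main() {
    let payload = BucketSpilledPayload {
        bucket: 3,
        location: "_spill/aggregate/abc123".to_string(), // illustrative path
    };
    // With Debug derived, the payload can go straight into log statements,
    // e.g. log::debug!("restoring spilled payload: {:?}", payload);
    println!("restoring spilled payload: {:?}", payload);
}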
src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs

Lines changed: 151 additions & 0 deletions

@@ -0,0 +1,151 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_core::Pipe;
+use databend_common_pipeline_core::PipeItem;
+use databend_common_pipeline_core::Pipeline;
+use databend_common_pipeline_core::TransformPipeBuilder;
+use databend_common_storage::DataOperator;
+use parking_lot::Mutex;
+use tokio::sync::Barrier;
+use tokio::sync::Semaphore;
+
+use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSharedState;
+use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSpiller;
+use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::NewFinalAggregateTransform;
+use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::TransformPartitionBucketScatter;
+use crate::pipelines::processors::transforms::aggregator::transform_partition_bucket::TransformPartitionBucket;
+use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
+use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillReader;
+use crate::pipelines::processors::transforms::aggregator::TransformFinalAggregate;
+use crate::sessions::QueryContext;
+
+fn build_partition_bucket_experimental(
+    pipeline: &mut Pipeline,
+    params: Arc<AggregatorParams>,
+    after_worker: usize,
+    ctx: Arc<QueryContext>,
+) -> Result<()> {
+    let operator = DataOperator::instance().spill_operator();
+
+    // PartitionedPayload only accepts power-of-two partition counts.
+    let mut output_num = after_worker.next_power_of_two();
+    const MAX_PARTITION_COUNT: usize = 128;
+    if output_num > MAX_PARTITION_COUNT {
+        output_num = MAX_PARTITION_COUNT;
+    }
+
+    let input_num = pipeline.output_len();
+    let scatter = TransformPartitionBucketScatter::create(input_num, output_num, params.clone())?;
+    let scatter_inputs = scatter.get_inputs();
+    let scatter_outputs = scatter.get_outputs();
+
+    pipeline.add_pipe(Pipe::create(
+        scatter_inputs.len(),
+        scatter_outputs.len(),
+        vec![PipeItem::create(
+            ProcessorPtr::create(Box::new(scatter)),
+            scatter_inputs,
+            scatter_outputs,
+        )],
+    ));
+
+    let mut builder = TransformPipeBuilder::create();
+    let barrier = Arc::new(Barrier::new(output_num));
+    let shared_state = Arc::new(Mutex::new(FinalAggregateSharedState::new(output_num)));
+
+    for id in 0..output_num {
+        let spiller = FinalAggregateSpiller::try_create(ctx.clone(), operator.clone())?;
+        let input_port = InputPort::create();
+        let output_port = OutputPort::create();
+        let processor = NewFinalAggregateTransform::try_create(
+            input_port.clone(),
+            output_port.clone(),
+            id,
+            params.clone(),
+            output_num,
+            barrier.clone(),
+            shared_state.clone(),
+            spiller,
+            ctx.clone(),
+        )?;
+        builder.add_transform(input_port, output_port, ProcessorPtr::create(processor));
+    }
+
+    pipeline.add_pipe(builder.finalize());
+    pipeline.resize(after_worker, true)?;
+
+    Ok(())
+}
+
+fn build_partition_bucket_legacy(
+    pipeline: &mut Pipeline,
+    params: Arc<AggregatorParams>,
+    max_restore_worker: u64,
+    after_worker: usize,
+) -> Result<()> {
+    let operator = DataOperator::instance().spill_operator();
+
+    let input_nums = pipeline.output_len();
+    let transform = TransformPartitionBucket::create(input_nums, params.clone())?;
+
+    let output = transform.get_output();
+    let inputs_port = transform.get_inputs();
+
+    pipeline.add_pipe(Pipe::create(inputs_port.len(), 1, vec![PipeItem::create(
+        ProcessorPtr::create(Box::new(transform)),
+        inputs_port,
+        vec![output],
+    )]));
+
+    pipeline.try_resize(std::cmp::min(input_nums, max_restore_worker as usize))?;
+    let semaphore = Arc::new(Semaphore::new(params.max_spill_io_requests));
+    pipeline.add_transform(|input, output| {
+        let operator = operator.clone();
+        TransformAggregateSpillReader::create(input, output, operator, semaphore.clone())
+    })?;
+    pipeline.add_transform(|input, output| {
+        Ok(ProcessorPtr::create(TransformFinalAggregate::try_create(
+            input,
+            output,
+            params.clone(),
+        )?))
+    })?;
+    pipeline.try_resize(after_worker)?;
+
+    Ok(())
+}
+
+/// Build the partition bucket pipeline based on the `experiment_aggregate_final` flag,
+/// dispatching to either the experimental or the legacy implementation.
+pub fn build_partition_bucket(
+    pipeline: &mut Pipeline,
+    params: Arc<AggregatorParams>,
+    max_restore_worker: u64,
+    after_worker: usize,
+    experiment_aggregate_final: bool,
+    ctx: Arc<QueryContext>,
+) -> Result<()> {
+    if experiment_aggregate_final {
+        build_partition_bucket_experimental(pipeline, params, after_worker, ctx)
+    } else {
+        build_partition_bucket_legacy(pipeline, params, max_restore_worker, after_worker)
+    }
+}
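
The experimental builder's fan-out computation rounds the worker count up to a power of two and caps it at 128, since PartitionedPayload requires power-of-two partition counts. A standalone sketch of just that computation (the helper name is ours, not the commit's):

/// Mirrors the output_num computation in build_partition_bucket_experimental:
/// round the worker count up to a power of two, then cap it at 128.
fn partition_count(after_worker: usize) -> usize {
    const MAX_PARTITION_COUNT: usize = 128;
    after_worker.next_power_of_two().min(MAX_PARTITION_COUNT)
}

fn main() {
    assert_eq!(partition_count(6), 8); // rounded up to the next power of two
    assert_eq!(partition_count(64), 64); // already a power of two
    assert_eq!(partition_count(200), 128); // capped at MAX_PARTITION_COUNT
}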

src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs

Lines changed: 4 additions & 2 deletions

@@ -15,18 +15,20 @@
 mod aggregate_exchange_injector;
 mod aggregate_meta;
 mod aggregator_params;
-mod new_transform_partition_bucket;
+mod build_partition_bucket;
+mod new_final_aggregate;
 mod serde;
 mod transform_aggregate_expand;
 mod transform_aggregate_final;
 mod transform_aggregate_partial;
+mod transform_partition_bucket;
 mod transform_single_key;
 mod udaf_script;

 pub use aggregate_exchange_injector::AggregateInjector;
 pub use aggregate_meta::*;
 pub use aggregator_params::AggregatorParams;
-pub use new_transform_partition_bucket::build_partition_bucket;
+pub use build_partition_bucket::build_partition_bucket;
 pub use transform_aggregate_expand::TransformExpandGroupingSets;
 pub use transform_aggregate_final::TransformFinalAggregate;
 pub use transform_aggregate_partial::TransformPartialAggregate;
Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_expression::DataBlock;
+
+use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
+
+/// Split partitioned metadata evenly into DataBlock chunks.
+pub fn split_partitioned_meta_into_datablocks(
+    bucket: isize,
+    data: Vec<AggregateMeta>,
+    outputs_len: usize,
+) -> Vec<DataBlock> {
+    if outputs_len == 0 {
+        return vec![];
+    }
+
+    let total_len = data.len();
+    let base_chunk_size = total_len / outputs_len;
+    let remainder = total_len % outputs_len;
+
+    let mut result = Vec::with_capacity(outputs_len);
+    let mut data_iter = data.into_iter();
+
+    for index in 0..outputs_len {
+        let chunk_size = if index < remainder {
+            base_chunk_size + 1
+        } else {
+            base_chunk_size
+        };
+
+        let chunk: Vec<AggregateMeta> = data_iter.by_ref().take(chunk_size).collect();
+        result.push(DataBlock::empty_with_meta(
+            AggregateMeta::create_partitioned(bucket, chunk),
+        ));
+    }
+
+    result
+}
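
The splitting rule gives the first `total_len % outputs_len` chunks one extra element, so chunk sizes never differ by more than one. A self-contained sketch of the same arithmetic over a generic Vec (the function name is ours, not the commit's):

/// Generic version of the even-split rule used above: the first
/// `len % outputs_len` chunks each receive one extra element.
fn split_evenly<T>(data: Vec<T>, outputs_len: usize) -> Vec<Vec<T>> {
    if outputs_len == 0 {
        return vec![];
    }
    let base = data.len() / outputs_len;
    let remainder = data.len() % outputs_len;
    let mut iter = data.into_iter();
    (0..outputs_len)
        .map(|index| {
            let chunk_size = base + usize::from(index < remainder);
            iter.by_ref().take(chunk_size).collect()
        })
        .collect()
}

fn main() {
    // 7 items across 3 outputs -> chunk sizes 3, 2, 2.
    let chunks = split_evenly((0..7).collect::<Vec<i32>>(), 3);
    let sizes: Vec<usize> = chunks.iter().map(Vec::len).collect();
    assert_eq!(sizes, vec![3, 2, 2]);
}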
