
Commit 2ac084a

refactor(query): enable grace hash join if force join spill data (#18878)
* refactor(query): enable grace hash join if force spill join
1 parent 281de99 commit 2ac084a

File tree

31 files changed, +1396 -130 lines

โ€Žsrc/query/expression/src/block.rsโ€Ž

Lines changed: 13 additions & 0 deletions
@@ -907,6 +907,19 @@ impl DataBlock {
             })
             .sum()
     }
+
+    pub fn maybe_gc(self) -> DataBlock {
+        let mut columns = Vec::with_capacity(self.entries.len());
+
+        for entry in self.entries {
+            columns.push(match entry {
+                BlockEntry::Column(column) => BlockEntry::Column(column.maybe_gc()),
+                BlockEntry::Const(s, d, n) => BlockEntry::Const(s, d, n),
+            });
+        }
+
+        DataBlock::new(columns, self.num_rows)
+    }
 }
 
 impl Eq for Box<dyn BlockMetaInfo> {}
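
Note on the new method: DataBlock::maybe_gc rebuilds the block by calling Column::maybe_gc on each owned column entry and passing constant entries through unchanged. Judging by its use in squash_blocks.rs further down, the intent appears to be reclaiming buffer capacity that a sliced block no longer needs. A minimal, self-contained sketch of the same idea on a plain Vec<u64> (illustrative only; the threshold and helper below are hypothetical, not the databend API):

// Illustrative analogue of a maybe_gc-style compaction on a plain Vec:
// if the backing allocation is much larger than the data in use, copy into
// a right-sized buffer; otherwise hand the input back untouched.
fn maybe_gc(values: Vec<u64>) -> Vec<u64> {
    // Hypothetical threshold: compact only when less than a quarter of the
    // allocation is actually used.
    if values.capacity() > 4 * values.len().max(1) {
        let mut compact = Vec::with_capacity(values.len());
        compact.extend_from_slice(&values);
        return compact;
    }
    values
}

fn main() {
    let mut big = Vec::with_capacity(1_000_000);
    big.extend(0..10u64);
    let small = maybe_gc(big);
    assert!(small.capacity() < 1_000_000);
    println!("compacted capacity: {}", small.capacity());
}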

โ€Žsrc/query/service/src/global_services.rsโ€Ž

Lines changed: 2 additions & 0 deletions
@@ -58,6 +58,7 @@ use crate::servers::http::v1::ClientSessionManager;
 use crate::servers::http::v1::HttpQueryManager;
 use crate::sessions::QueriesQueueManager;
 use crate::sessions::SessionManager;
+use crate::spillers::SpillsBufferPool;
 use crate::task::service::TaskService;
 
 pub struct GlobalServices;
@@ -106,6 +107,7 @@ impl GlobalServices {
         // 4. cluster discovery init.
         ClusterDiscovery::init(config, version).await?;
 
+        SpillsBufferPool::init();
         // TODO(xuanwo):
        //
        // This part is a bit complex because catalog are used widely in different
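
The only functional change here is that a process-wide SpillsBufferPool is now initialized with the other global services, before catalog setup. The pool itself is not shown in this diff; the sketch below only illustrates the init-once singleton pattern such a call typically relies on (the empty struct and the instance() helper are stand-ins, not the actual databend implementation):

use std::sync::{Arc, OnceLock};

// Stand-in type: the real SpillsBufferPool's contents are not part of this diff.
struct SpillsBufferPool;

static POOL: OnceLock<Arc<SpillsBufferPool>> = OnceLock::new();

impl SpillsBufferPool {
    // Idempotent global initialization, intended to run once during service startup.
    fn init() {
        POOL.get_or_init(|| Arc::new(SpillsBufferPool));
    }

    // Later callers fetch the shared instance; panics if init() was never run.
    fn instance() -> Arc<SpillsBufferPool> {
        POOL.get().expect("SpillsBufferPool not initialized").clone()
    }
}

fn main() {
    SpillsBufferPool::init();
    let _pool = SpillsBufferPool::instance();
    println!("spills buffer pool ready");
}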

โ€Žsrc/query/service/src/physical_plans/physical_hash_join.rsโ€Ž

Lines changed: 23 additions & 34 deletions
@@ -55,10 +55,8 @@ use crate::physical_plans::physical_plan::PhysicalPlan;
 use crate::physical_plans::physical_plan::PhysicalPlanMeta;
 use crate::physical_plans::Exchange;
 use crate::physical_plans::PhysicalPlanBuilder;
-use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeftHashJoin;
-use crate::pipelines::processors::transforms::BasicHashJoinState;
+use crate::pipelines::processors::transforms::HashJoinFactory;
 use crate::pipelines::processors::transforms::HashJoinProbeState;
-use crate::pipelines::processors::transforms::InnerHashJoin;
 use crate::pipelines::processors::transforms::RuntimeFiltersDesc;
 use crate::pipelines::processors::transforms::TransformHashJoin;
 use crate::pipelines::processors::transforms::TransformHashJoinBuild;
@@ -395,16 +393,22 @@ impl HashJoin {
         builder: &mut PipelineBuilder,
         desc: Arc<HashJoinDesc>,
     ) -> Result<()> {
-        let state = Arc::new(BasicHashJoinState::create());
+        let factory = self.join_factory(builder, desc)?;
+
         // We must build the runtime filter before constructing the child nodes,
         // as we will inject some runtime filter information into the context for the child nodes to use.
         let rf_desc = RuntimeFiltersDesc::create(&builder.ctx, self)?;
 
-        if let Some((build_cache_index, _)) = self.build_side_cache_info {
-            builder.hash_join_states.insert(
-                build_cache_index,
-                HashJoinStateRef::NewHashJoinState(state.clone()),
-            );
+        // After common subexpression elimination is completed, we can delete this type of code.
+        {
+            let state = factory.create_basic_state(0)?;
+
+            if let Some((build_cache_index, _)) = self.build_side_cache_info {
+                builder.hash_join_states.insert(
+                    build_cache_index,
+                    HashJoinStateRef::NewHashJoinState(state.clone()),
+                );
+            }
         }
 
         self.build.build_pipeline(builder)?;
@@ -441,7 +445,7 @@
                 build_input.clone(),
                 probe_input.clone(),
                 joined_output.clone(),
-                self.create_join(&self.join_type, builder, desc.clone(), state.clone())?,
+                factory.create_hash_join(self.join_type.clone(), 0)?,
                 stage_sync_barrier.clone(),
                 self.projections.clone(),
                 rf_desc.clone(),
@@ -465,13 +469,11 @@
             .resize(builder.main_pipeline.output_len(), true)
     }
 
-    fn create_join(
+    fn join_factory(
         &self,
-        join_type: &JoinType,
-        builder: &mut PipelineBuilder,
+        ctx: &PipelineBuilder,
         desc: Arc<HashJoinDesc>,
-        state: Arc<BasicHashJoinState>,
-    ) -> Result<Box<dyn crate::pipelines::processors::transforms::Join>> {
+    ) -> Result<Arc<HashJoinFactory>> {
         let hash_key_types = self
             .build_keys
             .iter()
@@ -486,25 +488,12 @@
             })
             .collect::<Vec<_>>();
 
-        let method = DataBlock::choose_hash_method_with_types(&hash_key_types)?;
-
-        Ok(match join_type {
-            JoinType::Inner => Box::new(InnerHashJoin::create(
-                &builder.ctx,
-                builder.func_ctx.clone(),
-                method,
-                desc,
-                state,
-            )?),
-            JoinType::Left => Box::new(OuterLeftHashJoin::create(
-                &builder.ctx,
-                builder.func_ctx.clone(),
-                method,
-                desc,
-                state,
-            )?),
-            _ => unreachable!(),
-        })
+        Ok(HashJoinFactory::create(
+            ctx.ctx.clone(),
+            ctx.func_ctx.clone(),
+            DataBlock::choose_hash_method_with_types(&hash_key_types)?,
+            desc,
+        ))
     }
 }
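
The shape of this refactor: instead of matching on the join type inside create_join and constructing InnerHashJoin or OuterLeftHashJoin directly, the pipeline builder now creates a single HashJoinFactory up front and asks it for per-join state and join instances (create_basic_state(0), create_hash_join(join_type, 0)), presumably so grace/spilling variants can later be produced behind the same entry point. A rough, self-contained sketch of that factory shape (all names below are stand-ins, not the databend types):

use std::sync::Arc;

trait Join {
    fn name(&self) -> &'static str;
}

struct InnerJoin;
impl Join for InnerJoin {
    fn name(&self) -> &'static str {
        "inner"
    }
}

struct LeftOuterJoin;
impl Join for LeftOuterJoin {
    fn name(&self) -> &'static str {
        "left outer"
    }
}

enum JoinKind {
    Inner,
    Left,
}

// The factory owns whatever shared configuration the joins need; empty here.
struct JoinFactory;

impl JoinFactory {
    fn create() -> Arc<Self> {
        Arc::new(JoinFactory)
    }

    // `_id` mirrors the extra index argument seen in the diff; it is unused here.
    fn create_hash_join(&self, kind: JoinKind, _id: usize) -> Box<dyn Join> {
        match kind {
            JoinKind::Inner => Box::new(InnerJoin),
            JoinKind::Left => Box::new(LeftOuterJoin),
        }
    }
}

fn main() {
    let factory = JoinFactory::create();
    // Every pipeline processor can request its own join from the shared factory.
    let join = factory.create_hash_join(JoinKind::Left, 0);
    println!("built a {} hash join", join.name());
}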

โ€Žsrc/query/service/src/pipelines/processors/transforms/hash_join/mod.rsโ€Ž

Lines changed: 1 addition & 0 deletions
@@ -40,5 +40,6 @@ pub use hash_join_state::*;
 pub use probe_state::ProbeState;
 pub use probe_state::ProcessState;
 pub use runtime_filter::*;
+pub use spill_common::get_hashes;
 pub use transform_hash_join_build::TransformHashJoinBuild;
 pub use transform_hash_join_probe::TransformHashJoinProbe;

โ€Žsrc/query/service/src/pipelines/processors/transforms/new_hash_join/common/squash_blocks.rsโ€Ž

Lines changed: 15 additions & 16 deletions
@@ -97,7 +97,11 @@ impl SquashBlocks {
         let mut current_rows = 0;
         let mut current_bytes = 0;
 
-        while let Some(mut block) = self.blocks.pop_front() {
+        while current_rows < self.squash_rows && current_bytes < self.squash_bytes {
+            let Some(mut block) = self.blocks.pop_front() else {
+                return DataBlock::concat(&blocks);
+            };
+
             if block.block.is_empty() {
                 continue;
             }
@@ -106,34 +110,29 @@
             self.current_bytes -= block.block.memory_size();
 
             let mut slice_rows = block.block.num_rows();
-
             slice_rows = std::cmp::min(slice_rows, self.squash_rows - current_rows);
 
             let max_bytes_rows = match block.avg_bytes {
                 0 => block.block.num_rows(),
-                _ => (self.squash_bytes - current_bytes) / block.avg_bytes,
+                _ => self.squash_bytes.saturating_sub(current_bytes) / block.avg_bytes,
             };
 
             slice_rows = std::cmp::min(max_bytes_rows, slice_rows);
 
             if slice_rows != block.block.num_rows() {
-                let compact_block = block.block.slice(0..slice_rows);
-                let remain_block = block.block.slice(slice_rows..block.block.num_rows());
-
+                let compact_block = block.block.slice(0..slice_rows).maybe_gc();
                 blocks.push(compact_block);
 
-                if !remain_block.is_empty() {
-                    let mut columns = Vec::with_capacity(block.block.num_columns());
+                let remain_block = block
+                    .block
+                    .slice(slice_rows..block.block.num_rows())
+                    .maybe_gc();
 
-                    for block_entry in remain_block.take_columns() {
-                        let column = block_entry.to_column();
-                        drop(block_entry);
-                        columns.push(column.maybe_gc());
-                    }
+                self.current_rows += remain_block.num_rows();
+                self.current_bytes += remain_block.memory_size();
 
-                    block.block = DataBlock::new_from_columns(columns);
-                    self.current_rows += block.block.num_rows();
-                    self.current_bytes += block.block.memory_size();
+                if !remain_block.is_empty() {
+                    block.block = remain_block;
                     self.blocks.push_front(block);
                 }
 
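Two behavioral details worth noting in the squash_blocks.rs change: the loop now stops once the accumulated row or byte budget is reached instead of always draining the queue, and the per-block row limit uses saturating_sub, so a byte budget that is already consumed yields zero additional rows rather than an underflow. A small, self-contained illustration with made-up numbers:

fn main() {
    // Hypothetical budget and state: the byte budget is already exceeded.
    let squash_bytes: usize = 4 * 1024 * 1024;
    let current_bytes: usize = 5 * 1024 * 1024;
    let avg_bytes: usize = 128;

    // The old form `(squash_bytes - current_bytes) / avg_bytes` would underflow
    // here: a panic in debug builds, a huge bogus row count in release builds.
    let max_bytes_rows = squash_bytes.saturating_sub(current_bytes) / avg_bytes;
    assert_eq!(max_bytes_rows, 0);
    println!("rows still allowed by the byte budget: {max_bytes_rows}");
}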