Skip to content

Commit d11e97f

Browse files
authored
refactor(query): rename exchange hash to node to node hash (#18948)
1 parent f969257 commit d11e97f

File tree

22 files changed

+66
-49
lines changed

22 files changed

+66
-49
lines changed

src/query/service/src/physical_plans/physical_exchange.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl PhysicalPlanBuilder {
100100
mut required: ColumnSet,
101101
) -> Result<PhysicalPlan> {
102102
// 1. Prune unused Columns.
103-
if let databend_common_sql::plans::Exchange::Hash(exprs) = exchange {
103+
if let databend_common_sql::plans::Exchange::NodeToNodeHash(exprs) = exchange {
104104
for expr in exprs {
105105
required.extend(expr.used_columns());
106106
}
@@ -112,7 +112,7 @@ impl PhysicalPlanBuilder {
112112
let mut keys = vec![];
113113
let mut allow_adjust_parallelism = true;
114114
let kind = match exchange {
115-
databend_common_sql::plans::Exchange::Hash(scalars) => {
115+
databend_common_sql::plans::Exchange::NodeToNodeHash(scalars) => {
116116
for scalar in scalars {
117117
let expr = scalar
118118
.type_check(input_schema.as_ref())?

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1198,7 +1198,7 @@ impl PhysicalPlanBuilder {
11981198
let build_side_data_distribution = s_expr.build_side_child().get_data_distribution()?;
11991199
let broadcast_id = if build_side_data_distribution
12001200
.as_ref()
1201-
.is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::Hash(_)))
1201+
.is_some_and(|e| matches!(e, databend_common_sql::plans::Exchange::NodeToNodeHash(_)))
12021202
{
12031203
Some(self.ctx.get_next_broadcast_id())
12041204
} else {

src/query/service/src/physical_plans/physical_join_filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,12 @@ impl JoinRuntimeFilter {
191191

192192
let enable_min_max_runtime_filter = build_side_data_distribution
193193
.as_ref()
194-
.is_none_or(|e| matches!(e, Exchange::Broadcast | Exchange::Hash(_)))
194+
.is_none_or(|e| matches!(e, Exchange::Broadcast | Exchange::NodeToNodeHash(_)))
195195
&& Self::is_type_supported_for_min_max_filter(&data_type);
196196

197197
let enable_inlist_runtime_filter = build_side_data_distribution
198198
.as_ref()
199-
.is_none_or(|e| matches!(e, Exchange::Broadcast | Exchange::Hash(_)));
199+
.is_none_or(|e| matches!(e, Exchange::Broadcast | Exchange::NodeToNodeHash(_)));
200200

201201
// Create and add the runtime filter
202202
let runtime_filter = PhysicalRuntimeFilter {

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ impl ExchangeInjector for AggregateInjector {
239239
match exchange {
240240
DataExchange::Merge(_) => unreachable!(),
241241
DataExchange::Broadcast(_) => unreachable!(),
242-
DataExchange::ShuffleDataExchange(exchange) => {
242+
DataExchange::NodeToNodeExchange(exchange) => {
243243
Ok(Arc::new(Box::new(HashTableHashScatter {
244244
buckets: exchange.destination_ids.len(),
245245
})))

src/query/service/src/pipelines/processors/transforms/sort/sort_exchange_injector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl ExchangeInjector for SortInjector {
4040
) -> Result<Arc<Box<dyn FlightScatter>>> {
4141
match exchange {
4242
DataExchange::Merge(_) | DataExchange::Broadcast(_) => unreachable!(),
43-
DataExchange::ShuffleDataExchange(exchange) => {
43+
DataExchange::NodeToNodeExchange(exchange) => {
4444
Ok(Arc::new(Box::new(SortBoundScatter {
4545
partitions: exchange.destination_ids.len(),
4646
})))

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::schedulers::PlanFragment;
4848
use crate::servers::flight::v1::exchange::BroadcastExchange;
4949
use crate::servers::flight::v1::exchange::DataExchange;
5050
use crate::servers::flight::v1::exchange::MergeExchange;
51-
use crate::servers::flight::v1::exchange::ShuffleDataExchange;
51+
use crate::servers::flight::v1::exchange::NodeToNodeExchange;
5252
use crate::sessions::QueryContext;
5353

5454
/// Visitor to split a `PhysicalPlan` into fragments.
@@ -217,7 +217,7 @@ impl FragmentDeriveHandle {
217217

218218
Ok(match exchange_sink.kind {
219219
FragmentKind::Init => None,
220-
FragmentKind::Normal => Some(DataExchange::ShuffleDataExchange(ShuffleDataExchange {
220+
FragmentKind::Normal => Some(DataExchange::NodeToNodeExchange(NodeToNodeExchange {
221221
destination_ids: get_executors(cluster),
222222
shuffle_keys: exchange_sink.keys.clone(),
223223
allow_adjust_parallelism: exchange_sink.allow_adjust_parallelism,

src/query/service/src/schedulers/fragments/query_fragment_actions_display.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl Display for QueryFragmentActionsWrap<'_> {
7171
match data_exchange {
7272
DataExchange::Merge(_) => writeln!(f, " DataExchange: Merge")?,
7373
DataExchange::Broadcast(_) => writeln!(f, " DataExchange: Broadcast")?,
74-
DataExchange::ShuffleDataExchange(_) => writeln!(f, " DataExchange: Shuffle")?,
74+
DataExchange::NodeToNodeExchange(_) => writeln!(f, " DataExchange: Shuffle")?,
7575
}
7676
}
7777

src/query/service/src/servers/flight/v1/exchange/data_exchange.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,21 @@ use databend_common_expression::RemoteExpr;
1818
pub enum DataExchange {
1919
Merge(MergeExchange),
2020
Broadcast(BroadcastExchange),
21-
ShuffleDataExchange(ShuffleDataExchange),
21+
NodeToNodeExchange(NodeToNodeExchange),
2222
}
2323

2424
impl DataExchange {
2525
pub fn get_destinations(&self) -> Vec<String> {
2626
match self {
2727
DataExchange::Merge(exchange) => vec![exchange.destination_id.clone()],
2828
DataExchange::Broadcast(exchange) => exchange.destination_ids.clone(),
29-
DataExchange::ShuffleDataExchange(exchange) => exchange.destination_ids.clone(),
29+
DataExchange::NodeToNodeExchange(exchange) => exchange.destination_ids.clone(),
3030
}
3131
}
3232
}
3333

3434
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
35-
pub struct ShuffleDataExchange {
35+
pub struct NodeToNodeExchange {
3636
pub destination_ids: Vec<String>,
3737
pub shuffle_keys: Vec<RemoteExpr>,
3838
pub allow_adjust_parallelism: bool,

src/query/service/src/servers/flight/v1/exchange/exchange_injector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl ExchangeInjector for DefaultExchangeInjector {
8686
DataExchange::Broadcast(exchange) => Box::new(BroadcastFlightScatter::try_create(
8787
exchange.destination_ids.len(),
8888
)?),
89-
DataExchange::ShuffleDataExchange(exchange) => {
89+
DataExchange::NodeToNodeExchange(exchange) => {
9090
let local_id = &ctx.get_cluster().local_id;
9191
let local_pos = exchange
9292
.destination_ids

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1050,7 +1050,7 @@ impl FragmentCoordinator {
10501050
allow_adjust_parallelism: true,
10511051
},
10521052
))),
1053-
DataExchange::ShuffleDataExchange(exchange) => Ok(Some(
1053+
DataExchange::NodeToNodeExchange(exchange) => Ok(Some(
10541054
ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
10551055
exchange_injector: exchange_injector.clone(),
10561056
schema: self.physical_plan.output_schema()?,

0 commit comments

Comments
 (0)