Skip to content

Commit a2e108a

Browse files
committed
proto wip
1 parent 7c9c585 commit a2e108a

File tree

6 files changed

+163
-8
lines changed

6 files changed

+163
-8
lines changed

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -882,12 +882,12 @@ impl LogicalPlan {
882882
/// to perform mutable operations via [`Self::update_plan_from_children`].
883883
/// After mutating the `LogicalPlanContext.children`, or after creating the `LogicalPlanContext`,
884884
/// call `update_plan_from_children` to sync.
885-
///
885+
///
886886
/// See also:
887887
/// - [`datafusion_common::tree_node::TreeNode`] trait for tree traversal and mutation utilities.
888888
/// - [`datafusion_common::tree_node::ConcreteTreeNode`] trait for integrating with the tree node utilities.
889889
/// - [`datafusion::physical_plan::tree_node::PlanContext`] for a similar context for physical plans.
890-
///
890+
///
891891
/// [`datafusion::physical_plan::tree_node::PlanContext`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/tree_node/struct.PlanContext.html
892892
#[derive(Debug, Clone)]
893893
pub struct LogicalPlanContext<T: Sized> {

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use datafusion_expr::{
4141
use crate::optimizer::ApplyOrder;
4242
use crate::simplify_expressions::simplify_predicates;
4343
use crate::utils::{
44-
build_schema_remapping, has_all_column_refs, is_restrict_null_predicate
44+
build_schema_remapping, has_all_column_refs, is_restrict_null_predicate,
4545
};
4646
use crate::{OptimizerConfig, OptimizerRule};
4747

datafusion/proto/proto/datafusion.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ message SortExprNodeCollection {
8282
repeated SortExprNode sort_expr_nodes = 1;
8383
}
8484

85+
message ScanOrderingNode {
86+
SortExprNodeCollection preferred_ordering = 1;
87+
}
88+
8589
message ListingTableScanNode {
8690
reserved 1; // was string table_name
8791
TableReference table_name = 14;
@@ -101,6 +105,7 @@ message ListingTableScanNode {
101105
datafusion_common.ArrowFormat arrow = 16;
102106
}
103107
repeated SortExprNodeCollection file_sort_order = 13;
108+
ScanOrderingNode ordering = 17;
104109
}
105110

106111
message ViewTableScanNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 109 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ use datafusion::{
5757
};
5858
use datafusion_common::file_options::file_type::FileType;
5959
use datafusion_common::{
60-
context, internal_datafusion_err, internal_err, not_impl_err, plan_err, Result,
61-
TableReference, ToDFSchema,
60+
context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
61+
DataFusionError, Result, TableReference, ToDFSchema,
6262
};
6363
use datafusion_expr::{
6464
dml,
@@ -495,13 +495,25 @@ impl AsLogicalPlan for LogicalPlanNode {
495495
projection = Some(column_indices);
496496
}
497497

498-
LogicalPlanBuilder::scan_with_filters(
498+
let ordering = scan.ordering.as_ref().map(|o| {
499+
o.preferred_ordering
500+
.as_ref()
501+
.map(|so| from_proto::parse_sorts(&so.sort_expr_nodes, ctx, extension_codec))
502+
.transpose()
503+
}).transpose()?;
504+
505+
let mut scan = TableScan::try_new(
499506
table_name,
500507
provider_as_source(Arc::new(provider)),
501508
projection,
502509
filters,
503-
)?
504-
.build()
510+
None,
511+
)?;
512+
if let Some(ordering) = ordering {
513+
scan = scan.with_ordering(ordering);
514+
}
515+
516+
Ok(LogicalPlan::TableScan(scan))
505517
}
506518
LogicalPlanType::CustomScan(scan) => {
507519
let schema: Schema = convert_required!(scan.schema)?;
@@ -1003,6 +1015,7 @@ impl AsLogicalPlan for LogicalPlanNode {
10031015
source,
10041016
filters,
10051017
projection,
1018+
ordering,
10061019
..
10071020
}) => {
10081021
let provider = source_as_provider(source)?;
@@ -1118,6 +1131,26 @@ impl AsLogicalPlan for LogicalPlanNode {
11181131
})
11191132
.collect::<Result<Vec<_>>>()?;
11201133

1134+
let ordering = ordering
1135+
.as_ref()
1136+
.map(|ordering| {
1137+
Ok::<_, DataFusionError>(protobuf::ScanOrderingNode {
1138+
preferred_ordering: ordering
1139+
.preferred_ordering
1140+
.as_ref()
1141+
.map(|order| {
1142+
Ok::<_, DataFusionError>(SortExprNodeCollection {
1143+
sort_expr_nodes: serialize_sorts(
1144+
order,
1145+
extension_codec,
1146+
)?,
1147+
})
1148+
})
1149+
.transpose()?,
1150+
})
1151+
})
1152+
.transpose()?;
1153+
11211154
Ok(LogicalPlanNode {
11221155
logical_plan_type: Some(LogicalPlanType::ListingScan(
11231156
protobuf::ListingTableScanNode {
@@ -1136,6 +1169,7 @@ impl AsLogicalPlan for LogicalPlanNode {
11361169
filters,
11371170
target_partitions: options.target_partitions as u32,
11381171
file_sort_order: exprs_vec,
1172+
ordering,
11391173
},
11401174
)),
11411175
})

0 commit comments

Comments
 (0)