Skip to content

Commit 10fbc0a

Browse files
committed
Dedplicate PhysicalExpr on proto ser/de using Arc pointer addresses
1 parent 7c215ed commit 10fbc0a

File tree

12 files changed

+484
-267
lines changed

12 files changed

+484
-267
lines changed

datafusion-examples/examples/composed_extension_codec.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use datafusion::common::Result;
3939
use datafusion::execution::TaskContext;
4040
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4141
use datafusion::prelude::SessionContext;
42+
use datafusion_proto::physical_plan::DecodeContext;
4243
use datafusion_proto::physical_plan::{
4344
AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec,
4445
};
@@ -71,7 +72,7 @@ async fn main() {
7172

7273
// deserialize proto back to execution plan
7374
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
74-
.try_into_physical_plan(&ctx.task_ctx(), &composed_codec)
75+
.try_into_physical_plan(&DecodeContext::new(&ctx.task_ctx()), &composed_codec)
7576
.expect("from proto");
7677

7778
// assert that the original and deserialized execution plans are equal
@@ -137,7 +138,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
137138
&self,
138139
buf: &[u8],
139140
inputs: &[Arc<dyn ExecutionPlan>],
140-
_ctx: &TaskContext,
141+
_ctx: &DecodeContext,
141142
) -> Result<Arc<dyn ExecutionPlan>> {
142143
if buf == "ParentExec".as_bytes() {
143144
Ok(Arc::new(ParentExec {
@@ -213,7 +214,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
213214
&self,
214215
buf: &[u8],
215216
_inputs: &[Arc<dyn ExecutionPlan>],
216-
_ctx: &TaskContext,
217+
_ctx: &DecodeContext,
217218
) -> Result<Arc<dyn ExecutionPlan>> {
218219
if buf == "ChildExec".as_bytes() {
219220
Ok(Arc::new(ChildExec {}))

datafusion/ffi/src/plan_properties.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use datafusion_proto::{
3838
physical_plan::{
3939
from_proto::{parse_physical_sort_exprs, parse_protobuf_partitioning},
4040
to_proto::{serialize_partitioning, serialize_physical_sort_exprs},
41-
DefaultPhysicalExtensionCodec,
41+
DecodeContext, DefaultPhysicalExtensionCodec,
4242
},
4343
protobuf::{Partitioning, PhysicalSortExprNodeCollection},
4444
};
@@ -183,6 +183,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
183183
let default_ctx = SessionContext::new();
184184
let task_context = default_ctx.task_ctx();
185185
let codex = DefaultPhysicalExtensionCodec {};
186+
let decode_context = DecodeContext::new(&task_context);
186187

187188
let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };
188189

@@ -191,7 +192,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
191192
.map_err(|e| DataFusionError::External(Box::new(e)))?;
192193
let sort_exprs = parse_physical_sort_exprs(
193194
&proto_output_ordering.physical_sort_expr_nodes,
194-
&task_context,
195+
&decode_context,
195196
&schema,
196197
&codex,
197198
)?;
@@ -203,7 +204,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
203204
.map_err(|e| DataFusionError::External(Box::new(e)))?;
204205
let partitioning = parse_protobuf_partitioning(
205206
Some(&proto_output_partitioning),
206-
&task_context,
207+
&decode_context,
207208
&schema,
208209
&codex,
209210
)?

datafusion/ffi/src/udaf/accumulator_args.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion_proto::{
3535
physical_plan::{
3636
from_proto::{parse_physical_exprs, parse_physical_sort_exprs},
3737
to_proto::{serialize_physical_exprs, serialize_physical_sort_exprs},
38-
DefaultPhysicalExtensionCodec,
38+
DecodeContext, DefaultPhysicalExtensionCodec,
3939
},
4040
protobuf::PhysicalAggregateExprNode,
4141
};
@@ -121,16 +121,17 @@ impl TryFrom<FFI_AccumulatorArgs> for ForeignAccumulatorArgs {
121121

122122
let default_ctx = SessionContext::new();
123123
let task_ctx = default_ctx.task_ctx();
124+
let decode_ctx = DecodeContext::new(&task_ctx);
124125
let codex = DefaultPhysicalExtensionCodec {};
125126

126127
let order_bys = parse_physical_sort_exprs(
127128
&proto_def.ordering_req,
128-
&task_ctx,
129+
&decode_ctx,
129130
&schema,
130131
&codex,
131132
)?;
132133

133-
let exprs = parse_physical_exprs(&proto_def.expr, &task_ctx, &schema, &codex)?;
134+
let exprs = parse_physical_exprs(&proto_def.expr, &decode_ctx, &schema, &codex)?;
134135

135136
Ok(Self {
136137
return_field,

datafusion/ffi/src/udwf/partition_evaluator_args.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion_common::exec_datafusion_err;
3535
use datafusion_proto::{
3636
physical_plan::{
3737
from_proto::parse_physical_expr, to_proto::serialize_physical_exprs,
38-
DefaultPhysicalExtensionCodec,
38+
DecodeContext, DefaultPhysicalExtensionCodec,
3939
},
4040
protobuf::PhysicalExprNode,
4141
};
@@ -137,6 +137,8 @@ impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {
137137

138138
fn try_from(value: FFI_PartitionEvaluatorArgs) -> Result<Self> {
139139
let default_ctx = SessionContext::new();
140+
let task_ctx = default_ctx.task_ctx();
141+
let decode_ctx = DecodeContext::new(&task_ctx);
140142
let codec = DefaultPhysicalExtensionCodec {};
141143

142144
let schema: SchemaRef = value.schema.into();
@@ -148,9 +150,7 @@ impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {
148150
.collect::<std::result::Result<Vec<_>, prost::DecodeError>>()
149151
.map_err(|e| exec_datafusion_err!("Failed to decode PhysicalExprNode: {e}"))?
150152
.iter()
151-
.map(|expr_node| {
152-
parse_physical_expr(expr_node, &default_ctx.task_ctx(), &schema, &codec)
153-
})
153+
.map(|expr_node| parse_physical_expr(expr_node, &decode_ctx, &schema, &codec))
154154
.collect::<Result<Vec<_>>>()?;
155155

156156
let input_fields = input_exprs

datafusion/proto/proto/datafusion.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,10 @@ message PhysicalExprNode {
865865

866866
UnknownColumn unknown_column = 20;
867867
}
868+
869+
// Optional ID for caching during deserialization.
870+
// Set to the Arc pointer address during serialization to enable deduplication.
871+
optional uint64 id = 21;
868872
}
869873

870874
message PhysicalScalarUdfNode {

datafusion/proto/src/bytes/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::logical_plan::{
2121
self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
2222
};
2323
use crate::physical_plan::{
24-
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
24+
AsExecutionPlan, DecodeContext, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
2525
};
2626
use crate::protobuf;
2727
use datafusion_common::{plan_datafusion_err, Result};
@@ -333,5 +333,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
333333
) -> Result<Arc<dyn ExecutionPlan>> {
334334
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
335335
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
336-
protobuf.try_into_physical_plan(ctx, extension_codec)
336+
let decode_ctx = DecodeContext::new(ctx);
337+
protobuf.try_into_physical_plan(&decode_ctx, extension_codec)
337338
}

datafusion/proto/src/generated/pbjson.rs

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)