diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 5020c21e2a81..cd9425ae33dc 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -69,18 +69,27 @@ impl From<&protobuf::PhysicalColumn> for Column { /// # Arguments /// /// * `proto` - Input proto with physical sort expression node -/// * `registry` - A registry knows how to build logical expressions out of user-defined function names +/// * `ctx` - Either a `&TaskContext` or `DecodeContext` for decoding. When passing `&TaskContext`, +/// a new `DecodeContext` is created without caching. For optimal performance with expression +/// deduplication, create and pass a `DecodeContext` directly. /// * `input_schema` - The Arrow schema for the input, used for determining expression data types /// when performing type coercion. /// * `codec` - An extension codec used to decode custom UDFs. -pub fn parse_physical_sort_expr( +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. +/// In future versions, passing `DecodeContext` directly will be required. 
+pub fn parse_physical_sort_expr<'a>( proto: &protobuf::PhysicalSortExprNode, - decode_ctx: &DecodeContext, + ctx: impl Into>, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result { + let decode_ctx = ctx.into(); if let Some(expr) = &proto.expr { - let expr = parse_physical_expr(expr.as_ref(), decode_ctx, input_schema, codec)?; + let expr = + parse_physical_expr(expr.as_ref(), decode_ctx.clone(), input_schema, codec)?; let options = SortOptions { descending: !proto.asc, nulls_first: proto.nulls_first, @@ -96,20 +105,28 @@ pub fn parse_physical_sort_expr( /// # Arguments /// /// * `proto` - Input proto with vector of physical sort expression node -/// * `registry` - A registry knows how to build logical expressions out of user-defined function names +/// * `ctx` - Either a `&TaskContext` or `DecodeContext` for decoding. When passing `&TaskContext`, +/// a new `DecodeContext` is created without caching. For optimal performance with expression +/// deduplication, create and pass a `DecodeContext` directly. /// * `input_schema` - The Arrow schema for the input, used for determining expression data types /// when performing type coercion. /// * `codec` - An extension codec used to decode custom UDFs. -pub fn parse_physical_sort_exprs( +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. +/// In future versions, passing `DecodeContext` directly will be required. 
+pub fn parse_physical_sort_exprs<'a>( proto: &[protobuf::PhysicalSortExprNode], - decode_ctx: &DecodeContext, + ctx: impl Into>, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { + let decode_ctx = ctx.into(); proto .iter() .map(|sort_expr| { - parse_physical_sort_expr(sort_expr, decode_ctx, input_schema, codec) + parse_physical_sort_expr(sort_expr, decode_ctx.clone(), input_schema, codec) }) .collect() } @@ -119,25 +136,40 @@ pub fn parse_physical_sort_exprs( /// # Arguments /// /// * `proto` - Input proto with physical window expression node. -/// * `name` - Name of the window expression. -/// * `registry` - A registry knows how to build logical expressions out of user-defined function names +/// * `ctx` - Either a `&TaskContext` or `DecodeContext` for decoding. When passing `&TaskContext`, +/// a new `DecodeContext` is created without caching. For optimal performance with expression +/// deduplication, create and pass a `DecodeContext` directly. /// * `input_schema` - The Arrow schema for the input, used for determining expression data types /// when performing type coercion. /// * `codec` - An extension codec used to decode custom UDFs. -pub fn parse_physical_window_expr( +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. +/// In future versions, passing `DecodeContext` directly will be required. 
+pub fn parse_physical_window_expr<'a>( proto: &protobuf::PhysicalWindowExprNode, - decode_ctx: &DecodeContext, + ctx: impl Into>, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { - let ctx = decode_ctx.task_context(); + let decode_ctx: DecodeContext<'a> = ctx.into(); + let task_ctx = decode_ctx.task_context(); let window_node_expr = - parse_physical_exprs(&proto.args, decode_ctx, input_schema, codec)?; - let partition_by = - parse_physical_exprs(&proto.partition_by, decode_ctx, input_schema, codec)?; - - let order_by = - parse_physical_sort_exprs(&proto.order_by, decode_ctx, input_schema, codec)?; + parse_physical_exprs(&proto.args, decode_ctx.clone(), input_schema, codec)?; + let partition_by = parse_physical_exprs( + &proto.partition_by, + decode_ctx.clone(), + input_schema, + codec, + )?; + + let order_by = parse_physical_sort_exprs( + &proto.order_by, + decode_ctx.clone(), + input_schema, + codec, + )?; let window_frame = proto .window_frame @@ -154,13 +186,13 @@ pub fn parse_physical_window_expr( protobuf::physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(udaf_name) => { WindowFunctionDefinition::AggregateUDF(match &proto.fun_definition { Some(buf) => codec.try_decode_udaf(udaf_name, buf)?, - None => ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?, + None => task_ctx.udaf(udaf_name).or_else(|_| codec.try_decode_udaf(udaf_name, &[]))?, }) } protobuf::physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(udwf_name) => { WindowFunctionDefinition::WindowUDF(match &proto.fun_definition { Some(buf) => codec.try_decode_udwf(udwf_name, buf)?, - None => ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))? + None => task_ctx.udwf(udwf_name).or_else(|_| codec.try_decode_udwf(udwf_name, &[]))? }) } } @@ -186,18 +218,34 @@ pub fn parse_physical_window_expr( ) } -pub fn parse_physical_exprs<'a, I>( +/// Parses multiple physical expressions from protobufs. 
+/// +/// # Arguments +/// +/// * `protos` - Iterator of proto physical expression nodes +/// * `ctx` - Either a `&TaskContext` or `DecodeContext` for decoding. When passing `&TaskContext`, +/// a new `DecodeContext` is created without caching. For optimal performance with expression +/// deduplication, create and pass a `DecodeContext` directly. +/// * `input_schema` - The Arrow schema for the input +/// * `codec` - An extension codec used to decode custom UDFs. +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. +/// In future versions, passing `DecodeContext` directly will be required. +pub fn parse_physical_exprs<'a, 'b, I>( protos: I, - decode_ctx: &DecodeContext, + ctx: impl Into>, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result>> where I: IntoIterator, { + let decode_ctx = ctx.into(); protos .into_iter() - .map(|p| parse_physical_expr(p, decode_ctx, input_schema, codec)) + .map(|p| parse_physical_expr(p, decode_ctx.clone(), input_schema, codec)) .collect::>>() } @@ -206,16 +254,25 @@ where /// # Arguments /// /// * `proto` - Input proto with physical expression node -/// * `registry` - A registry knows how to build logical expressions out of user-defined function names +/// * `ctx` - Either a `&TaskContext` or `DecodeContext` for decoding. When passing `&TaskContext`, +/// a new `DecodeContext` is created without caching. For optimal performance with expression +/// deduplication, create and pass a `DecodeContext` directly. /// * `input_schema` - The Arrow schema for the input, used for determining expression data types /// when performing type coercion. /// * `codec` - An extension codec used to decode custom UDFs. -pub fn parse_physical_expr( +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. 
+/// In future versions, passing `DecodeContext` directly will be required. +pub fn parse_physical_expr<'a>( proto: &protobuf::PhysicalExprNode, - decode_ctx: &DecodeContext, + ctx: impl Into>, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { + let decode_ctx = ctx.into(); + // Check cache first if an ID is present if let Some(id) = proto.id { if let Some(cached) = decode_ctx.get_cached_expr(id) { @@ -240,7 +297,7 @@ pub fn parse_physical_expr( ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new( parse_required_physical_expr( binary_expr.l.as_deref(), - decode_ctx, + &decode_ctx, "left", input_schema, codec, @@ -248,7 +305,7 @@ pub fn parse_physical_expr( logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?, parse_required_physical_expr( binary_expr.r.as_deref(), - decode_ctx, + &decode_ctx, "right", input_schema, codec, @@ -270,7 +327,7 @@ pub fn parse_physical_expr( ExprType::IsNullExpr(e) => { Arc::new(IsNullExpr::new(parse_required_physical_expr( e.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, @@ -279,7 +336,7 @@ pub fn parse_physical_expr( ExprType::IsNotNullExpr(e) => { Arc::new(IsNotNullExpr::new(parse_required_physical_expr( e.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, @@ -287,7 +344,7 @@ pub fn parse_physical_expr( } ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( e.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, @@ -295,7 +352,7 @@ pub fn parse_physical_expr( ExprType::Negative(e) => { Arc::new(NegativeExpr::new(parse_required_physical_expr( e.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, @@ -304,19 +361,26 @@ pub fn parse_physical_expr( ExprType::InList(e) => in_list( parse_required_physical_expr( e.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, )?, - parse_physical_exprs(&e.list, decode_ctx, input_schema, codec)?, + 
parse_physical_exprs(&e.list, decode_ctx.clone(), input_schema, codec)?, &e.negated, input_schema, )?, ExprType::Case(e) => Arc::new(CaseExpr::try_new( e.expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), decode_ctx, input_schema, codec)) + .map(|e| { + parse_physical_expr( + e.as_ref(), + decode_ctx.clone(), + input_schema, + codec, + ) + }) .transpose()?, e.when_then_expr .iter() @@ -324,14 +388,14 @@ pub fn parse_physical_expr( Ok(( parse_required_physical_expr( e.when_expr.as_ref(), - decode_ctx, + &decode_ctx, "when_expr", input_schema, codec, )?, parse_required_physical_expr( e.then_expr.as_ref(), - decode_ctx, + &decode_ctx, "then_expr", input_schema, codec, @@ -341,13 +405,20 @@ pub fn parse_physical_expr( .collect::>>()?, e.else_expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), decode_ctx, input_schema, codec)) + .map(|e| { + parse_physical_expr( + e.as_ref(), + decode_ctx.clone(), + input_schema, + codec, + ) + }) .transpose()?, )?), ExprType::Cast(e) => Arc::new(CastExpr::new( parse_required_physical_expr( e.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, @@ -358,7 +429,7 @@ pub fn parse_physical_expr( ExprType::TryCast(e) => Arc::new(TryCastExpr::new( parse_required_physical_expr( e.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, @@ -374,7 +445,8 @@ pub fn parse_physical_expr( }; let scalar_fun_def = Arc::clone(&udf); - let args = parse_physical_exprs(&e.args, decode_ctx, input_schema, codec)?; + let args = + parse_physical_exprs(&e.args, decode_ctx.clone(), input_schema, codec)?; let config_options = Arc::clone(ctx.session_config().options()); @@ -399,14 +471,14 @@ pub fn parse_physical_expr( like_expr.case_insensitive, parse_required_physical_expr( like_expr.expr.as_deref(), - decode_ctx, + &decode_ctx, "expr", input_schema, codec, )?, parse_required_physical_expr( like_expr.pattern.as_deref(), - decode_ctx, + &decode_ctx, "pattern", input_schema, codec, @@ -416,7 +488,7 @@ pub fn 
parse_physical_expr( let inputs: Vec> = extension .inputs .iter() - .map(|e| parse_physical_expr(e, decode_ctx, input_schema, codec)) + .map(|e| parse_physical_expr(e, decode_ctx.clone(), input_schema, codec)) .collect::>()?; (codec.try_decode_expr(extension.expr.as_slice(), &inputs)?) as _ } @@ -437,22 +509,29 @@ fn parse_required_physical_expr( input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { - expr.map(|e| parse_physical_expr(e, decode_ctx, input_schema, codec)) + expr.map(|e| parse_physical_expr(e, decode_ctx.clone(), input_schema, codec)) .transpose()? .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}")) } -pub fn parse_protobuf_hash_partitioning( +/// Parses protobuf hash partitioning. +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. +/// In future versions, passing `DecodeContext` directly will be required. +pub fn parse_protobuf_hash_partitioning<'a>( partitioning: Option<&protobuf::PhysicalHashRepartition>, - decode_ctx: &DecodeContext, + ctx: impl Into>, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { + let decode_ctx = ctx.into(); match partitioning { Some(hash_part) => { let expr = parse_physical_exprs( &hash_part.hash_expr, - decode_ctx, + decode_ctx.clone(), input_schema, codec, )?; @@ -466,12 +545,19 @@ pub fn parse_protobuf_hash_partitioning( } } -pub fn parse_protobuf_partitioning( +/// Parses protobuf partitioning. +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. +/// In future versions, passing `DecodeContext` directly will be required. 
+pub fn parse_protobuf_partitioning<'a>( partitioning: Option<&protobuf::Partitioning>, - decode_ctx: &DecodeContext, + ctx: impl Into>, input_schema: &Schema, codec: &dyn PhysicalExtensionCodec, ) -> Result> { + let decode_ctx = ctx.into(); match partitioning { Some(protobuf::Partitioning { partition_method }) => match partition_method { Some(protobuf::partitioning::PartitionMethod::RoundRobin( @@ -482,7 +568,7 @@ pub fn parse_protobuf_partitioning( Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => { parse_protobuf_hash_partitioning( Some(hash_repartition), - decode_ctx, + decode_ctx.clone(), input_schema, codec, ) @@ -504,12 +590,19 @@ pub fn parse_protobuf_file_scan_schema( Ok(Arc::new(convert_required!(proto.schema)?)) } -pub fn parse_protobuf_file_scan_config( +/// Parses protobuf file scan config. +/// +/// # Backwards Compatibility +/// +/// This function accepts either `&TaskContext` or `DecodeContext` for backwards compatibility. +/// In future versions, passing `DecodeContext` directly will be required. +pub fn parse_protobuf_file_scan_config<'a>( proto: &protobuf::FileScanExecConf, - decode_ctx: &DecodeContext, + ctx: impl Into>, codec: &dyn PhysicalExtensionCodec, file_source: Arc, ) -> Result { + let decode_ctx = ctx.into(); let schema: Arc = parse_protobuf_file_scan_schema(proto)?; let projection = proto .projection @@ -557,7 +650,7 @@ pub fn parse_protobuf_file_scan_config( for node_collection in &proto.output_ordering { let sort_exprs = parse_physical_sort_exprs( &node_collection.physical_sort_expr_nodes, - decode_ctx, + decode_ctx.clone(), &schema, codec, )?; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 403977335153..55ed37b37c1b 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -108,9 +108,10 @@ use prost::Message; /// physical expressions. 
The cache is keyed by the expression's ID (derived from the /// Arc pointer during serialization), allowing duplicate expressions in a plan to be /// deserialized only once. +#[derive(Clone)] pub struct DecodeContext<'a> { task_context: &'a TaskContext, - cache: Mutex>>, + cache: Arc>>>, } impl<'a> DecodeContext<'a> { @@ -118,7 +119,7 @@ impl<'a> DecodeContext<'a> { pub fn new(task_context: &'a TaskContext) -> Self { Self { task_context, - cache: Mutex::new(HashMap::new()), + cache: Arc::new(Mutex::new(HashMap::new())), } } @@ -145,6 +146,12 @@ impl<'a> DecodeContext<'a> { } } +impl<'a> From<&'a TaskContext> for DecodeContext<'a> { + fn from(task_context: &'a TaskContext) -> Self { + Self::new(task_context) + } +} + pub mod from_proto; pub mod to_proto; @@ -579,7 +586,7 @@ impl protobuf::PhysicalPlanNode { Ok(( parse_physical_expr( expr, - decode_ctx, + decode_ctx.clone(), input.schema().as_ref(), extension_codec, )?, @@ -610,7 +617,7 @@ impl protobuf::PhysicalPlanNode { .map(|expr| { parse_physical_expr( expr, - decode_ctx, + decode_ctx.clone(), input.schema().as_ref(), extension_codec, ) @@ -684,7 +691,7 @@ impl protobuf::PhysicalPlanNode { let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), - decode_ctx, + decode_ctx.clone(), extension_codec, source, )?) 
@@ -703,7 +710,7 @@ impl protobuf::PhysicalPlanNode { ) -> Result> { let scan_conf = parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), - decode_ctx, + decode_ctx.clone(), extension_codec, Arc::new(JsonSource::new()), )?; @@ -744,7 +751,7 @@ impl protobuf::PhysicalPlanNode { .map(|expr| { parse_physical_expr( expr, - decode_ctx, + decode_ctx.clone(), predicate_schema.as_ref(), extension_codec, ) @@ -762,7 +769,7 @@ impl protobuf::PhysicalPlanNode { } let base_config = parse_protobuf_file_scan_config( base_conf, - decode_ctx, + decode_ctx.clone(), extension_codec, Arc::new(source), )?; @@ -784,7 +791,7 @@ impl protobuf::PhysicalPlanNode { { let conf = parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), - decode_ctx, + decode_ctx.clone(), extension_codec, Arc::new(AvroSource::new()), )?; @@ -827,7 +834,7 @@ impl protobuf::PhysicalPlanNode { for ordering in &scan.sort_information { let sort_exprs = parse_physical_sort_exprs( &ordering.physical_sort_expr_nodes, - decode_ctx, + decode_ctx.clone(), &schema, extension_codec, )?; @@ -881,7 +888,7 @@ impl protobuf::PhysicalPlanNode { into_physical_plan(&repart.input, decode_ctx, extension_codec)?; let partitioning = parse_protobuf_partitioning( repart.partitioning.as_ref(), - decode_ctx, + decode_ctx.clone(), input.schema().as_ref(), extension_codec, )?; @@ -938,7 +945,7 @@ impl protobuf::PhysicalPlanNode { .map(|window_expr| { parse_physical_window_expr( window_expr, - decode_ctx, + decode_ctx.clone(), input_schema.as_ref(), extension_codec, ) @@ -951,7 +958,7 @@ impl protobuf::PhysicalPlanNode { .map(|expr| { parse_physical_expr( expr, - decode_ctx, + decode_ctx.clone(), input.schema().as_ref(), extension_codec, ) @@ -1017,7 +1024,7 @@ impl protobuf::PhysicalPlanNode { .map(|(expr, name)| { parse_physical_expr( expr, - decode_ctx, + decode_ctx.clone(), input.schema().as_ref(), extension_codec, ) @@ -1032,7 +1039,7 @@ impl protobuf::PhysicalPlanNode { .map(|(expr, name)| { 
parse_physical_expr( expr, - decode_ctx, + decode_ctx.clone(), input.schema().as_ref(), extension_codec, ) @@ -1064,7 +1071,7 @@ impl protobuf::PhysicalPlanNode { .map(|e| { parse_physical_expr( e, - decode_ctx, + decode_ctx.clone(), &physical_schema, extension_codec, ) @@ -1090,7 +1097,7 @@ impl protobuf::PhysicalPlanNode { .map(|e| { parse_physical_expr( e, - decode_ctx, + decode_ctx.clone(), &physical_schema, extension_codec, ) @@ -1102,7 +1109,7 @@ impl protobuf::PhysicalPlanNode { .map(|e| { parse_physical_sort_expr( e, - decode_ctx, + decode_ctx.clone(), &physical_schema, extension_codec, ) @@ -1185,13 +1192,13 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = parse_physical_expr( &col.left.clone().unwrap(), - decode_ctx, + decode_ctx.clone(), left_schema.as_ref(), extension_codec, )?; let right = parse_physical_expr( &col.right.clone().unwrap(), - decode_ctx, + decode_ctx.clone(), right_schema.as_ref(), extension_codec, )?; @@ -1226,7 +1233,7 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - decode_ctx, + decode_ctx.clone(), &schema, extension_codec, )?; @@ -1301,13 +1308,13 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = parse_physical_expr( &col.left.clone().unwrap(), - decode_ctx, + decode_ctx.clone(), left_schema.as_ref(), extension_codec, )?; let right = parse_physical_expr( &col.right.clone().unwrap(), - decode_ctx, + decode_ctx.clone(), right_schema.as_ref(), extension_codec, )?; @@ -1342,7 +1349,7 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - decode_ctx, &schema, + decode_ctx.clone(), &schema, extension_codec, )?; let column_indices = f.column_indices @@ -1367,7 +1374,7 @@ impl protobuf::PhysicalPlanNode { let left_sort_exprs = parse_physical_sort_exprs( &sym_join.left_sort_exprs, - decode_ctx, + decode_ctx.clone(), &left_schema, extension_codec, )?; @@ -1375,7 
+1382,7 @@ impl protobuf::PhysicalPlanNode { let right_sort_exprs = parse_physical_sort_exprs( &sym_join.right_sort_exprs, - decode_ctx, + decode_ctx.clone(), &right_schema, extension_codec, )?; @@ -1500,7 +1507,7 @@ impl protobuf::PhysicalPlanNode { })? .as_ref(); Ok(PhysicalSortExpr { - expr: parse_physical_expr(expr, decode_ctx, input.schema().as_ref(), extension_codec)?, + expr: parse_physical_expr(expr, decode_ctx.clone(), input.schema().as_ref(), extension_codec)?, options: SortOptions { descending: !sort_expr.asc, nulls_first: sort_expr.nulls_first, @@ -1553,7 +1560,7 @@ impl protobuf::PhysicalPlanNode { Ok(PhysicalSortExpr { expr: parse_physical_expr( expr, - decode_ctx, + decode_ctx.clone(), input.schema().as_ref(), extension_codec, )?, @@ -1625,7 +1632,7 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - decode_ctx, &schema, + decode_ctx.clone(), &schema, extension_codec, )?; let column_indices = f.column_indices @@ -1705,7 +1712,7 @@ impl protobuf::PhysicalPlanNode { .map(|collection| { parse_physical_sort_exprs( &collection.physical_sort_expr_nodes, - decode_ctx, + decode_ctx.clone(), &sink_schema, extension_codec, ) @@ -1742,7 +1749,7 @@ impl protobuf::PhysicalPlanNode { .map(|collection| { parse_physical_sort_exprs( &collection.physical_sort_expr_nodes, - decode_ctx, + decode_ctx.clone(), &sink_schema, extension_codec, ) @@ -1782,7 +1789,7 @@ impl protobuf::PhysicalPlanNode { .map(|collection| { parse_physical_sort_exprs( &collection.physical_sort_expr_nodes, - decode_ctx, + decode_ctx.clone(), &sink_schema, extension_codec, ) @@ -1858,7 +1865,7 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - decode_ctx, + decode_ctx.clone(), &schema, extension_codec, )?; @@ -1919,13 +1926,13 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = parse_physical_expr( &col.left.clone().unwrap(), - 
decode_ctx, + decode_ctx.clone(), left_schema.as_ref(), extension_codec, )?; let right = parse_physical_expr( &col.right.clone().unwrap(), - decode_ctx, + decode_ctx.clone(), right_schema.as_ref(), extension_codec, )?; diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 8b03193e7f99..e7849aafb369 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -50,38 +50,82 @@ impl FunctionRegistry for FunctionRegistryImpl { } ``` -### `datafusion-proto` use `TaskContext` rather than `SessionContext` in physical plan serde methods +### `datafusion-proto` introduces `DecodeContext` with backwards compatibility for `TaskContext` -There have been changes in the public API methods of `datafusion-proto` which handle physical plan serde. +The `datafusion-proto` crate has introduced a new `DecodeContext` type to improve performance through expression caching during physical plan deserialization. This change affects public API methods that handle physical plan serialization/deserialization. -Methods like `physical_plan_from_bytes`, `parse_physical_expr` and similar, expect `TaskContext` instead of `SessionContext` +#### Backwards Compatibility -```diff -- let plan2 = physical_plan_from_bytes(&bytes, &ctx)?; -+ let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?; +For backwards compatibility, most public functions now accept either `&TaskContext` or `DecodeContext`: + +```rust +// Both of these work: +let expr1 = parse_physical_expr(proto, &task_ctx, schema, codec)?; // Old way - still works +let expr2 = parse_physical_expr(proto, DecodeContext::new(&task_ctx), schema, codec)?; // New way - recommended ``` -as `TaskContext` contains `RuntimeEnv` methods such as `try_into_physical_plan` will not have explicit `RuntimeEnv` parameter. 
+Functions with backwards compatibility support include: +- `parse_physical_expr` +- `parse_physical_exprs` +- `parse_physical_sort_expr` / `parse_physical_sort_exprs` +- `parse_physical_window_expr` +- `parse_protobuf_partitioning` / `parse_protobuf_hash_partitioning` +- `parse_protobuf_file_scan_config` +#### Why use `DecodeContext` directly? + +When passing `&TaskContext`, a new `DecodeContext` with its own empty cache is created internally on every call, so cached expressions are never shared between separate calls. For optimal performance with expression deduplication (when the same physical expression appears multiple times in a plan), create and reuse a `DecodeContext`: + +```rust +let decode_ctx = DecodeContext::new(&task_ctx); +let expr1 = parse_physical_expr(proto1, decode_ctx.clone(), schema, codec)?; +let expr2 = parse_physical_expr(proto2, decode_ctx.clone(), schema, codec)?; +// Expressions are deduplicated via the shared cache +``` + +#### Breaking Changes in Traits + +The following trait methods now require `&DecodeContext` (no automatic conversion): + +**`AsExecutionPlan::try_into_physical_plan`:** ```diff let result_exec_plan: Arc = proto -- .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec) -+. 
.try_into_physical_plan(&ctx.task_ctx(), &composed_codec) +- .try_into_physical_plan(&ctx.task_ctx(), &composed_codec) ++ .try_into_physical_plan(&DecodeContext::new(&ctx.task_ctx()), &composed_codec) ``` -`PhysicalExtensionCodec::try_decode()` expects `TaskContext` instead of `FunctionRegistry`: - +**`PhysicalExtensionCodec::try_decode`:** ```diff pub trait PhysicalExtensionCodec { fn try_decode( &self, buf: &[u8], inputs: &[Arc], -- registry: &dyn FunctionRegistry, -+ ctx: &TaskContext, +- ctx: &TaskContext, ++ ctx: &DecodeContext, ) -> Result>; ``` +To update your implementation: +```rust +impl PhysicalExtensionCodec for MyCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], + ctx: &DecodeContext, // Changed from &TaskContext + ) -> Result> { + // Access the TaskContext if needed: + let task_ctx = ctx.task_context(); + // ... your implementation + } +} +``` + +#### Future Deprecation + +The backwards compatibility (automatic conversion from `&TaskContext`) may be removed in future versions. It is recommended to migrate to using `DecodeContext` directly. + See [issue #17601] for more details. [issue #17601]: https://github.com/apache/datafusion/issues/17601