Skip to content

Commit 2dbd9c7

Browse files
authored
Support Non-Literal Expressions in Substrait VirtualTable Values and Improve Round-Trip Robustness (#18866)
## Which issue does this PR close? * Closes #16276. ## Rationale for this change This PR addresses failures in Substrait round‑trip mode where Virtual Tables were limited to accepting only literals or aliased literals. Several SQLLogicTests—especially those involving nested or scalar functions such as `map()`—failed because expressions were rejected prematurely. Allowing expression-based rows when exporting Virtual Tables is essential to making Substrait → DataFusion → Substrait round‑trips stable and representative of real workloads. Fixing this improves test coverage, unblocks Substrait integration efforts, and eliminates a major source of round‑trip inconsistencies. ### Before ``` ❯ cargo test --test sqllogictests -- --substrait-round-trip map.slt:388 Finished `test` profile [unoptimized + debuginfo] target(s) in 0.66s Running bin/sqllogictests.rs (target/debug/deps/sqllogictests-d29eafa7e841495b) Completed 1 test files in 0 seconds External error: 2 errors in file /Users/kosiew/GitHub/datafusion/datafusion/sqllogictest/test_files/map.slt 1. query failed: DataFusion error: This feature is not implemented: Unsupported plan type: DescribeTable { schema: Schema ... [SQL] describe data; at /Users/kosiew/GitHub/datafusion/datafusion/sqllogictest/test_files/map.slt:43 2. query failed: DataFusion error: Substrait error: Only literal types and aliases are supported in Virtual Tables, got: ScalarFunction [SQL] VALUES (MAP(['a'], [1])), (MAP(['b'], [2])), (MAP(['c', 'a'], [3, 1])) at /Users/kosiew/GitHub/datafusion/datafusion/sqllogictest/test_files/map.slt:388 ``` ### After ``` ❯ cargo test --test sqllogictests -- --substrait-round-trip map.slt:388 Finished `test` profile [unoptimized + debuginfo] target(s) in 0.50s Running bin/sqllogictests.rs (target/debug/deps/sqllogictests-62df8e0045f6176f) Completed 1 test files in 0 seconds ``` ## What changes are included in this PR? * Introduces `convert_literal_rows` and `convert_expression_rows` helpers to properly distinguish literal-only rows from expression-based rows. * Adds support for generating nested `Struct` expressions (Substrait `NestedStruct`) for non-literal Virtual Table values. * Updates `from_values` to dynamically choose between literal structs and nested structs. * Ensures schema consistency validation for expression-based rows. * Adds new round‑trip test cases, including `roundtrip_values_with_scalar_function`. * Refactors tests to use a new `substrait_roundtrip` helper to reduce duplication. * Adds `LogicalPlan::DescribeTable(_)` handling in the round‑trip engine. ## Are these changes tested? Yes. New test coverage includes: * A dedicated round‑trip test for scalar functions inside `VALUES` clauses. * Use of `substrait_roundtrip` across multiple existing tests to ensure consistent behavior. * Validation of expression-based row handling, alias stripping, and schema equivalence. ## Are there any user-facing changes? No direct API-breaking changes. The behavior of Substrait round‑trip conversion improves and becomes more permissive regarding expressions in Virtual Tables. This is internal to Substrait/DataFusion integration and does not change SQL semantics or user-level APIs. ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
1 parent 1cc9bcd commit 2dbd9c7

File tree

3 files changed

+132
-38
lines changed

3 files changed

+132
-38
lines changed

datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ async fn run_query_substrait_round_trip(
152152
| LogicalPlan::Explain(_)
153153
| LogicalPlan::Dml(_)
154154
| LogicalPlan::Copy(_)
155+
| LogicalPlan::DescribeTable(_)
155156
| LogicalPlan::Statement(_) => df.logical_plan().clone(),
156157
// For any other plan, convert to Substrait
157158
logical_plan => {

datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,76 @@ use datafusion::common::{not_impl_err, substrait_datafusion_err, DFSchema, ToDFS
2222
use datafusion::logical_expr::utils::conjunction;
2323
use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values};
2424
use std::sync::Arc;
25-
use substrait::proto::expression::literal::Struct;
25+
use substrait::proto::expression::literal::Struct as LiteralStruct;
2626
use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
27+
use substrait::proto::expression::nested::Struct as NestedStruct;
2728
use substrait::proto::expression::MaskExpression;
2829
use substrait::proto::read_rel::{NamedTable, ReadType, VirtualTable};
2930
use substrait::proto::rel::RelType;
3031
use substrait::proto::{ReadRel, Rel};
3132

33+
/// Converts rows of literal expressions into Substrait literal structs.
34+
///
35+
/// Each row is expected to contain only `Expr::Literal` or `Expr::Alias` wrapping literals.
36+
/// Aliases are unwrapped and the underlying literal is converted.
37+
fn convert_literal_rows(
38+
producer: &mut impl SubstraitProducer,
39+
rows: &[Vec<Expr>],
40+
) -> datafusion::common::Result<Vec<LiteralStruct>> {
41+
rows.iter()
42+
.map(|row| {
43+
let fields = row
44+
.iter()
45+
.map(|expr| match expr {
46+
Expr::Literal(sv, _) => to_substrait_literal(producer, sv),
47+
Expr::Alias(alias) => match alias.expr.as_ref() {
48+
// The schema gives us the names, so we can skip aliases
49+
Expr::Literal(sv, _) => to_substrait_literal(producer, sv),
50+
_ => Err(substrait_datafusion_err!(
51+
"Only literal types can be aliased in Virtual Tables, got: {}",
52+
alias.expr.variant_name()
53+
)),
54+
},
55+
_ => Err(substrait_datafusion_err!(
56+
"Only literal types and aliases are supported in Virtual Tables, got: {}",
57+
expr.variant_name()
58+
)),
59+
})
60+
.collect::<datafusion::common::Result<_>>()?;
61+
Ok(LiteralStruct { fields })
62+
})
63+
.collect()
64+
}
65+
66+
/// Converts rows of arbitrary expressions into Substrait nested structs.
67+
///
68+
/// Validates that each row has the expected schema length and converts each expression
69+
/// using the producer's expression handler.
70+
fn convert_expression_rows(
71+
producer: &mut impl SubstraitProducer,
72+
rows: &[Vec<Expr>],
73+
schema_len: usize,
74+
empty_schema: &Arc<DFSchema>,
75+
) -> datafusion::common::Result<Vec<NestedStruct>> {
76+
rows.iter()
77+
.map(|row| {
78+
if row.len() != schema_len {
79+
return Err(substrait_datafusion_err!(
80+
"Names list must match exactly to nested schema, but found {} uses for {} names",
81+
row.len(),
82+
schema_len
83+
));
84+
}
85+
86+
let fields = row
87+
.iter()
88+
.map(|expr| producer.handle_expr(expr, empty_schema))
89+
.collect::<datafusion::common::Result<_>>()?;
90+
Ok(NestedStruct { fields })
91+
})
92+
.collect()
93+
}
94+
3295
pub fn from_table_scan(
3396
producer: &mut impl SubstraitProducer,
3497
scan: &TableScan,
@@ -111,29 +174,25 @@ pub fn from_values(
111174
producer: &mut impl SubstraitProducer,
112175
v: &Values,
113176
) -> datafusion::common::Result<Box<Rel>> {
114-
let values = v
115-
.values
116-
.iter()
117-
.map(|row| {
118-
let fields = row
119-
.iter()
120-
.map(|v| match v {
121-
Expr::Literal(sv, _) => to_substrait_literal(producer, sv),
122-
Expr::Alias(alias) => match alias.expr.as_ref() {
123-
// The schema gives us the names, so we can skip aliases
124-
Expr::Literal(sv, _) => to_substrait_literal(producer, sv),
125-
_ => Err(substrait_datafusion_err!(
126-
"Only literal types can be aliased in Virtual Tables, got: {}", alias.expr.variant_name()
127-
)),
128-
},
129-
_ => Err(substrait_datafusion_err!(
130-
"Only literal types and aliases are supported in Virtual Tables, got: {}", v.variant_name()
131-
)),
132-
})
133-
.collect::<datafusion::common::Result<_>>()?;
134-
Ok(Struct { fields })
177+
let schema_len = v.schema.fields().len();
178+
let empty_schema = Arc::new(DFSchema::empty());
179+
180+
let use_literals = v.values.iter().all(|row| {
181+
row.iter().all(|expr| match expr {
182+
Expr::Literal(_, _) => true,
183+
Expr::Alias(alias) => matches!(alias.expr.as_ref(), Expr::Literal(_, _)),
184+
_ => false,
135185
})
136-
.collect::<datafusion::common::Result<_>>()?;
186+
});
187+
188+
let (values, expressions) = if use_literals {
189+
let values = convert_literal_rows(producer, &v.values)?;
190+
(values, vec![])
191+
} else {
192+
let expressions =
193+
convert_expression_rows(producer, &v.values, schema_len, &empty_schema)?;
194+
(vec![], expressions)
195+
};
137196
#[allow(deprecated)]
138197
Ok(Box::new(Rel {
139198
rel_type: Some(RelType::Read(Box::new(ReadRel {
@@ -145,7 +204,7 @@ pub fn from_values(
145204
advanced_extension: None,
146205
read_type: Some(ReadType::VirtualTable(VirtualTable {
147206
values,
148-
expressions: vec![],
207+
expressions,
149208
})),
150209
}))),
151210
}))

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
use crate::utils::test::read_json;
1919
use datafusion::arrow::array::ArrayRef;
20+
use datafusion::functions_nested::map::map;
21+
use datafusion::logical_expr::LogicalPlanBuilder;
2022
use datafusion::physical_plan::Accumulator;
2123
use datafusion::scalar::ScalarValue;
2224
use datafusion_substrait::logical_plan::{
@@ -26,6 +28,7 @@ use std::cmp::Ordering;
2628
use std::mem::size_of_val;
2729

2830
use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
31+
use datafusion::common::tree_node::Transformed;
2932
use datafusion::common::{not_impl_err, plan_err, DFSchema, DFSchemaRef};
3033
use datafusion::error::Result;
3134
use datafusion::execution::registry::SerializerRegistry;
@@ -1266,6 +1269,34 @@ async fn roundtrip_values_no_columns() -> Result<()> {
12661269
Ok(())
12671270
}
12681271

1272+
#[tokio::test]
1273+
async fn roundtrip_values_with_scalar_function() -> Result<()> {
1274+
let ctx = create_context().await?;
1275+
// datafusion::functions_nested::map::map;
1276+
let expr = map(vec![lit("a")], vec![lit(1)]);
1277+
let plan = LogicalPlanBuilder::values(vec![vec![expr]])?.build()?;
1278+
let expected = ctx.state().optimize(&plan)?;
1279+
1280+
let actual = substrait_roundtrip(&plan, &ctx).await?;
1281+
1282+
let strip_aliases_from_values = |plan: &LogicalPlan| -> LogicalPlan {
1283+
plan.clone()
1284+
.map_expressions(|expr| Ok(Transformed::yes(expr.unalias())))
1285+
.map(|t| t.data)
1286+
.unwrap_or_else(|_| plan.clone())
1287+
};
1288+
1289+
let normalized_expected = strip_aliases_from_values(&expected);
1290+
let normalized_actual = strip_aliases_from_values(&actual);
1291+
1292+
assert_eq!(
1293+
format!("{normalized_expected}"),
1294+
format!("{normalized_actual}")
1295+
);
1296+
assert_eq!(normalized_expected.schema(), normalized_actual.schema());
1297+
Ok(())
1298+
}
1299+
12691300
#[tokio::test]
12701301
async fn roundtrip_values_empty_relation() -> Result<()> {
12711302
roundtrip("SELECT * FROM (VALUES ('a')) LIMIT 0").await
@@ -1449,9 +1480,7 @@ async fn roundtrip_repartition_roundrobin() -> Result<()> {
14491480
partitioning_scheme: Partitioning::RoundRobinBatch(8),
14501481
});
14511482

1452-
let proto = to_substrait_plan(&plan, &ctx.state())?;
1453-
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1454-
let plan2 = ctx.state().optimize(&plan2)?;
1483+
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
14551484

14561485
assert_eq!(format!("{plan}"), format!("{plan2}"));
14571486
Ok(())
@@ -1466,9 +1495,7 @@ async fn roundtrip_repartition_hash() -> Result<()> {
14661495
partitioning_scheme: Partitioning::Hash(vec![col("data.a")], 8),
14671496
});
14681497

1469-
let proto = to_substrait_plan(&plan, &ctx.state())?;
1470-
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1471-
let plan2 = ctx.state().optimize(&plan2)?;
1498+
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
14721499

14731500
assert_eq!(format!("{plan}"), format!("{plan2}"));
14741501
Ok(())
@@ -1650,9 +1677,7 @@ async fn assert_expected_plan(
16501677
let ctx = create_context().await?;
16511678
let df = ctx.sql(sql).await?;
16521679
let plan = df.into_optimized_plan()?;
1653-
let proto = to_substrait_plan(&plan, &ctx.state())?;
1654-
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1655-
let plan2 = ctx.state().optimize(&plan2)?;
1680+
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
16561681

16571682
if assert_schema {
16581683
assert_eq!(plan.schema(), plan2.schema());
@@ -1694,9 +1719,7 @@ async fn roundtrip_fill_na(sql: &str) -> Result<()> {
16941719
let ctx = create_context().await?;
16951720
let df = ctx.sql(sql).await?;
16961721
let plan = df.into_optimized_plan()?;
1697-
let proto = to_substrait_plan(&plan, &ctx.state())?;
1698-
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1699-
let plan2 = ctx.state().optimize(&plan2)?;
1722+
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
17001723

17011724
// Format plan string and replace all None's with 0
17021725
let plan1str = format!("{plan}").replace("None", "0");
@@ -1708,6 +1731,18 @@ async fn roundtrip_fill_na(sql: &str) -> Result<()> {
17081731
Ok(())
17091732
}
17101733

1734+
/// Converts a logical plan to Substrait and back, applying optimization.
1735+
/// Returns the roundtripped and optimized logical plan.
1736+
async fn substrait_roundtrip(
1737+
plan: &LogicalPlan,
1738+
ctx: &SessionContext,
1739+
) -> Result<LogicalPlan> {
1740+
let proto = to_substrait_plan(plan, &ctx.state())?;
1741+
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1742+
let plan2 = ctx.state().optimize(&plan2)?;
1743+
Ok(plan2)
1744+
}
1745+
17111746
async fn test_alias(sql_with_alias: &str, sql_no_alias: &str) -> Result<()> {
17121747
// Since we ignore the SubqueryAlias in the producer, the result should be
17131748
// the same as producing a Substrait plan from the same query without aliases
@@ -1735,8 +1770,7 @@ async fn roundtrip_logical_plan_with_ctx(
17351770
ctx: SessionContext,
17361771
) -> Result<Box<Plan>> {
17371772
let proto = to_substrait_plan(&plan, &ctx.state())?;
1738-
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1739-
let plan2 = ctx.state().optimize(&plan2)?;
1773+
let plan2 = substrait_roundtrip(&plan, &ctx).await?;
17401774

17411775
let plan1str = format!("{plan}");
17421776
let plan2str = format!("{plan2}");

0 commit comments

Comments
 (0)