[SPARK-54410][SQL] Fix read support for the variant logical type annotation #53120
Conversation
cashmand left a comment
Thanks for making this PR! Just a few small questions and suggestions.
```scala
val PARQUET_IGNORE_VARIANT_ANNOTATION =
  buildConf("spark.sql.parquet.ignoreVariantAnnotation")
    .doc("When true, ignore the variant logical type annotation and treat the Parquet " +
```
Should we mark this conf as .internal()? I think the main use case is to simplify debugging issues with the raw variant bytes, but let me know if there's a reason for this conf that I'm missing. Assuming my understanding is right, maybe we can also mention the intended use case in the doc comment.
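For illustration, a minimal sketch of what the suggested change could look like (the doc wording about the use case is hypothetical):

```scala
val PARQUET_IGNORE_VARIANT_ANNOTATION =
  buildConf("spark.sql.parquet.ignoreVariantAnnotation")
    .internal()
    .doc("When true, ignore the variant logical type annotation and treat the Parquet " +
      "column in the same way as the underlying struct type. Intended for debugging, " +
      "e.g. extracting the raw variant value/metadata bytes.")
    .version("4.1.0")
    .booleanConf
    .createWithDefault(false)
```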
+1
```diff
   throw QueryCompilationErrors.invalidVariantWrongNumFieldsError()
 }
-val valueAndMetadata = Seq("value", "metadata").map { colName =>
+val Seq(v, m) = Seq("value", "metadata").map { colName =>
```
Suggested change:

```diff
-val Seq(v, m) = Seq("value", "metadata").map { colName =>
+val Seq(value, metadata) = Seq("value", "metadata").map { colName =>
```
dongjoon-hyun left a comment
This should target Apache Spark 4.2.0, @harshmotw-db. Please fix the config version: `.version("4.1.0")`.
Hi @dongjoon-hyun, this is the last piece of variant type support in 4.1. We have been using the Parquet variant logical type when writing Spark variant to Parquet (already in branch-4.1), and we should also support reading it back.
To @cloud-fan, I'm not sure this is the last piece. However, if you want, I'd like to ask you to revise the PR title as a kind of bug fix.
```scala
buildConf("spark.sql.parquet.ignoreVariantAnnotation")
  .doc("When true, ignore the variant logical type annotation and treat the Parquet " +
    "column in the same way as the underlying struct type")
  .version("4.1.0")
```
If this is a bug fix, this should be 4.0.2, @harshmotw-db and @cloud-fan.
Not sure if the parquet version we use in Spark 4.0 has the variant logical type. I'll leave it to @harshmotw-db
> Not sure if the parquet version we use in Spark 4.0 has the variant logical type. I'll leave it to @harshmotw-db

Thanks. We can continue our discussion if we are not sure. AFAIK, it means there is no regression in Apache Spark 4.1.0 from Apache Spark 4.0.0.
For the record, if this is an improvement, it should be 4.2.0 according to the Apache Spark community policy, @harshmotw-db and @cloud-fan.
Given that Spark 4.1 has upgraded the parquet version, which includes the variant logical type, I think 4.1 should support reading parquet files with native variant type fields?
IIUC, we can say that it's still simply an unsupported feature, like variant was in Apache Spark 4.0.0. It's too late if this is an improvement, @cloud-fan.
This PR is practically a fix already. The earlier PR added a temporary workaround for reading variant data, mainly for testing purposes (see this line). Essentially, the existing code behaves as if `ignoreVariantAnnotation = false`. This PR just implements this more formally, so we actually make sure that the target type matches the actual parquet type.
Why don't you revise the PR title so that it literally reads as a fix, @harshmotw-db?
Also, the `ParquetRowConverter` fix is essential: currently, when `VARIANT_ALLOW_READING_SHREDDED = false`, the reader is broken when the parquet schema is `struct<metadata, value>` instead of `struct<value, metadata>`.
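A rough sketch of the order-independent idea (a hypothetical helper, not the exact PR code): resolve the variant subfields by name rather than by position.

```scala
import scala.jdk.CollectionConverters._
import org.apache.parquet.schema.{GroupType, Type}

// Hypothetical sketch: look up the variant subfields by name so that both
// struct<value, metadata> and struct<metadata, value> layouts are handled.
def variantFields(group: GroupType): (Type, Type) = {
  val byName = group.getFields.asScala.map(f => f.getName -> f).toMap
  // Throws if either field is missing; real code would raise a proper error.
  (byName("value"), byName("metadata"))
}
```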
Sure, in practice it is a fix. I need to head out for an hour, and I will change the PR title after that.
I removed my review request from this PR.
| "column in the same way as the underlying struct type") | ||
| .version("4.1.0") | ||
| .booleanConf | ||
| .createWithDefault(false) |
When should this be true?
It's mainly for debugging purposes, if we need to extract the raw variant bytes by specifying the schema as, say, `struct<value: Binary, metadata: Binary>`.
Well, for that purpose, let's remove this configuration. You can use `logDebug` instead.
Correct me if I'm wrong, but I don't think `logDebug` would be helpful here if we want to extract variant columns into a custom schema in a Spark DataFrame. This config is a good tool to debug issues in a Parquet file.
May I ask why you think that? You told me that it's mainly for debugging purposes, right?

> Correct me if I'm wrong, but I don't think `logDebug` would be helpful here if we want to extract variant columns into a custom schema in a Spark DataFrame. This config is a good tool to debug issues in a Parquet file.
I have added a new test, variant logical type annotation - ignore variant annotation, to demonstrate this point.

If the `ignoreVariantAnnotation` config is enabled, you can read a parquet file with an underlying variant column into a struct-of-binaries schema. So for a variant column `v`, you could run

`spark.read.format("parquet").schema("v struct<value: BINARY, metadata: BINARY>").load(...)`

and it would load the value and metadata columns into these fields, even though the data is logically not a struct of two binaries but a variant. People could use this to debug the physical variant values.

If the config is disabled, which is the default, this read would give an error and you would need to read variant columns into a variant schema.
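For concreteness, a minimal sketch of the debugging flow described above (the path and column name are illustrative):

```scala
// Enable the escape hatch, then read the raw variant bytes of column `v`
// as a plain struct of binaries.
spark.conf.set("spark.sql.parquet.ignoreVariantAnnotation", "true")
val raw = spark.read
  .format("parquet")
  .schema("v struct<value: BINARY, metadata: BINARY>")
  .load("/tmp/variant_data")  // illustrative path
raw.selectExpr("v.value", "v.metadata").show()
```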
```scala
    convertInternal(groupColumn, None)
  case v: VariantLogicalTypeAnnotation if v.getSpecVersion == 1 =>
    if (ignoreVariantAnnotation) {
      convertInternal(groupColumn)
```
I don't understand why we need to maintain this logic purely for debugging purposes.
```diff
-/** Parquet converter for unshredded Variant */
+// Parquet converter for unshredded Variant.
+@deprecated("We use this converter when the `spark.sql.variant.allowReadingShredded` config " +
```
This is not a public API, and we don't need to use the deprecated annotation. We can just put it as a normal code comment.
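i.e., something like this sketch (the tail of the message is truncated in the diff above, so it is elided here too):

```scala
// Parquet converter for unshredded Variant. We use this converter when the
// `spark.sql.variant.allowReadingShredded` config ...
```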
Thank you, @harshmotw-db. Could you resolve @cloud-fan's remaining comments, #53120 (comment) and #53120 (comment)?
@dongjoon-hyun @cloud-fan Thanks for reviewing! I have addressed your comments.
dongjoon-hyun left a comment
+1, LGTM. Thank you, @harshmotw-db and @cloud-fan .
Merged to master/4.1 for Apache Spark 4.1.0.
[SPARK-54410][SQL] Fix read support for the variant logical type annotation

Closes #53120 from harshmotw-db/harshmotw-db/variant_annotation_write.

Authored-by: Harsh Motwani <harsh.motwani@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit da7389b)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
PR #53005 introduced a fix where the Spark parquet writer would annotate variant columns with the parquet variant logical type. That PR had an ad-hoc fix on the reader side for validation. This PR formally allows Spark to read parquet files with the Variant logical type.
It also introduces an unrelated fix in ParquetRowConverter so that Spark can read variant columns regardless of the order in which the value and metadata fields are stored.
Why are the changes needed?
The variant logical type annotation has formally been adopted as part of the parquet spec and is part of the parquet-java 1.16.0 library. Therefore, Spark should be able to read files containing data annotated as such.
Does this PR introduce any user-facing change?
Yes, it allows users to read parquet files with the variant logical type annotation.
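For example, a minimal sketch (the path and field name are illustrative), assuming a file whose `v` column carries the variant logical type annotation:

```scala
// The annotated column now reads back as a Spark VARIANT column.
val df = spark.read.parquet("/tmp/variant_annotated")  // illustrative path
df.printSchema()  // v: variant (nullable = true)
df.selectExpr("variant_get(v, '$.field', 'string')").show()  // `$.field` is hypothetical
```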
How was this patch tested?
Existing test from PR #53005, where we wrote data with the variant logical type and tested reads using an ad-hoc solution.
Was this patch authored or co-authored using generative AI tooling?
No