[SPARK-54388][SS] Introduce StatePartitionReader that scan raw bytes for Single ColFamily #53104
micheal-o
left a comment
Did a quick pass. It is in the right direction. Just needs some changes.
42540b1 to 0a878e9
932e054 to 279ddf5
c10eed0 to 99e2412
Also, CI is failing due to a linter error in your changes. PTAL.
194364d to b46e8d1
micheal-o
left a comment
Did another round of review. It is almost there. Thanks
@zifeif2 please also fix the CI failure.
0f8f7d3 to 392d498
392d498 to 48521c3
micheal-o
left a comment
Stamped with some minor comments. Mostly looks good now. Thanks
Seq(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, STATE_VAR_NAME))
}

if (internalOnlyReadAllColumnFamilies && joinSide != JoinSideValues.none) {
You forgot to address this comment: https://github.com/apache/spark/pull/53104/files#r2567180585
def unsupportedStateStoreProviderError(
    checkpointLocation: String,
    providerClass: String): StateRepartitionUnsupportedProviderError = {
nit: return type StateRepartitionInvalidCheckpointError
subClass = "UNSUPPORTED_OFFSET_SEQ_VERSION",
messageParameters = Map("version" -> version.toString))

class StateRepartitionUnsupportedProviderError(
still haven't fixed this indentation
What changes were proposed in this pull request?
This PR introduces a new StatePartitionReader, StatePartitionReaderAllColumnFamilies, to support offline repartitioning.
StatePartitionReaderAllColumnFamilies is invoked when the user sets the option readAllColumnFamilies to true.
We have the StateDataSource reader, which allows customers to read the rows in an operator's state store using the DataFrame API, just as they would read a normal table. However, it currently supports reading only one column family of the state store at a time.
This change allows reading all the state rows in all column families so that they can be repartitioned at once. We can then read the entire state store, repartition the rows, and save the repartitioned state rows to the cloud. This also improves performance, since we don't have to read each column family separately. The state is read as of the last committed batch version.
Since each column family can have a different schema, the returned DataFrame treats the key and value rows as raw bytes.
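To illustrate why raw bytes give a single uniform shape across column families, here is a minimal, self-contained sketch in plain Scala (no Spark dependency). Everything in it is hypothetical and for illustration only: `RawStateRow`, its field names, the two toy column families, and the byte encodings are assumptions, not the schema or serialization this PR actually uses.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical uniform row: opaque key/value bytes plus the owning column family.
// The columnFamily field is an assumption made for this sketch.
final case class RawStateRow(
    columnFamily: String,
    keyBytes: Array[Byte],
    valueBytes: Array[Byte])

object RawStateRowSketch {
  // Two toy "column families" whose logical schemas differ.
  val counts: Map[String, Long] = Map("a" -> 1L, "b" -> 2L) // String -> Long
  val timers: Map[Long, String] = Map(1000L -> "a")         // Long -> String

  // Toy encodings; the real state store uses its own row format.
  private def longBytes(v: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(v).array()
  private def stringBytes(s: String): Array[Byte] = s.getBytes(UTF_8)

  // Once keys and values are bytes, rows from both families fit one type.
  def readAll(): Seq[RawStateRow] = {
    val countRows = counts.toSeq.map { case (k, v) =>
      RawStateRow("counts", stringBytes(k), longBytes(v))
    }
    val timerRows = timers.toSeq.map { case (k, v) =>
      RawStateRow("timers", longBytes(k), stringBytes(v))
    }
    countRows ++ timerRows
  }

  def main(args: Array[String]): Unit =
    readAll().foreach { r =>
      println(s"${r.columnFamily}: key=${r.keyBytes.length}B value=${r.valueBytes.length}B")
    }
}
```

The point of the sketch is only that a consumer such as a repartitioner can move rows around without knowing any per-family schema, since it never has to decode the bytes.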
Why are the changes needed?
See above
Does this PR introduce any user-facing change?
No
How was this patch tested?
See the unit tests. They not only verify the schema but also validate that the data is serialized to bytes correctly by comparing it against a DataFrame queried the normal way.
Was this patch authored or co-authored using generative AI tooling?
Yes. haiku, sonnet.