@zifeif2 zifeif2 commented Nov 18, 2025

What changes were proposed in this pull request?

This PR introduces a new StatePartitionReader, StatePartitionReaderAllColumnFamilies, to support offline repartitioning.
StatePartitionReaderAllColumnFamilies is invoked when the user sets the option readAllColumnFamilies to true.

We have the StateDataSource Reader, which allows customers to read the rows in an operator state store using the DataFrame API, just like they read a normal table. But it currently only supports reading one column family in the state store at a time.

This change allows reading all the state rows in all the column families, so that we can repartition them at once. We can then read the entire state store, repartition the rows, and save the repartitioned state rows to the cloud. It also improves performance, since we don’t have to read each column family separately. The state is read at the last committed batch version.
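As a rough illustration of repartitioning rows from every column family in one pass, here is a minimal plain-Scala sketch. `RawStateRow`, `RepartitionSketch`, and the hash-mod partitioner are hypothetical stand-ins and are not the partitioning logic used by Spark or by this PR; the sketch only shows that rows sharing a partition key land in the same target partition regardless of their column family.

```scala
// Hypothetical row shape mirroring the four columns described in this PR.
final case class RawStateRow(
    partitionKey: String,
    keyBytes: Array[Byte],
    valueBytes: Array[Byte],
    columnFamilyName: String)

object RepartitionSketch {
  // Stand-in partitioner (an assumption, NOT Spark's hash partitioning):
  // maps a partition key to a target partition in [0, numPartitions).
  def targetPartition(row: RawStateRow, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    Math.floorMod(row.partitionKey.hashCode, numPartitions)
  }

  // Groups rows from all column families by their target partition in one pass.
  def repartition(
      rows: Seq[RawStateRow],
      numPartitions: Int): Map[Int, Seq[RawStateRow]] =
    rows.groupBy(row => targetPartition(row, numPartitions))
}
```

Because the key and value stay as opaque bytes, the grouping step never needs to know the schema of any individual column family.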

Since each column family can have a different schema, the DataFrame we return treats the key and value rows as bytes:

  • partition_key (string)
  • key_bytes (binary)
  • value_bytes (binary)
  • column_family_name (string)
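
The column list above could be written as a Spark `StructType` roughly as follows. This is a sketch based only on the names and types listed in this description; the object name `RawStateSchema` is hypothetical, and the schema defined in the actual patch may differ (for example in nullability).

```scala
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}

object RawStateSchema {
  // One row per state entry; key and value are kept as opaque bytes because
  // each column family may use a different key/value schema.
  val schema: StructType = StructType(Seq(
    StructField("partition_key", StringType),
    StructField("key_bytes", BinaryType),
    StructField("value_bytes", BinaryType),
    StructField("column_family_name", StringType)
  ))
}
```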

Why are the changes needed?

See above

Does this PR introduce any user-facing change?

No

How was this patch tested?

See the unit tests. They not only verify the schema, but also validate that the data is serialized to bytes correctly by comparing it against the DataFrame returned by a normal query.

Was this patch authored or co-authored using generative AI tooling?

Yes. haiku, sonnet.

@zifeif2 zifeif2 changed the title from "[WIP] [SPARK-54388][SS] Introduce StatePartitionReader that scan raw bytes for Single ColFamily" to "[SPARK-54388][SS] Introduce StatePartitionReader that scan raw bytes for Single ColFamily" on Nov 18, 2025
Contributor

@micheal-o micheal-o left a comment

Did a quick pass. It is in the right direction. Just needs some changes.

@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 42540b1 to 0a878e9 Compare November 19, 2025 02:54
@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from c10eed0 to 99e2412 Compare November 24, 2025 20:08
@micheal-o

Also, CI is failing due to a linter error in your changes. PTAL

@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 194364d to b46e8d1 Compare November 26, 2025 08:18

@micheal-o micheal-o left a comment

Did another round of review. It is almost there. Thanks

@micheal-o

@zifeif2 please also fix the CI failure.

@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 0f8f7d3 to 392d498 Compare December 1, 2025 17:07
@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 392d498 to 48521c3 Compare December 1, 2025 22:33

@micheal-o micheal-o left a comment

Stamped with some minor comments. Mostly looks good now. Thanks

Seq(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, STATE_VAR_NAME))
}

if (internalOnlyReadAllColumnFamilies && joinSide != JoinSideValues.none) {

def unsupportedStateStoreProviderError(
    checkpointLocation: String,
    providerClass: String): StateRepartitionUnsupportedProviderError = {

nit: the return type should be StateRepartitionInvalidCheckpointError

subClass = "UNSUPPORTED_OFFSET_SEQ_VERSION",
messageParameters = Map("version" -> version.toString))

class StateRepartitionUnsupportedProviderError(

still haven't fixed this indentation
