[SPARK-54388][SS] Introduce StatePartitionReader that scans raw bytes for Single ColFamily #53104
StateDataSource.scala

@@ -41,7 +41,8 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.transformwith
 import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.timers.TimerStateUtils
 import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.DIR_NAME_STATE
 import org.apache.spark.sql.execution.streaming.runtime.StreamingQueryCheckpointMetadata
-import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.state.OfflineStateRepartitionErrors
 import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.streaming.TimeMode
@@ -66,6 +67,14 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
     val sourceOptions = StateSourceOptions.modifySourceOptions(hadoopConf,
       StateSourceOptions.apply(session, hadoopConf, properties))
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, sourceOptions.batchId)
+    // We only support RocksDB because the repartition work that this option
+    // is built for only supports RocksDB
+    if (sourceOptions.internalOnlyReadAllColumnFamilies
+        && stateConf.providerClass != classOf[RocksDBStateStoreProvider].getName) {
+      throw OfflineStateRepartitionErrors.unsupportedStateStoreProviderError(
+        sourceOptions.resolvedCpLocation,
+        stateConf.providerClass)
+    }
    val stateStoreReaderInfo: StateStoreReaderInfo = getStoreMetadataAndRunChecks(
       sourceOptions)
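For context, a minimal sketch of how this guard surfaces to a caller of the state data source. The option name comes from this PR; `statestore` is the existing state reader format name, while the checkpoint path and session setup are assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes a checkpoint at a hypothetical path that was written
// with a non-RocksDB provider (e.g. HDFSBackedStateStoreProvider).
object ReadAllColFamiliesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-all-col-families-sketch")
      .master("local[*]")
      .getOrCreate()

    // With the guard above, this throws
    // StateRepartitionUnsupportedProviderError unless the checkpoint's
    // provider class is RocksDBStateStoreProvider.
    val stateDf = spark.read
      .format("statestore")
      .option("internalOnlyReadAllColumnFamilies", "true")
      .load("/tmp/checkpoints/my-streaming-query")

    stateDf.show()
  }
}
```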
@@ -372,15 +381,17 @@ case class StateSourceOptions(
     stateVarName: Option[String],
     readRegisteredTimers: Boolean,
     flattenCollectionTypes: Boolean,
+    internalOnlyReadAllColumnFamilies: Boolean = false,
     startOperatorStateUniqueIds: Option[Array[Array[String]]] = None,
     endOperatorStateUniqueIds: Option[Array[Array[String]]] = None) {
   def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)

   override def toString: String = {
     var desc = s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
       s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
-      s"stateVarName=${stateVarName.getOrElse("None")}, +" +
-      s"flattenCollectionTypes=$flattenCollectionTypes"
+      s"stateVarName=${stateVarName.getOrElse("None")}, " +
+      s"flattenCollectionTypes=$flattenCollectionTypes, " +
+      s"internalOnlyReadAllColumnFamilies=$internalOnlyReadAllColumnFamilies"
     if (fromSnapshotOptions.isDefined) {
       desc += s", snapshotStartBatchId=${fromSnapshotOptions.get.snapshotStartBatchId}"
       desc += s", snapshotPartitionId=${fromSnapshotOptions.get.snapshotPartitionId}"
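The old code had a stray `+` inside the interpolated string, which rendered a literal `, +` after `stateVarName`. With the fix, a hypothetical instance renders along these lines:

```scala
// Illustrative only: every field value below is made up. Note there is no
// longer a stray ", +" after stateVarName, and the new flag is included.
val expectedPrefix =
  "StateSourceOptions(checkpointLocation=/tmp/ckpt, batchId=5, " +
    "operatorId=0, storeName=default, joinSide=none, " +
    "stateVarName=None, " +
    "flattenCollectionTypes=true, " +
    "internalOnlyReadAllColumnFamilies=false"
```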
@@ -407,6 +418,7 @@ object StateSourceOptions extends DataSourceOptions {
   val STATE_VAR_NAME = newOption("stateVarName")
   val READ_REGISTERED_TIMERS = newOption("readRegisteredTimers")
   val FLATTEN_COLLECTION_TYPES = newOption("flattenCollectionTypes")
+  val INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES = newOption("internalOnlyReadAllColumnFamilies")

   object JoinSideValues extends Enumeration {
     type JoinSideValues = Value
@@ -478,6 +490,7 @@ object StateSourceOptions extends DataSourceOptions {
         s"Valid values are ${JoinSideValues.values.mkString(",")}")
     }

+    // Use storeName rather than joinSide to identify the specific join store
     if (joinSide != JoinSideValues.none && storeName != StateStoreId.DEFAULT_STORE_NAME) {
       throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
     }
@@ -492,6 +505,29 @@ object StateSourceOptions extends DataSourceOptions {

     val readChangeFeed = Option(options.get(READ_CHANGE_FEED)).exists(_.toBoolean)

+    val internalOnlyReadAllColumnFamilies = try {
+      Option(options.get(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES)).exists(_.toBoolean)
+    } catch {
+      case _: IllegalArgumentException =>
+        throw StateDataSourceErrors.invalidOptionValue(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES,
+          "Boolean value is expected")
+    }
+
+    if (internalOnlyReadAllColumnFamilies && stateVarName.isDefined) {
+      throw StateDataSourceErrors.conflictOptions(
+        Seq(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, STATE_VAR_NAME))
+    }
+
+    if (internalOnlyReadAllColumnFamilies && joinSide != JoinSideValues.none) {
+      throw StateDataSourceErrors.conflictOptions(
+        Seq(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, JOIN_SIDE))
+    }
+
+    if (internalOnlyReadAllColumnFamilies && readChangeFeed) {
+      throw StateDataSourceErrors.conflictOptions(
+        Seq(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, READ_CHANGE_FEED))
+    }
+
     val changeStartBatchId = Option(options.get(CHANGE_START_BATCH_ID)).map(_.toLong)
     var changeEndBatchId = Option(options.get(CHANGE_END_BATCH_ID)).map(_.toLong)

Contributor, commenting on the joinSide check above: You forgot to address this comment: https://github.com/apache/spark/pull/53104/files#r2567180585
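A short sketch of how these mutual-exclusion checks behave from the caller's side; the option names come from this PR, while the session setup and checkpoint path are assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: pairing the internal raw-bytes option with stateVarName
// (likewise joinSide or readChangeFeed) fails during option validation,
// before any state files are touched.
object ConflictingOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("conflicting-options-sketch")
      .master("local[*]")
      .getOrCreate()

    try {
      spark.read
        .format("statestore")
        .option("internalOnlyReadAllColumnFamilies", "true")
        .option("stateVarName", "countState") // conflicts with the flag above
        .load("/tmp/checkpoints/my-streaming-query")
    } catch {
      case e: Exception =>
        // Expected: the conflictOptions error raised by the checks above.
        println(s"Rejected as expected: ${e.getMessage}")
    }
  }
}
```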
@@ -615,7 +651,7 @@ object StateSourceOptions extends DataSourceOptions {
     StateSourceOptions(
       resolvedCpLocation, batchId.get, operatorId, storeName, joinSide,
       readChangeFeed, fromSnapshotOptions, readChangeFeedOptions,
-      stateVarName, readRegisteredTimers, flattenCollectionTypes,
+      stateVarName, readRegisteredTimers, flattenCollectionTypes, internalOnlyReadAllColumnFamilies,
       startOperatorStateUniqueIds, endOperatorStateUniqueIds)
   }
OfflineStateRepartitionErrors.scala

@@ -85,6 +85,12 @@ object OfflineStateRepartitionErrors {
       version: Int): StateRepartitionInvalidCheckpointError = {
     new StateRepartitionUnsupportedOffsetSeqVersionError(checkpointLocation, version)
   }
+
+  def unsupportedStateStoreProviderError(
+      checkpointLocation: String,
+      providerClass: String): StateRepartitionInvalidCheckpointError = {
+    new StateRepartitionUnsupportedProviderError(checkpointLocation, providerClass)
+  }
 }

 /**
@@ -201,3 +207,11 @@ class StateRepartitionUnsupportedOffsetSeqVersionError(
     checkpointLocation,
     subClass = "UNSUPPORTED_OFFSET_SEQ_VERSION",
     messageParameters = Map("version" -> version.toString))
+
+class StateRepartitionUnsupportedProviderError(
+    checkpointLocation: String,
+    provider: String)
+  extends StateRepartitionInvalidCheckpointError(
+    checkpointLocation,
+    subClass = "UNSUPPORTED_PROVIDER",
+    messageParameters = Map("provider" -> provider))

Review thread on StateRepartitionUnsupportedProviderError (marked as resolved by zifeif2):
Contributor: still haven't fixed this indentation
Contributor: @zifeif2 probably my eyes. You can resolve it.
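To show how the two additions in this file fit together, a small sketch that builds the error via the factory; the names come from this PR, while the argument values are hypothetical:

```scala
// Sketch only: argument values are hypothetical.
val err = OfflineStateRepartitionErrors.unsupportedStateStoreProviderError(
  checkpointLocation = "/tmp/checkpoints/my-streaming-query",
  providerClass =
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")

// The result is a StateRepartitionInvalidCheckpointError with error subclass
// "UNSUPPORTED_PROVIDER" and message parameters carrying the provider name.
assert(err.isInstanceOf[StateRepartitionInvalidCheckpointError])
```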