
Commit 279ddf5

address comment
1 parent a93fe24 commit 279ddf5


8 files changed: +540, -271 lines changed


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 13 additions & 0 deletions
@@ -5507,6 +5507,19 @@
     },
     "sqlState" : "42616"
   },
+  "STATE_REPARTITION_INVALID_STATE_STORE_CONFIG": {
+    "message" : [
+      "StateStoreConfig <configName> is invalid:"
+    ],
+    "subClass" : {
+      "UNSUPPORTED_PROVIDER" : {
+        "message" : [
+          "<provider> is not supported"
+        ]
+      }
+    },
+    "sqlState" : "42617"
+  },
   "STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS" : {
     "message" : [
       "Failed to create column family with unsupported starting character and name=<colFamilyName>."

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 6 additions & 7 deletions
@@ -41,7 +41,8 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.transformwith
 import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.timers.TimerStateUtils
 import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.DIR_NAME_STATE
 import org.apache.spark.sql.execution.streaming.runtime.StreamingQueryCheckpointMetadata
-import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RocksDBStateStoreProvider, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.state.OfflineStateRepartitionErrors
 import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.streaming.TimeMode

@@ -67,11 +68,9 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
       StateSourceOptions.apply(session, hadoopConf, properties))
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, sourceOptions.batchId)
     if (sourceOptions.internalOnlyReadAllColumnFamilies
-      && !stateConf.providerClass.contains("RocksDB")) {
-      throw StateDataSourceErrors.invalidOptionValue(
-        StateSourceOptions.INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES,
-        "internalOnlyReadAllColumnFamilies=true is only supported with " +
-          s"RocksDBStateStoreProvider. Current provider: ${stateConf.providerClass}")
+      && stateConf.providerClass != classOf[RocksDBStateStoreProvider].getName) {
+      throw OfflineStateRepartitionErrors.unsupportedStateStoreProviderError(
+        stateConf.providerClass)
     }
     val stateStoreReaderInfo: StateStoreReaderInfo = getStoreMetadataAndRunChecks(
       sourceOptions)

@@ -379,7 +378,7 @@ case class StateSourceOptions(
     stateVarName: Option[String],
     readRegisteredTimers: Boolean,
     flattenCollectionTypes: Boolean,
-    internalOnlyReadAllColumnFamilies: Boolean,
+    internalOnlyReadAllColumnFamilies: Boolean = false,
     startOperatorStateUniqueIds: Option[Array[Array[String]]] = None,
     endOperatorStateUniqueIds: Option[Array[Array[String]]] = None) {
   def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)
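
A small sketch of why the check above was tightened from a substring match to an exact class-name comparison; the custom provider name is hypothetical:

// Hypothetical third-party provider name, for illustration only.
val providerClass = "com.example.state.MyRocksDBCompatibleProvider"

// Old check: accepts anything whose class name contains "RocksDB".
val oldCheckAccepts = providerClass.contains("RocksDB")                       // true

// New check: only the built-in provider's fully qualified class name is accepted.
val rocksDBProvider =
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
val newCheckAccepts = providerClass == rocksDBProvider                        // false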

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 14 additions & 4 deletions
@@ -50,7 +50,11 @@ class StatePartitionReaderFactory(
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
     if (stateStoreInputPartition.sourceOptions.internalOnlyReadAllColumnFamilies) {
-      new StatePartitionReaderAllColumnFamilies(storeConf, hadoopConf,
+      val modifiedStoreConf = storeConf.withExtraOptions(Map(
+        StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG -> "false",
+        StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false"
+      ))
+      new StatePartitionAllColumnFamiliesReader(modifiedStoreConf, hadoopConf,
         stateStoreInputPartition, schema, keyStateEncoderSpec)
     } else if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
       new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,

@@ -87,13 +91,17 @@
   protected val keySchema = {
     if (SchemaUtil.checkVariableType(stateVariableInfoOpt, StateVariableType.MapState)) {
       SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
+    } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
+      schemaForValueRow
     } else {
       SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
     }
   }

   protected val valueSchema = if (stateVariableInfoOpt.isDefined) {
     schemaForValueRow
+  } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
+    schemaForValueRow
   } else {
     SchemaUtil.getSchemaAsDataType(
       schema, "value").asInstanceOf[StructType]

@@ -243,14 +251,17 @@ class StatePartitionReader(
 /**
  * An implementation of [[StatePartitionReaderBase]] for reading all column families
  * in binary format. This reader returns raw key and value bytes along with column family names.
+ * We are returning key/value bytes because each column family can have different schema
  */
-class StatePartitionReaderAllColumnFamilies(
+class StatePartitionAllColumnFamiliesReader(
     storeConf: StateStoreConf,
     hadoopConf: SerializableConfiguration,
     partition: StateStoreInputPartition,
     schema: StructType,
     keyStateEncoderSpec: KeyStateEncoderSpec)
-  extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema,
+  extends StatePartitionReaderBase(
+    storeConf,
+    hadoopConf, partition, schema,
     keyStateEncoderSpec, None, None, None, None) {

   private lazy val store: ReadStateStore = {

@@ -263,7 +274,6 @@ class StatePartitionReaderAllColumnFamilies(
   }

   override lazy val iter: Iterator[InternalRow] = {
-    // Single store with column families (join v3, transformWithState, or simple operators)
     store
       .iterator()
       .map { pair =>
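
For context, an illustrative standalone version of the per-row mapping this reader performs through SchemaUtil.unifyStateRowPairAsRawBytes (see the SchemaUtil hunk below); the column_family_name handling at index 3 is assumed, since that line falls outside the hunk shown:

// Illustrative sketch only; field layout follows this commit's four-column output.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.unsafe.types.UTF8String

def toRawBytesRow(key: UnsafeRow, value: UnsafeRow, colFamilyName: String): InternalRow = {
  val row = new GenericInternalRow(4)
  row.update(0, key)                                   // partition_key (full key for now)
  row.update(1, key.getBytes)                          // key_bytes, encoded with the family's key schema
  row.update(2, value.getBytes)                        // value_bytes, encoded with the family's value schema
  row.update(3, UTF8String.fromString(colFamilyName))  // column_family_name (assumed, outside the hunk)
  row
}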

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala

Lines changed: 15 additions & 14 deletions
@@ -63,15 +63,12 @@ object SchemaUtil {
         .add("partition_id", IntegerType)
     } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
       new StructType()
-        // todo: change this to some more specific type after we
+        // todo [SPARK-54443]: change keySchema to more specific type after we
         // can extract partition key from keySchema
         .add("partition_key", keySchema)
         .add("key_bytes", BinaryType)
         .add("value_bytes", BinaryType)
         .add("column_family_name", StringType)
-        // need key and value schema so that state store can encode data
-        .add("value", valueSchema)
-        .add("key", keySchema)
     } else {
       new StructType()
         .add("key", keySchema)

@@ -89,14 +86,18 @@
   }

   /**
-   * Creates a unified row from raw key and value bytes.
-   * This is an alias for unifyStateRowPairAsBytes that takes individual byte arrays
-   * instead of a tuple for better readability.
+   * Returns an InternalRow representing
+   * 1. partitionKey
+   * 2. key in bytes
+   * 3. value in bytes
+   * 4. column family name
    */
   def unifyStateRowPairAsRawBytes(
-    pair: (UnsafeRow, UnsafeRow),
-    colFamilyName: String): InternalRow = {
-    val row = new GenericInternalRow(6)
+      pair: (UnsafeRow, UnsafeRow),
+      colFamilyName: String): InternalRow = {
+    val row = new GenericInternalRow(4)
+    // todo [SPARK-54443]: change keySchema to more specific type after we
+    // can extract partition key from keySchema
     row.update(0, pair._1)
     row.update(1, pair._1.getBytes)
     row.update(2, pair._2.getBytes)

@@ -261,9 +262,9 @@
       "expiration_timestamp_ms" -> classOf[LongType],
       "partition_id" -> classOf[IntegerType],
       "partition_key" -> classOf[StructType],
-      "key_bytes"->classOf[BinaryType],
-      "value_bytes"->classOf[BinaryType],
-      "column_family_name"->classOf[StringType])
+      "key_bytes" -> classOf[BinaryType],
+      "value_bytes" -> classOf[BinaryType],
+      "column_family_name" -> classOf[StringType])

   val expectedFieldNames = if (transformWithStateVariableInfoOpt.isDefined) {
     val stateVarInfo = transformWithStateVariableInfoOpt.get

@@ -305,7 +306,7 @@
     } else if (sourceOptions.readChangeFeed) {
       Seq("batch_id", "change_type", "key", "value", "partition_id")
     } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
-      Seq("partition_key", "key_bytes", "value_bytes", "column_family_name", "value", "key")
+      Seq("partition_key", "key_bytes", "value_bytes", "column_family_name")
     } else {
       Seq("key", "value", "partition_id")
     }
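
Putting the SchemaUtil changes together, a sketch of the four-column source schema now produced for internalOnlyReadAllColumnFamilies; keySchema stands for the operator's key schema, as in the code above:

// Sketch of the output schema after this change, derived from the getSourceSchema branch above.
import org.apache.spark.sql.types._

def allColumnFamiliesSourceSchema(keySchema: StructType): StructType =
  new StructType()
    .add("partition_key", keySchema)        // todo [SPARK-54443]: narrow to the actual partition key
    .add("key_bytes", BinaryType)           // raw encoded key bytes
    .add("value_bytes", BinaryType)         // raw encoded value bytes
    .add("column_family_name", StringType)  // which column family the pair belongs to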

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionErrors.scala

Lines changed: 23 additions & 1 deletion
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.streaming.state

-import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException}
+import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException, SparkRuntimeException}

 /**
  * Errors thrown by Offline state repartitioning.

@@ -85,6 +85,12 @@ object OfflineStateRepartitionErrors {
       version: Int): StateRepartitionInvalidCheckpointError = {
     new StateRepartitionUnsupportedOffsetSeqVersionError(checkpointLocation, version)
   }
+
+  def unsupportedStateStoreProviderError(
+      providerClass: String
+  ): StateRepartitionInvalidStateStoreConfigUnsupportedProviderError = {
+    new StateRepartitionInvalidStateStoreConfigUnsupportedProviderError(providerClass)
+  }
 }

 /**

@@ -201,3 +207,19 @@ class StateRepartitionUnsupportedOffsetSeqVersionError(
     checkpointLocation,
     subClass = "UNSUPPORTED_OFFSET_SEQ_VERSION",
     messageParameters = Map("version" -> version.toString))
+
+abstract class StateRepartitionInvalidStateStoreConfigError(
+    configName: String,
+    subClass: String,
+    messageParameters: Map[String, String] = Map.empty,
+    cause: Throwable = null)
+  extends SparkRuntimeException(
+    errorClass = s"STATE_REPARTITION_INVALID_STATE_STORE_CONFIG.$subClass",
+    messageParameters = Map("configName" -> configName) ++ messageParameters,
+    cause = cause)
+
+class StateRepartitionInvalidStateStoreConfigUnsupportedProviderError(
+    provider: String) extends StateRepartitionInvalidStateStoreConfigError(
+  "SQLConf.STATE_STORE_PROVIDER_CLASS.key",
+  subClass = "UNSUPPORTED_PROVIDER",
+  messageParameters = Map("provider" -> provider))
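
A minimal usage sketch of the new error hierarchy, following the definitions above; the provider value is an example input:

// Example only: raising and catching the new error type.
import org.apache.spark.sql.execution.streaming.state.{
  OfflineStateRepartitionErrors, StateRepartitionInvalidStateStoreConfigUnsupportedProviderError}

try {
  throw OfflineStateRepartitionErrors.unsupportedStateStoreProviderError(
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
} catch {
  case e: StateRepartitionInvalidStateStoreConfigUnsupportedProviderError =>
    // The base class resolves the error class to
    // STATE_REPARTITION_INVALID_STATE_STORE_CONFIG.UNSUPPORTED_PROVIDER.
    println(e.getMessage)
}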

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 16 additions & 1 deletion
@@ -30,6 +30,18 @@ class StateStoreConf(

   def this() = this(new SQLConf)

+  def withExtraOptions(additionalOptions: Map[String, String]): StateStoreConf = {
+    val reconstructedSqlConf = {
+      // Reconstruct a SQLConf with the all settings preserved because sqlConf is transient
+      val conf = new SQLConf()
+      // Restore all state store related settings
+      sqlConfs.foreach { case (key, value) =>
+        conf.setConfString(key, value)
+      }
+      conf
+    }
+    new StateStoreConf(reconstructedSqlConf, extraOptions ++ additionalOptions)
+  }
   /**
    * Size of MaintenanceThreadPool to perform maintenance tasks for StateStore
    */

@@ -83,7 +95,9 @@
   val providerClass: String = sqlConf.stateStoreProviderClass

   /** Whether validate the underlying format or not. */
-  val formatValidationEnabled: Boolean = sqlConf.stateStoreFormatValidationEnabled
+  val formatValidationEnabled: Boolean = extraOptions.getOrElse(
+    StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG,
+    sqlConf.stateStoreFormatValidationEnabled) == "true"

   /**
    * Whether to validate StateStore commits for ForeachBatch sinks to ensure all partitions

@@ -166,6 +180,7 @@
 }

 object StateStoreConf {
+  val FORMAT_VALIDATION_ENABLED_CONFIG = "formatValidationEnabled"
   val FORMAT_VALIDATION_CHECK_VALUE_CONFIG = "formatValidationCheckValue"

   val empty = new StateStoreConf()
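
A minimal usage sketch (assumed standalone setup) tying the two changes together: withExtraOptions yields a reader-local copy of the conf, and formatValidationEnabled is resolved from extraOptions before falling back to the SQLConf value, which is how the all-column-families reader disables format validation without touching the session configuration:

// Assumed standalone setup for illustration; the reader factory does the same thing internally.
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.streaming.state.StateStoreConf

val base = new StateStoreConf(new SQLConf())
val noValidation = base.withExtraOptions(Map(
  StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG -> "false",
  StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false"))

// The per-reader overrides take precedence over the session-level settings.
assert(!noValidation.formatValidationEnabled)
assert(!noValidation.formatValidationCheckValue)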
