
Commit 194364d

Commit message: address comment

1 parent 99e2412 · commit 194364d

File tree

6 files changed: +153 −82 lines


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 13 deletions
```diff
@@ -5480,6 +5480,11 @@
       "message" : [
         "Unsupported offset sequence version <version>. Please make sure the checkpoint is from a supported Spark version (Spark 4.0+)."
       ]
+    },
+    "UNSUPPORTED_PROVIDER" : {
+      "message" : [
+        "<provider> is not supported"
+      ]
     }
   },
   "sqlState" : "55019"
@@ -5507,19 +5512,6 @@
     },
     "sqlState" : "42616"
   },
-  "STATE_REPARTITION_INVALID_STATE_STORE_CONFIG" : {
-    "message" : [
-      "StateStoreConfig <configName> is invalid:"
-    ],
-    "subClass" : {
-      "UNSUPPORTED_PROVIDER" : {
-        "message" : [
-          "<provider> is not supported"
-        ]
-      }
-    },
-    "sqlState" : "42617"
-  },
   "STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS" : {
     "message" : [
       "Failed to create column family with unsupported starting character and name=<colFamilyName>."
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -67,11 +67,12 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
     val sourceOptions = StateSourceOptions.modifySourceOptions(hadoopConf,
       StateSourceOptions.apply(session, hadoopConf, properties))
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, sourceOptions.batchId)
-    // We only support RocksDB because the repartition work that this reader
+    // We only support RocksDB because the repartition work that this option
     // is built for only supports RocksDB
     if (sourceOptions.internalOnlyReadAllColumnFamilies
         && stateConf.providerClass != classOf[RocksDBStateStoreProvider].getName) {
       throw OfflineStateRepartitionErrors.unsupportedStateStoreProviderError(
+        sourceOptions.resolvedCpLocation,
         stateConf.providerClass)
     }
     val stateStoreReaderInfo: StateStoreReaderInfo = getStoreMetadataAndRunChecks(
```
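
With the extra argument, the error now carries the resolved checkpoint location alongside the offending provider class. A minimal sketch of the guard's logic, with a plain exception standing in for `OfflineStateRepartitionErrors.unsupportedStateStoreProviderError`:

```scala
// Sketch of the RocksDB-only guard; simplified stand-ins, not the Spark code path.
object ProviderGuardSketch {
  val RocksDBProvider =
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"

  def checkProvider(
      readAllColumnFamilies: Boolean,
      checkpointLocation: String,
      providerClass: String): Unit = {
    if (readAllColumnFamilies && providerClass != RocksDBProvider) {
      // Both the checkpoint location and the provider travel with the error,
      // matching the updated two-argument signature above.
      throw new IllegalArgumentException(
        s"Invalid checkpoint $checkpointLocation: $providerClass is not supported")
    }
  }

  def main(args: Array[String]): Unit = {
    checkProvider(readAllColumnFamilies = true, "/tmp/ckpt", RocksDBProvider) // passes
  }
}
```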

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 39 additions & 12 deletions
```diff
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.state
 
+import scala.collection.mutable
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -50,17 +52,8 @@ class StatePartitionReaderFactory(
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
     if (stateStoreInputPartition.sourceOptions.internalOnlyReadAllColumnFamilies) {
-      // Disable format validation because the schema returned by
-      // StatePartitionAllColumnFamiliesReader does not contain the corresponding
-      // keySchema or valueSchema.
-      // It's safe to do so because we also don't expect the caller of
-      // StatePartitionAllColumnFamiliesReader to extract specific fields out of the returned row.
-      val modifiedStoreConf = storeConf.withExtraOptions(Map(
-        StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG -> "false",
-        StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false"
-      ))
-      new StatePartitionAllColumnFamiliesReader(modifiedStoreConf, hadoopConf,
-        stateStoreInputPartition, schema, keyStateEncoderSpec)
+      new StatePartitionAllColumnFamiliesReader(storeConf, hadoopConf,
+        stateStoreInputPartition, schema, keyStateEncoderSpec, joinColFamilyOpt)
     } else if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
       new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
         stateStoreInputPartition, schema, keyStateEncoderSpec, stateVariableInfoOpt,
@@ -99,12 +92,14 @@ abstract class StatePartitionReaderBase(
     if (SchemaUtil.checkVariableType(stateVariableInfoOpt, StateVariableType.MapState)) {
       SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
     } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
+      // StatePartitionAllColumnFamiliesReader has its own provider and won't use this keySchema.
       placeholderSchema
     } else {
       SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
     }
   }
 
+  // StatePartitionAllColumnFamiliesReader has its own provider and won't use this valueSchema.
   protected val valueSchema = if (stateVariableInfoOpt.isDefined ||
       partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
     placeholderSchema
@@ -265,7 +260,8 @@ class StatePartitionAllColumnFamiliesReader(
     hadoopConf: SerializableConfiguration,
     partition: StateStoreInputPartition,
     schema: StructType,
-    keyStateEncoderSpec: KeyStateEncoderSpec)
+    keyStateEncoderSpec: KeyStateEncoderSpec,
+    joinColFamilyOpt: Option[String])
   extends StatePartitionReaderBase(
     storeConf,
     hadoopConf, partition, schema,
@@ -280,6 +276,37 @@ class StatePartitionAllColumnFamiliesReader(
     )
   }
 
+  private val colFamilyToSchema: mutable.HashMap[String, StateStoreColFamilySchema] = {
+    val stateStoreId = StateStoreId(
+      partition.sourceOptions.stateCheckpointLocation.toString,
+      partition.sourceOptions.operatorId,
+      StateStore.PARTITION_ID_TO_CHECK_SCHEMA,
+      partition.sourceOptions.storeName)
+    val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
+    val manager = new StateSchemaCompatibilityChecker(stateStoreProviderId, hadoopConf.value)
+    val schemaFile = manager.readSchemaFile()
+    val schemaMap = mutable.HashMap[String, StateStoreColFamilySchema]()
+    schemaFile.foreach { schema => schemaMap.put(schema.colFamilyName, schema) }
+    schemaMap
+  }
+
+  override lazy val provider: StateStoreProvider = {
+    val stateStoreId = StateStoreId(
+      partition.sourceOptions.stateCheckpointLocation.toString,
+      partition.sourceOptions.operatorId,
+      partition.partition,
+      partition.sourceOptions.storeName)
+    val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
+    val provider = StateStoreProvider.createAndInit(
+      stateStoreProviderId,
+      colFamilyToSchema(StateStore.DEFAULT_COL_FAMILY_NAME).keySchema,
+      colFamilyToSchema(StateStore.DEFAULT_COL_FAMILY_NAME).valueSchema,
+      keyStateEncoderSpec,
+      useColumnFamilies = false, storeConf, hadoopConf.value,
+      useMultipleValuesPerKey = false, None)
+    provider
+  }
+
   override lazy val iter: Iterator[InternalRow] = {
     store
       .iterator()
```
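
The new reader reads the operator's schema file once (via a fixed schema-check partition), indexes every column-family schema by name, and then initializes its own provider from the default family's key and value schemas. A self-contained sketch of that indexing step, with simplified stand-in types instead of Spark's `StateStoreColFamilySchema`:

```scala
import scala.collection.mutable

// Simplified stand-in for StateStoreColFamilySchema; illustrative only.
case class ColFamilySchemaSketch(colFamilyName: String, keySchema: String, valueSchema: String)

object SchemaIndexSketch {
  val DefaultColFamily = "default"

  // Index all column-family schemas by name, as colFamilyToSchema does above.
  def indexSchemas(
      schemaFile: Seq[ColFamilySchemaSketch]): mutable.HashMap[String, ColFamilySchemaSketch] = {
    val schemaMap = mutable.HashMap[String, ColFamilySchemaSketch]()
    schemaFile.foreach { s => schemaMap.put(s.colFamilyName, s) }
    schemaMap
  }

  def main(args: Array[String]): Unit = {
    val byName = indexSchemas(Seq(
      ColFamilySchemaSketch(DefaultColFamily, "key STRUCT", "value STRUCT"),
      ColFamilySchemaSketch("keyToNumValues", "key STRUCT", "count BIGINT")))
    // The provider is then initialized with the default family's key/value schemas.
    println(byName(DefaultColFamily).keySchema)
  }
}
```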

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionErrors.scala

Lines changed: 8 additions & 15 deletions
```diff
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException, SparkUnsupportedOperationException}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException}
 
 /**
  * Errors thrown by Offline state repartitioning.
@@ -88,9 +87,10 @@ object OfflineStateRepartitionErrors {
   }
 
   def unsupportedStateStoreProviderError(
+      checkpointLocation: String,
       providerClass: String
-  ): StateRepartitionInvalidStateStoreConfigUnsupportedProviderError = {
-    new StateRepartitionInvalidStateStoreConfigUnsupportedProviderError(providerClass)
+  ): StateRepartitionUnsupportedProviderError = {
+    new StateRepartitionUnsupportedProviderError(checkpointLocation, providerClass)
   }
 }
@@ -209,16 +209,9 @@
     subClass = "UNSUPPORTED_OFFSET_SEQ_VERSION",
     messageParameters = Map("version" -> version.toString))
 
-abstract class StateRepartitionInvalidStateStoreConfigError(
-    configName: String,
-    subClass: String,
-    messageParameters: Map[String, String] = Map.empty)
-  extends SparkUnsupportedOperationException(
-    errorClass = s"STATE_REPARTITION_INVALID_STATE_STORE_CONFIG.$subClass",
-    messageParameters = Map("configName" -> configName) ++ messageParameters)
-
-class StateRepartitionInvalidStateStoreConfigUnsupportedProviderError(
-    provider: String) extends StateRepartitionInvalidStateStoreConfigError(
-    SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+class StateRepartitionUnsupportedProviderError(
+    checkpointLocation: String,
+    provider: String) extends StateRepartitionInvalidCheckpointError(
+    checkpointLocation,
     subClass = "UNSUPPORTED_PROVIDER",
     messageParameters = Map("provider" -> provider))
```
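
This swaps a dedicated config-error base (keyed on the provider config name) for a subclass of the existing checkpoint-error base, so every instance now carries the checkpoint location. A minimal sketch of the resulting shape, with `RuntimeException` standing in for Spark's exception hierarchy and the errorClass format assumed from the naming:

```scala
// Sketch of the reshaped hierarchy; names mirror the diff, internals are stand-ins.
abstract class InvalidCheckpointErrorSketch(
    checkpointLocation: String,
    subClass: String,
    messageParameters: Map[String, String])
  extends RuntimeException(
    s"STATE_REPARTITION_INVALID_CHECKPOINT.$subClass " +
      s"(checkpointLocation=$checkpointLocation, params=$messageParameters)")

class UnsupportedProviderErrorSketch(checkpointLocation: String, provider: String)
  extends InvalidCheckpointErrorSketch(
    checkpointLocation,
    subClass = "UNSUPPORTED_PROVIDER",
    messageParameters = Map("provider" -> provider))

object ErrorHierarchySketch {
  def main(args: Array[String]): Unit =
    println(new UnsupportedProviderErrorSketch("/tmp/ckpt", "InMemoryProvider").getMessage)
}
```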

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 1 addition & 17 deletions
```diff
@@ -30,18 +30,6 @@ class StateStoreConf(
 
   def this() = this(new SQLConf)
 
-  def withExtraOptions(additionalOptions: Map[String, String]): StateStoreConf = {
-    val reconstructedSqlConf = {
-      // Reconstruct a SQLConf with all the settings preserved because sqlConf is transient
-      val conf = new SQLConf()
-      // Restore all state store related settings
-      sqlConfs.foreach { case (key, value) =>
-        conf.setConfString(key, value)
-      }
-      conf
-    }
-    new StateStoreConf(reconstructedSqlConf, extraOptions ++ additionalOptions)
-  }
   /**
    * Size of MaintenanceThreadPool to perform maintenance tasks for StateStore
    */
@@ -95,10 +83,7 @@
   val providerClass: String = sqlConf.stateStoreProviderClass
 
   /** Whether to validate the underlying format or not. */
-  val formatValidationEnabled: Boolean = extraOptions.get(
-    StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG)
-    .map(_ == "true")
-    .getOrElse(sqlConf.stateStoreFormatValidationEnabled)
+  val formatValidationEnabled: Boolean = sqlConf.stateStoreFormatValidationEnabled
 
   /**
    * Whether to validate StateStore commits for ForeachBatch sinks to ensure all partitions
@@ -181,7 +166,6 @@
 }
 
 object StateStoreConf {
-  val FORMAT_VALIDATION_ENABLED_CONFIG = "formatValidationEnabled"
   val FORMAT_VALIDATION_CHECK_VALUE_CONFIG = "formatValidationCheckValue"
 
   val empty = new StateStoreConf()
```
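
With `withExtraOptions` and the enable-flag constant gone, format validation is read directly from the session config instead of through an option override. For contrast, a standalone sketch of the override-then-fallback lookup that was removed (names illustrative, not Spark's):

```scala
// Sketch of the removed pattern: an extra-options map may override a session default.
object OptionFallbackSketch {
  def resolveFlag(
      extraOptions: Map[String, String],
      key: String,
      sessionDefault: Boolean): Boolean =
    extraOptions.get(key).map(_ == "true").getOrElse(sessionDefault)

  def main(args: Array[String]): Unit = {
    val key = "formatValidationEnabled"
    // With the override in place, the map wins over the session default.
    println(resolveFlag(Map(key -> "false"), key, sessionDefault = true)) // false
    // With the override removed, the session default is the only source.
    println(resolveFlag(Map.empty, key, sessionDefault = true)) // true
  }
}
```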
