
Commit fa776e1

small changes
1 parent 251f306 commit fa776e1

6 files changed: +38, -34 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 7 additions & 6 deletions
@@ -67,6 +67,8 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
     val sourceOptions = StateSourceOptions.modifySourceOptions(hadoopConf,
       StateSourceOptions.apply(session, hadoopConf, properties))
     val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, sourceOptions.batchId)
+    // We only support RocksDB because the repartition work that this reader
+    // is built for only supports RocksDB.
     if (sourceOptions.internalOnlyReadAllColumnFamilies
         && stateConf.providerClass != classOf[RocksDBStateStoreProvider].getName) {
       throw OfflineStateRepartitionErrors.unsupportedStateStoreProviderError(
@@ -386,8 +388,8 @@ case class StateSourceOptions(
   override def toString: String = {
     var desc = s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
       s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
-      s"stateVarName=${stateVarName.getOrElse("None")}, +" +
-      s"flattenCollectionTypes=$flattenCollectionTypes" +
+      s"stateVarName=${stateVarName.getOrElse("None")}, " +
+      s"flattenCollectionTypes=$flattenCollectionTypes, " +
       s"internalOnlyReadAllColumnFamilies=$internalOnlyReadAllColumnFamilies"
     if (fromSnapshotOptions.isDefined) {
       desc += s", snapshotStartBatchId=${fromSnapshotOptions.get.snapshotStartBatchId}"
@@ -502,8 +504,7 @@ object StateSourceOptions extends DataSourceOptions {
     val readChangeFeed = Option(options.get(READ_CHANGE_FEED)).exists(_.toBoolean)

     val internalOnlyReadAllColumnFamilies = try {
-      Option(options.get(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES))
-        .map(_.toBoolean).getOrElse(false)
+      Option(options.get(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES)).exists(_.toBoolean)
     } catch {
       case _: IllegalArgumentException =>
         throw StateDataSourceErrors.invalidOptionValue(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES,
@@ -648,8 +649,8 @@ object StateSourceOptions extends DataSourceOptions {
     StateSourceOptions(
       resolvedCpLocation, batchId.get, operatorId, storeName, joinSide,
       readChangeFeed, fromSnapshotOptions, readChangeFeedOptions,
-      stateVarName, readRegisteredTimers, flattenCollectionTypes,
-      internalOnlyReadAllColumnFamilies, startOperatorStateUniqueIds, endOperatorStateUniqueIds)
+      stateVarName, readRegisteredTimers, flattenCollectionTypes, internalOnlyReadAllColumnFamilies,
+      startOperatorStateUniqueIds, endOperatorStateUniqueIds)
   }

   private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = {
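A side note on the INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES parsing change: Option(raw).exists(_.toBoolean) returns the same result as the replaced .map(_.toBoolean).getOrElse(false) for every input, and matches how READ_CHANGE_FEED is parsed earlier in the same method. A minimal standalone sketch (plain Scala, no Spark types) of the equivalence:

object FlagParsingSketch extends App {
  // Old form: map the raw string to Boolean, defaulting to false when absent.
  def parseOld(raw: String): Boolean = Option(raw).map(_.toBoolean).getOrElse(false)

  // New form: exists short-circuits to false on None, same result in one step.
  def parseNew(raw: String): Boolean = Option(raw).exists(_.toBoolean)

  for (input <- Seq(null, "true", "false", "TRUE"))
    assert(parseOld(input) == parseNew(input))

  // A malformed value such as "yes" throws IllegalArgumentException in both
  // forms, which the surrounding try/catch rewraps into invalidOptionValue.
  println("old and new parsing agree")
}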

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 13 additions & 6 deletions
@@ -50,6 +50,11 @@ class StatePartitionReaderFactory(
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
     if (stateStoreInputPartition.sourceOptions.internalOnlyReadAllColumnFamilies) {
+      // Disable format validation because the schema returned by
+      // StatePartitionAllColumnFamiliesReader does not contain the corresponding
+      // keySchema or valueSchema.
+      // It's safe to do so because we also don't expect the caller of
+      // StatePartitionAllColumnFamiliesReader to extract specific fields out of the returned row.
       val modifiedStoreConf = storeConf.withExtraOptions(Map(
         StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG -> "false",
         StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false"
@@ -85,23 +90,24 @@ abstract class StatePartitionReaderBase(
   extends PartitionReader[InternalRow] with Logging {
   // Used primarily as a placeholder for the value schema in the context of
   // state variables used within the transformWithState operator.
-  private val schemaForValueRow: StructType =
+  // Also used as a placeholder for both the key and value schema for
+  // StatePartitionAllColumnFamiliesReader.
+  private val placeholderSchema: StructType =
     StructType(Array(StructField("__dummy__", NullType)))

   protected val keySchema = {
     if (SchemaUtil.checkVariableType(stateVariableInfoOpt, StateVariableType.MapState)) {
       SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
     } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
-      schemaForValueRow
+      placeholderSchema
     } else {
       SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
     }
   }

-  protected val valueSchema = if (stateVariableInfoOpt.isDefined) {
-    schemaForValueRow
-  } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
-    schemaForValueRow
+  protected val valueSchema = if (stateVariableInfoOpt.isDefined ||
+      partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
+    placeholderSchema
   } else {
     SchemaUtil.getSchemaAsDataType(
       schema, "value").asInstanceOf[StructType]
@@ -252,6 +258,7 @@ class StatePartitionReader(
  * An implementation of [[StatePartitionReaderBase]] for reading all column families
  * in binary format. This reader returns raw key and value bytes along with column family names.
  * We are returning key/value bytes because each column family can have different schema
+ * It will also return the partition key.
  */
 class StatePartitionAllColumnFamiliesReader(
     storeConf: StateStoreConf,
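The rename from schemaForValueRow to placeholderSchema reflects that the dummy schema now stands in for keys as well as values. It also explains why the factory above disables format validation: the all-column-families reader advertises this placeholder rather than any column family's real schema, so a schema check would always mismatch. A small sketch of that mismatch, with a made-up column family schema (assumes spark-catalyst on the classpath):

import org.apache.spark.sql.types.{BinaryType, NullType, StringType, StructField, StructType}

object PlaceholderSchemaSketch extends App {
  // The placeholder from the diff: a single dummy null-typed field.
  val placeholderSchema = StructType(Array(StructField("__dummy__", NullType)))

  // Hypothetical value schema of one column family, for illustration only.
  val someColumnFamilySchema = new StructType()
    .add("groupKey", StringType)
    .add("aggregateBytes", BinaryType)

  // Validation comparing the advertised schema against stored rows would fail
  // for every column family, hence FORMAT_VALIDATION_ENABLED_CONFIG is "false".
  println(placeholderSchema == someColumnFamilySchema) // false
}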

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ object SchemaUtil {
       .add("partition_id", IntegerType)
   } else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
     new StructType()
-      // todo [SPARK-54443]: change keySchema to more specific type after we
+      // todo [SPARK-54443]: change keySchema to a more specific type after we
       // can extract partition key from keySchema
       .add("partition_key", keySchema)
       .add("key_bytes", BinaryType)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionErrors.scala

Lines changed: 6 additions & 7 deletions
@@ -17,7 +17,8 @@

 package org.apache.spark.sql.execution.streaming.state

-import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException, SparkRuntimeException}
+import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException, SparkUnsupportedOperationException}
+import org.apache.spark.sql.internal.SQLConf

 /**
  * Errors thrown by Offline state repartitioning.
@@ -211,15 +212,13 @@ class StateRepartitionUnsupportedOffsetSeqVersionError(
 abstract class StateRepartitionInvalidStateStoreConfigError(
     configName: String,
     subClass: String,
-    messageParameters: Map[String, String] = Map.empty,
-    cause: Throwable = null)
-  extends SparkRuntimeException(
+    messageParameters: Map[String, String] = Map.empty)
+  extends SparkUnsupportedOperationException(
     errorClass = s"STATE_REPARTITION_INVALID_STATE_STORE_CONFIG.$subClass",
-    messageParameters = Map("configName" -> configName) ++ messageParameters,
-    cause = cause)
+    messageParameters = Map("configName" -> configName) ++ messageParameters)

 class StateRepartitionInvalidStateStoreConfigUnsupportedProviderError(
     provider: String) extends StateRepartitionInvalidStateStoreConfigError(
-    "SQLConf.STATE_STORE_PROVIDER_CLASS.key",
+    SQLConf.STATE_STORE_PROVIDER_CLASS.key,
     subClass = "UNSUPPORTED_PROVIDER",
     messageParameters = Map("provider" -> provider))

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 4 additions & 3 deletions
@@ -95,9 +95,10 @@ class StateStoreConf(
   val providerClass: String = sqlConf.stateStoreProviderClass

   /** Whether validate the underlying format or not. */
-  val formatValidationEnabled: Boolean = extraOptions.getOrElse(
-    StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG,
-    sqlConf.stateStoreFormatValidationEnabled) == "true"
+  val formatValidationEnabled: Boolean = extraOptions.get(
+      StateStoreConf.FORMAT_VALIDATION_ENABLED_CONFIG)
+    .map(_ == "true")
+    .getOrElse(sqlConf.stateStoreFormatValidationEnabled)

   /**
    * Whether to validate StateStore commits for ForeachBatch sinks to ensure all partitions
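This is more than a style rewrite. In the old expression, getOrElse on a Map[String, String] with a Boolean default widens the result to Any, so when the extra option was absent the Boolean SQLConf default was compared against the string "true" and the whole expression was always false: the session default could never enable validation. The new form string-compares only a present extra option and otherwise uses the Boolean default directly. A standalone sketch of both behaviors:

object FormatValidationFlagSketch extends App {
  val extraOptions = Map.empty[String, String] // extra option not set
  val sqlConfDefault = true                    // stands in for stateStoreFormatValidationEnabled

  // Old: the Boolean default widens to Any and is compared to a String,
  // yielding false even though the conf default is true.
  val oldValue = extraOptions.getOrElse("formatValidation", sqlConfDefault) == "true"

  // New: string-compare only when the extra option is present; otherwise
  // fall back to the Boolean conf default unchanged.
  val newValue = extraOptions.get("formatValidation").map(_ == "true").getOrElse(sqlConfDefault)

  println(s"old = $oldValue, new = $newValue") // old = false, new = true
}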

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionAllColumnFamiliesReaderSuite.scala

Lines changed: 7 additions & 11 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 import java.nio.ByteOrder
 import java.util.Arrays

-import org.apache.spark.SparkRuntimeException
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.InternalRow
@@ -100,20 +100,16 @@ class StatePartitionAllColumnFamiliesReaderSuite extends StateDataSourceTestBase

     // Create projections to convert Row to UnsafeRow bytes
     val keyProjection = UnsafeProjection.create(keySchema)
-    val valueProjection = if (valueSchema.isEmpty) null else UnsafeProjection.create(valueSchema)
+    val valueProjection = UnsafeProjection.create(valueSchema)

     // Create converters to convert external Row types to internal Catalyst types
     val keyConverter = CatalystTypeConverters.createToCatalystConverter(keySchema)
-    val valueConverter = if (valueSchema.isEmpty) {
-      null
-    } else {
-      CatalystTypeConverters.createToCatalystConverter(valueSchema)
-    }
+    val valueConverter = CatalystTypeConverters.createToCatalystConverter(valueSchema)

     // Convert normal data to bytes
     val normalAsBytes = normalDf.map { row =>
       val key = row.getStruct(1)
-      val value = if (row.isNullAt(2) || valueSchema.isEmpty) null else row.getStruct(2)
+      val value = if (row.isNullAt(2)) null else row.getStruct(2)

       // Convert key to InternalRow, then to UnsafeRow, then get bytes
       val keyInternalRow = keyConverter(key).asInstanceOf[InternalRow]
@@ -123,7 +119,7 @@ class StatePartitionAllColumnFamiliesReaderSuite extends StateDataSourceTestBase
       val keyBytes = keyUnsafeRow.getBytes.clone()

       // Convert value to bytes
-      val valueBytes = if (value == null || valueSchema.isEmpty) {
+      val valueBytes = if (value == null) {
         Array.empty[Byte]
       } else {
         val valueInternalRow = valueConverter(value).asInstanceOf[InternalRow]
@@ -433,7 +429,7 @@ class StatePartitionAllColumnFamiliesReaderSuite extends StateDataSourceTestBase
     )

     checkError(
-      exception = intercept[SparkRuntimeException] {
+      exception = intercept[SparkUnsupportedOperationException] {
         spark.read
           .format("statestore")
           .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
@@ -443,7 +439,7 @@ class StatePartitionAllColumnFamiliesReaderSuite extends StateDataSourceTestBase
       },
       condition = "STATE_REPARTITION_INVALID_STATE_STORE_CONFIG.UNSUPPORTED_PROVIDER",
       parameters = Map(
-        "configName" -> "SQLConf.STATE_STORE_PROVIDER_CLASS.key",
+        "configName" -> SQLConf.STATE_STORE_PROVIDER_CLASS.key,
         "provider" -> classOf[HDFSBackedStateStoreProvider].getName
       )
     )
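With the base reader now always supplying the placeholder, valueSchema is never empty, which is why the suite drops its null and isEmpty special cases around the projections and converters. For context, a standalone sketch of the Row-to-UnsafeRow-bytes round trip the test helper performs (assumes spark-catalyst on the classpath; the schema and row are made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object BytesRoundTripSketch extends App {
  val valueSchema = new StructType().add("id", IntegerType).add("name", StringType)

  // Same pattern as the suite: external Row -> Catalyst InternalRow -> UnsafeRow bytes.
  val valueConverter = CatalystTypeConverters.createToCatalystConverter(valueSchema)
  val valueProjection = UnsafeProjection.create(valueSchema)

  val internal = valueConverter(Row(42, "answer")).asInstanceOf[InternalRow]
  val bytes = valueProjection(internal).getBytes.clone()

  println(s"value row encodes to ${bytes.length} bytes")
}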
