@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.join.Symmetri
2626import org .apache .spark .sql .execution .streaming .operators .stateful .transformwithstate .{StateVariableType , TransformWithStateVariableInfo }
2727import org .apache .spark .sql .execution .streaming .state ._
2828import org .apache .spark .sql .execution .streaming .state .RecordType .{getRecordTypeAsString , RecordType }
29- import org .apache .spark .sql .types .{BinaryType , NullType , StructField , StructType }
29+ import org .apache .spark .sql .types .{NullType , StructField , StructType }
3030import org .apache .spark .unsafe .types .UTF8String
3131import org .apache .spark .util .{NextIterator , SerializableConfiguration }
3232
@@ -85,20 +85,16 @@ abstract class StatePartitionReaderBase(
8585 private val schemaForValueRow : StructType =
8686 StructType (Array (StructField (" __dummy__" , NullType )))
8787
88- protected val keySchema = {
88+ protected lazy val keySchema = {
8989 if (SchemaUtil .checkVariableType(stateVariableInfoOpt, StateVariableType .MapState )) {
9090 SchemaUtil .getCompositeKeySchema(schema, partition.sourceOptions)
91- } else if (partition.sourceOptions.readAllColumnFamilies) {
92- new StructType ().add(" keyBytes" , BinaryType , nullable = false )
9391 } else {
9492 SchemaUtil .getSchemaAsDataType(schema, " key" ).asInstanceOf [StructType ]
9593 }
9694 }
9795
98- protected val valueSchema = if (stateVariableInfoOpt.isDefined) {
96+ protected lazy val valueSchema = if (stateVariableInfoOpt.isDefined) {
9997 schemaForValueRow
100- } else if (partition.sourceOptions.readAllColumnFamilies) {
101- new StructType ().add(" valueBytes" , BinaryType , nullable = false )
10298 } else {
10399 SchemaUtil .getSchemaAsDataType(
104100 schema, " value" ).asInstanceOf [StructType ]
@@ -289,9 +285,10 @@ class StatePartitionReaderAllColumnFamilies(
289285 // loading data from disk, so we disable it for raw bytes mode.
290286 val modifiedStoreConf = storeConf.withFormatValidationDisabled()
291287
292- val keyStateEncoderSpec = NoPrefixKeyStateEncoderSpec (keySchema)
288+ val keyStateEncoderSpec = NoPrefixKeyStateEncoderSpec (new StructType ())
289+ // Pass in empty keySchema, valueSchema and dummy encoder because we don't encode any data
293290 val provider = StateStoreProvider .createAndInit(
294- stateStoreProviderId, keySchema, valueSchema , keyStateEncoderSpec,
291+ stateStoreProviderId, new StructType (), new StructType () , keyStateEncoderSpec,
295292 useColumnFamilies = colFamilyNames.nonEmpty, modifiedStoreConf, hadoopConf.value, false , None )
296293
297294 provider
0 commit comments