
Commit a93fe24

Author: Ubuntu
Message: add more test
1 parent: f570573

3 files changed: +44 −4 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
Lines changed: 2 additions & 2 deletions

@@ -70,8 +70,8 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
         && !stateConf.providerClass.contains("RocksDB")) {
       throw StateDataSourceErrors.invalidOptionValue(
         StateSourceOptions.INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES,
-        "internalOnlyReadAllColumnFamilies is only supported with RocksDBStateStoreProvider. " +
-          s"Current provider: ${stateConf.providerClass}")
+        "internalOnlyReadAllColumnFamilies=true is only supported with " +
+          s"RocksDBStateStoreProvider. Current provider: ${stateConf.providerClass}")
     }
     val stateStoreReaderInfo: StateStoreReaderInfo = getStoreMetadataAndRunChecks(
       sourceOptions)
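
For context on how this check surfaces to a reader, here is a minimal usage sketch (not part of the commit), assuming a streaming checkpoint that was written with RocksDBStateStoreProvider; `checkpointDir` is a placeholder for a real checkpoint path:

import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.internal.SQLConf

// With the RocksDB provider configured, the guard above passes and the read
// succeeds; with any other provider it throws the invalidOptionValue error.
spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
  classOf[RocksDBStateStoreProvider].getName)

val checkpointDir: String = "/path/to/checkpoint" // placeholder
val allFamilies = spark.read
  .format("statestore")
  .option(StateSourceOptions.PATH, checkpointDir)
  .option(StateSourceOptions.INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, "true")
  .load()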

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala
Lines changed: 0 additions & 1 deletion

@@ -101,7 +101,6 @@ object SchemaUtil {
       row.update(1, pair._1.getBytes)
       row.update(2, pair._2.getBytes)
       row.update(3, UTF8String.fromString(colFamilyName))
-      // row.update(4, pair._2)
       row
     }
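
The deleted line was leftover commented-out code. For orientation, a sketch of the row shape the surviving updates imply; the row width (4) and the meaning of field 0 are assumptions, not shown in this diff:

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

// Assumed shape: one row per key/value pair in a column family.
val keyBytes: Array[Byte] = ???   // placeholder for pair._1.getBytes
val valueBytes: Array[Byte] = ??? // placeholder for pair._2.getBytes
val colFamilyName: String = "default"

val row = new GenericInternalRow(4)
row.update(1, keyBytes)                              // raw key bytes
row.update(2, valueBytes)                            // raw value bytes
row.update(3, UTF8String.fromString(colFamilyName))  // owning column family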

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReaderAllColumnFamiliesSuite.scala
Lines changed: 42 additions & 1 deletion

@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider}
 import org.apache.spark.sql.functions.{count, sum}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode

@@ -200,4 +200,45 @@ class StatePartitionReaderAllColumnFamiliesSuite extends StateDataSourceTestBase
       }
     }
   }
+
+  test("internalOnlyReadAllColumnFamilies should fail with HDFS-backed state store") {
+    withTempDir { tempDir =>
+      withSQLConf(
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[HDFSBackedStateStoreProvider].getName,
+        SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
+
+        val inputData = MemoryStream[Int]
+        val aggregated = inputData.toDF()
+          .selectExpr("value", "value % 10 AS groupKey")
+          .groupBy($"groupKey")
+          .agg(
+            count("*").as("cnt"),
+            sum("value").as("sum")
+          )
+          .as[(Int, Long, Long)]
+
+        testStream(aggregated, OutputMode.Update)(
+          StartStream(checkpointLocation = tempDir.getAbsolutePath),
+          // batch 0
+          AddData(inputData, 0 until 1: _*),
+          CheckLastBatch(
+            (0, 1, 0)
+          ),
+          StopStream
+        )
+
+        // Attempt to read with internalOnlyReadAllColumnFamilies=true should fail
+        val e = intercept[StateDataSourceException] {
+          spark.read
+            .format("statestore")
+            .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+            .option(StateSourceOptions.INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, "true")
+            .load()
+            .collect()
+        }
+        assert(e.getMessage.contains("internalOnlyReadAllColumnFamilies=true is only " +
+          s"supported with RocksDBStateStoreProvider"))
+      }
+    }
+  }
 }
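
For contrast, a minimal sketch of the passing path under the same suite conventions (not part of the commit), assuming the checkpoint in tempDir had instead been written with RocksDBStateStoreProvider:

// Hedged counterpart to the failing case above: with the RocksDB provider
// configured, collect() is expected to succeed rather than throw.
withSQLConf(
  SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
  spark.read
    .format("statestore")
    .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
    .option(StateSourceOptions.INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, "true")
    .load()
    .collect()
}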
