@@ -276,80 +276,80 @@ class StatePartitionAllColumnFamiliesReaderSuite extends StateDataSourceTestBase
276276
277277 testWithChangelogConfig(" all-column-families: dropDuplicates validation" ) {
278278 withTempDir { tempDir =>
279- runDropDuplicatesQuery(tempDir.getAbsolutePath)
279+ runDropDuplicatesQuery(tempDir.getAbsolutePath)
280280
281- val keySchema = StructType (Array (
282- StructField (" value" , IntegerType , nullable = false ),
283- StructField (" eventTime" , org.apache.spark.sql.types.TimestampType )
284- ))
285- val valueSchema = StructType (Array (
286- StructField (" __dummy__" , NullType , nullable = true )
287- ))
281+ val keySchema = StructType (Array (
282+ StructField (" value" , IntegerType , nullable = false ),
283+ StructField (" eventTime" , org.apache.spark.sql.types.TimestampType )
284+ ))
285+ val valueSchema = StructType (Array (
286+ StructField (" __dummy__" , NullType , nullable = true )
287+ ))
288288
289- val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
290- val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
289+ val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
290+ val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
291291
292- compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
292+ compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
293293 }
294294 }
295295
296296 testWithChangelogConfig(" all-column-families: dropDuplicates with column specified" ) {
297297 withTempDir { tempDir =>
298- runDropDuplicatesQueryWithColumnSpecified(tempDir.getAbsolutePath)
298+ runDropDuplicatesQueryWithColumnSpecified(tempDir.getAbsolutePath)
299299
300- val keySchema = StructType (Array (
301- StructField (" col1" , org.apache.spark.sql.types.StringType , nullable = true )
302- ))
303- val valueSchema = StructType (Array (
304- StructField (" __dummy__" , NullType , nullable = true )
305- ))
300+ val keySchema = StructType (Array (
301+ StructField (" col1" , org.apache.spark.sql.types.StringType , nullable = true )
302+ ))
303+ val valueSchema = StructType (Array (
304+ StructField (" __dummy__" , NullType , nullable = true )
305+ ))
306306
307- val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
308- val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
307+ val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
308+ val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
309309
310- compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
310+ compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
311311 }
312312 }
313313
314314 testWithChangelogConfig(" all-column-families: dropDuplicatesWithinWatermark" ) {
315315 withTempDir { tempDir =>
316- runDropDuplicatesWithinWatermarkQuery(tempDir.getAbsolutePath)
316+ runDropDuplicatesWithinWatermarkQuery(tempDir.getAbsolutePath)
317317
318- val keySchema = StructType (Array (
319- StructField (" _1" , org.apache.spark.sql.types.StringType , nullable = true )
320- ))
321- val valueSchema = StructType (Array (
322- StructField (" expiresAtMicros" , LongType , nullable = false )
323- ))
318+ val keySchema = StructType (Array (
319+ StructField (" _1" , org.apache.spark.sql.types.StringType , nullable = true )
320+ ))
321+ val valueSchema = StructType (Array (
322+ StructField (" expiresAtMicros" , LongType , nullable = false )
323+ ))
324324
325- val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
326- val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
325+ val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
326+ val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
327327
328- compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
328+ compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
329329 }
330330 }
331331
332332 testWithChangelogConfig(" all-column-families: session window aggregation" ) {
333333 withTempDir { tempDir =>
334- runSessionWindowAggregationQuery(tempDir.getAbsolutePath)
335-
336- val keySchema = StructType (Array (
337- StructField (" sessionId" , org.apache.spark.sql.types.StringType , nullable = false ),
338- StructField (" sessionStartTime" , org.apache.spark.sql.types.TimestampType , nullable = false )
339- ))
340- val valueSchema = StructType (Array (
341- StructField (" session_window" , org.apache.spark.sql.types.StructType (Array (
342- StructField (" start" , org.apache.spark.sql.types.TimestampType , nullable = true ),
343- StructField (" end" , org.apache.spark.sql.types.TimestampType , nullable = true )
344- )), nullable = false ),
345- StructField (" sessionId" , org.apache.spark.sql.types.StringType , nullable = false ),
346- StructField (" count" , LongType , nullable = false )
347- ))
348-
349- val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
350- val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
351-
352- compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
334+ runSessionWindowAggregationQuery(tempDir.getAbsolutePath)
335+
336+ val keySchema = StructType (Array (
337+ StructField (" sessionId" , org.apache.spark.sql.types.StringType , nullable = false ),
338+ StructField (" sessionStartTime" , org.apache.spark.sql.types.TimestampType , nullable = false )
339+ ))
340+ val valueSchema = StructType (Array (
341+ StructField (" session_window" , org.apache.spark.sql.types.StructType (Array (
342+ StructField (" start" , org.apache.spark.sql.types.TimestampType , nullable = true ),
343+ StructField (" end" , org.apache.spark.sql.types.TimestampType , nullable = true )
344+ )), nullable = false ),
345+ StructField (" sessionId" , org.apache.spark.sql.types.StringType , nullable = false ),
346+ StructField (" count" , LongType , nullable = false )
347+ ))
348+
349+ val normalData = getNormalReadDf(tempDir.getAbsolutePath).collect()
350+ val bytesDf = getBytesReadDf(tempDir.getAbsolutePath).collect()
351+
352+ compareNormalAndBytesData(normalData, bytesDf, " default" , keySchema, valueSchema)
353353 }
354354 }
355355
0 commit comments