Skip to content

Commit 37aa81a

Browse files
author
yangyx
committed
override invalidateTable method to clear table fileStatusCache
1 parent 7298b00 commit 37aa81a

File tree

3 files changed

+36
-24
lines changed

3 files changed

+36
-24
lines changed

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
386386
case _: NoSuchTableException =>
387387
throw new NoSuchTableException(ident)
388388
}
389-
HiveFileStatusCache.getOrCreate(sparkSession,
390-
catalogName + "." + catalogTable.qualifiedName).invalidateAll()
389+
invalidateTable(ident)
391390
loadTable(ident)
392391
}
393392

@@ -400,9 +399,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
400399
ident.asTableIdentifier,
401400
ignoreIfNotExists = true,
402401
purge = true /* skip HDFS trash */ )
403-
if (table.isInstanceOf[HiveTable]) {
404-
table.asInstanceOf[HiveTable].fileIndex.refresh()
405-
}
402+
invalidateTable(ident)
406403
true
407404
} else {
408405
false
@@ -422,12 +419,14 @@ class HiveTableCatalog(sparkSession: SparkSession)
422419
// Load table to make sure the table exists
423420
val table = loadTable(oldIdent)
424421
catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
425-
if (table.isInstanceOf[HiveTable]) {
426-
table.asInstanceOf[HiveTable].fileIndex.refresh()
427-
}
428-
422+
invalidateTable(oldIdent)
429423
}
430424

425+
override def invalidateTable(ident: Identifier): Unit = {
426+
val qualifiedName = s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}"
427+
HiveFileStatusCache.getOrCreate(sparkSession, qualifiedName).invalidateAll()
428+
}
429+
431430
private def toOptions(properties: Map[String, String]): Map[String, String] = {
432431
properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
433432
case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import scala.util.control.NonFatal
2121
import org.apache.hadoop.conf.Configuration
2222
import org.apache.hadoop.fs.Path
2323
import org.apache.hadoop.hive.conf.HiveConf
24-
import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
2524
import org.apache.spark.internal.Logging
2625
import org.apache.spark.internal.io.FileCommitProtocol
2726
import org.apache.spark.sql.SparkSession
@@ -33,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
3332
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.toSQLValue
3433
import org.apache.spark.sql.types.StringType
3534
import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException}
35+
import org.apache.spark.sql.connector.catalog.Identifier
3636

3737
class HiveBatchWrite(
3838
sparkSession: SparkSession,
@@ -68,9 +68,9 @@ class HiveBatchWrite(
6868

6969
// un-cache this table.
7070
hiveTableCatalog.catalog.invalidateCachedTable(table.identifier)
71-
// clear fileStatusCache
72-
HiveFileStatusCache.getOrCreate(sparkSession,
73-
hiveTableCatalog.name() + "." + table.qualifiedName).invalidateAll()
71+
hiveTableCatalog.invalidateTable(
72+
Identifier.of(Array(table.identifier.database.getOrElse("")), table.identifier.table)
73+
)
7474

7575
val catalog = hiveTableCatalog.catalog
7676
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
3939

4040
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
4141
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
42-
import org.apache.kyuubi.spark.connector.hive.read.HiveScan
42+
import org.apache.kyuubi.spark.connector.hive.read.{HiveFileStatusCache, HiveScan}
4343

4444
class HiveCatalogSuite extends KyuubiHiveTest {
4545

@@ -284,16 +284,29 @@ class HiveCatalogSuite extends KyuubiHiveTest {
284284
}
285285

286286
test("invalidateTable") {
287-
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
288-
// Hive v2 don't cache table
289-
catalog.invalidateTable(testIdent)
290-
291-
val loaded = catalog.loadTable(testIdent)
292-
293-
assert(table.name == loaded.name)
294-
assert(table.schema == loaded.schema)
295-
assert(table.properties == loaded.properties)
296-
catalog.dropTable(testIdent)
287+
withSparkSession() { spark =>
288+
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
289+
val qualifiedName = s"$catalogName.${testIdent.namespace().mkString(".")}.${testIdent.name()}"
290+
val location = table.asInstanceOf[HiveTable].catalogTable.location
291+
292+
spark.sql(s"select * from $qualifiedName").collect()
293+
assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName)
294+
.getLeafFiles(new Path(location)).isDefined)
295+
296+
catalog.invalidateTable(testIdent)
297+
assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName)
298+
.getLeafFiles(new Path(location)).isEmpty)
299+
300+
spark.sql(s"select * from $qualifiedName").collect()
301+
assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName)
302+
.getLeafFiles(new Path(location)).isDefined)
303+
304+
val loaded = catalog.loadTable(testIdent)
305+
assert(table.name == loaded.name)
306+
assert(table.schema == loaded.schema)
307+
assert(table.properties == loaded.properties)
308+
catalog.dropTable(testIdent)
309+
}
297310
}
298311

299312
test("listNamespaces: fail if missing namespace") {

0 commit comments

Comments
 (0)