
Commit 1da81f2

cached fileindex
cache version 2 cache ut change ut
1 parent ea75fa8 commit 1da81f2

File tree

3 files changed: +240 −1 lines


extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala

Lines changed: 2 additions & 1 deletion
@@ -48,7 +48,8 @@ class HiveCatalogFileIndex(
   private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] =
     mutable.Map()
 
-  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+  private val fileStatusCache =
+    HiveFileStatusCache.getOrCreate(sparkSession, catalogTable.qualifiedName)
 
   private val baseLocation: Option[URI] = table.storage.locationUri
 
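With this change each table gets its own view of the cache, keyed by the table's qualified name, while all views share one memory quota. A minimal usage sketch of the API introduced below (hypothetical table names and path; assumes a live SparkSession named spark with spark.sql.hive.manageFilesourcePartitions enabled and a positive spark.sql.hive.filesourcePartitionFileCacheSize, otherwise getOrCreate returns NoopCache):

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache

// Hypothetical qualified names and path, for illustration only.
val cacheT1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.t1")
val cacheT2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.t2")

val part = new Path("/warehouse/db.db/t1/p=0")
cacheT1.putLeafFiles(part, Array(new FileStatus()))

cacheT1.getLeafFiles(part) // Some(Array(...)): hit for the table that cached it
cacheT2.getLeafFiles(part) // None: another table's entries stay invisible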
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala

Lines changed: 142 additions & 0 deletions

@@ -0,0 +1,142 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.spark.connector.hive.read

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._

import com.google.common.cache._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.util.SizeEstimator

/**
 * Use [[HiveFileStatusCache.getOrCreate()]] to construct a globally shared file status cache.
 */
object HiveFileStatusCache {
  private var sharedCache: HiveSharedInMemoryCache = _

  /**
   * @return a new FileStatusCache based on session configuration. Cache memory quota is
   *         shared across all clients.
   */
  def getOrCreate(session: SparkSession, qualifiedName: String): FileStatusCache =
    synchronized {
      if (session.sessionState.conf.manageFilesourcePartitions &&
        session.sessionState.conf.filesourcePartitionFileCacheSize > 0) {
        if (sharedCache == null) {
          sharedCache = new HiveSharedInMemoryCache(
            session.sessionState.conf.filesourcePartitionFileCacheSize,
            session.sessionState.conf.metadataCacheTTL)
        }
        sharedCache.createForNewClient(qualifiedName)
      } else {
        NoopCache
      }
    }

  def resetForTesting(): Unit = synchronized {
    sharedCache = null
  }
}

/**
 * An implementation that caches partition file statuses in memory.
 *
 * @param maxSizeInBytes max allowable cache size before entries start getting evicted
 */
private class HiveSharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging {

  // Opaque object that uniquely identifies a shared cache user
  private type ClientId = Object

  private val warnedAboutEviction = new AtomicBoolean(false)

  // we use a composite cache key in order to distinguish entries inserted by different clients
  private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
    // [[Weigher]].weigh returns Int, so we could only cache objects < 2GB;
    // instead, the weight is divided by this factor (which is smaller
    // than the size of one [[FileStatus]]),
    // so it will support objects up to 64GB in size.
    val weightScale = 32
    val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
      override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
        val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
        if (estimate > Int.MaxValue) {
          logWarning(s"Cached table partition metadata size is too big. Approximating to " +
            s"${Int.MaxValue.toLong * weightScale}.")
          Int.MaxValue
        } else {
          estimate.toInt
        }
      }
    }
    val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
      override def onRemoval(
          removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]): Unit = {
        if (removed.getCause == RemovalCause.SIZE &&
          warnedAboutEviction.compareAndSet(false, true)) {
          logWarning(
            "Evicting cached table partition metadata from memory due to size constraints " +
              "(spark.sql.hive.filesourcePartitionFileCacheSize = "
              + maxSizeInBytes + " bytes). This may impact query planning performance.")
        }
      }
    }

    var builder = CacheBuilder.newBuilder()
      .weigher(weigher)
      .removalListener(removalListener)
      .maximumWeight(maxSizeInBytes / weightScale)

    if (cacheTTL > 0) {
      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
    }

    builder.build[(ClientId, Path), Array[FileStatus]]()
  }

  /**
   * @return a FileStatusCache that does not share any entries with any other client, but does
   *         share memory resources for the purpose of cache eviction.
   */
  def createForNewClient(clientId: Object): HiveFileStatusCache = new HiveFileStatusCache {

    override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
      Option(cache.getIfPresent((clientId, path)))
    }

    override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
      cache.put((clientId, path), leafFiles)
    }

    override def invalidateAll(): Unit = {
      cache.asMap.asScala.foreach { case (key, value) =>
        if (key._1 == clientId) {
          cache.invalidate(key)
        }
      }
    }
  }

  abstract class HiveFileStatusCache extends FileStatusCache {}
}
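The weigher comment above is easy to miss: Guava's Weigher.weigh returns an Int, so raw byte estimates would cap the cache budget near 2 GB (Int.MaxValue). Dividing every estimate by weightScale = 32 raises the addressable range to Int.MaxValue * 32, roughly 64 GB, at the cost of 32-byte weight granularity. A self-contained sketch of the same trick with illustrative key/value types (not part of this commit):

import com.google.common.cache.{Cache, CacheBuilder, Weigher}

// Weights are stored in units of 32 bytes, so maximumWeight(quota / 32)
// enforces a ~quota-byte budget while weigh() stays within Int range.
val weightScale = 32
val quotaBytes = 1024L * 1024 * 1024 // hypothetical 1 GB quota

val cache: Cache[String, Array[Byte]] = CacheBuilder.newBuilder()
  .maximumWeight(quotaBytes / weightScale)
  .weigher(new Weigher[String, Array[Byte]] {
    override def weigh(key: String, value: Array[Byte]): Int =
      value.length / weightScale // scaled size estimate
  })
  .build[String, Array[Byte]]()

cache.put("part-00000", new Array[Byte](1 << 20)) // 1 MiB entry weighs 32768 units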
extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala

Lines changed: 96 additions & 0 deletions

@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.spark.connector.hive

import scala.concurrent.duration.DurationInt

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.Futures.timeout

import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache

class HiveFileStatusCacheSuite extends KyuubiHiveTest {

  override def beforeEach(): Unit = {
    super.beforeEach()
    HiveFileStatusCache.resetForTesting()
  }

  override def afterEach(): Unit = {
    super.afterEach()
    HiveFileStatusCache.resetForTesting()
  }

  test("cached by qualifiedName") {
    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
    try {
      // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime
      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)

      val path = new Path("/dummy_tmp", "abc")
      val files = (1 to 3).map(_ => new FileStatus())

      HiveFileStatusCache.resetForTesting()
      val fileStatusCacheTable1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
      fileStatusCacheTable1.putLeafFiles(path, files.toArray)
      val fileStatusCacheTable2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
      val fileStatusCacheTable3 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table2")

      // Exactly 3 files are cached.
      assert(fileStatusCacheTable1.getLeafFiles(path).get.length === 3)
      assert(fileStatusCacheTable2.getLeafFiles(path).get.length === 3)
      assert(fileStatusCacheTable3.getLeafFiles(path).isEmpty === true)
      // Wait until the cache entries expire.
      eventually(timeout(3.seconds)) {
        // And the cache is gone.
        assert(fileStatusCacheTable1.getLeafFiles(path).isEmpty === true)
        assert(fileStatusCacheTable2.getLeafFiles(path).isEmpty === true)
        assert(fileStatusCacheTable3.getLeafFiles(path).isEmpty === true)
      }
    } finally {
      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
    }
  }

  test("expire FileStatusCache if TTL is configured") {
    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
    try {
      // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime
      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)

      val path = new Path("/dummy_tmp", "abc")
      val files = (1 to 3).map(_ => new FileStatus())

      HiveFileStatusCache.resetForTesting()
      val fileStatusCache = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table")
      fileStatusCache.putLeafFiles(path, files.toArray)

      // Exactly 3 files are cached.
      assert(fileStatusCache.getLeafFiles(path).get.length === 3)
      // Wait until the cache entries expire.
      eventually(timeout(3.seconds)) {
        // And the cache is gone.
        assert(fileStatusCache.getLeafFiles(path).isEmpty === true)
      }
    } finally {
      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
    }
  }
}
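A note on the eventually(timeout(3.seconds)) pattern: it polls real time against the 1-second TTL, which can be flaky on a loaded CI host. Had the Guava builder been injectable, expiry could instead be driven deterministically with a custom Ticker. A hedged, standalone sketch of that technique (plain Guava, not this commit's API):

import java.util.concurrent.TimeUnit

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

// A manual clock: expireAfterWrite consults this instead of system time.
class ManualTicker extends Ticker {
  private var nanos = 0L
  def advance(seconds: Long): Unit = nanos += TimeUnit.SECONDS.toNanos(seconds)
  override def read(): Long = nanos
}

val ticker = new ManualTicker
val cache = CacheBuilder.newBuilder()
  .expireAfterWrite(1, TimeUnit.SECONDS)
  .ticker(ticker)
  .build[String, String]()

cache.put("k", "v")
assert(cache.getIfPresent("k") == "v")
ticker.advance(2) // jump the clock past the TTL
assert(cache.getIfPresent("k") == null) // entry has expired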
