 
 package org.apache.kyuubi.spark.connector.hive
 
 import scala.concurrent.duration.DurationInt
 
+import com.google.common.collect.Maps
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.scalatest.concurrent.Eventually.eventually
 import org.scalatest.concurrent.Futures.timeout
 
 import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
 
 class HiveFileStatusCacheSuite extends KyuubiHiveTest {
 
@@ -83,4 +84,121 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest {
       SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
     }
   }
-}
+
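+  // Builds a HiveTableCatalog backed by an embedded in-memory Derby metastore, so the
+  // tests can resolve a table's storage location without an external Hive service.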
+  private def newCatalog(): HiveTableCatalog = {
+    val catalog = new HiveTableCatalog
+    val catalogName = "hive"
+    val properties = Maps.newHashMap[String, String]()
+    properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
+    properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
+    catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties))
+    catalog
+  }
+
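+  // An INSERT INTO must expire the cached file listing for the table; the scan that
+  // follows repopulates the cache, and a second insert expires it again.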
+  test("expire FileStatusCache when insert into") {
+    val table = "hive.default.tbl_partition"
+    val identifier = Identifier.of(Array("default"), "tbl_partition")
+    withTable(table) {
+      spark.sql(s"create table $table (age int) partitioned by (city string) stored as orc").collect()
+      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
+      val location = hiveCatalogTable.catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
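+      // The insert should have expired any cached listing for this partition path.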
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      val df1 = spark.sql(s"select * from $table")
+      assert(df1.count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+
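+      // A second insert must expire the now-populated cache entry again.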
+      spark.sql(s"insert into $table partition(city='ct') values(11),(21),(31),(41),(51)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      val df2 = spark.sql(s"select * from $table")
+      assert(df2.count() === 10)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).size === 2)
+    }
+  }
+
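+  // INSERT OVERWRITE replaces the partition's files, so the cache must be expired and
+  // the subsequent scan should see exactly one fresh file.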
+  test("expire FileStatusCache when insert overwrite") {
+    val table = "hive.default.tbl_partition"
+    val identifier = Identifier.of(Array("default"), "tbl_partition")
+    withTable(table) {
+      spark.sql(s"create table $table (age int) partitioned by (city string) stored as orc").collect()
+      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
+      val location = hiveCatalogTable.catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      val df1 = spark.sql(s"select * from $table")
+      assert(df1.count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+
+      spark.sql(s"insert overwrite $table partition(city='ct') values(11),(21),(31),(41),(51)")
+        .collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      val df2 = spark.sql(s"select * from $table")
+      assert(df2.count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+    }
+  }
+
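+  // Schema changes go through alterTable, which must also expire the cached listing.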
+  test("expire FileStatusCache when alter Table") {
+    val table = "hive.default.tbl_partition"
+    val identifier = Identifier.of(Array("default"), "tbl_partition")
+    withTable(table) {
+      spark.sql(s"create table $table (age int) partitioned by (city string) stored as orc").collect()
+      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
+      val location = hiveCatalogTable.catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      val df1 = spark.sql(s"select * from $table")
+      assert(df1.count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+
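+      // Adding a column should invalidate the cache even though no data files changed.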
+      spark.sql(s"ALTER TABLE $table ADD COLUMNS (name string)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+    }
+  }
+
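+  // Renaming must drop the cache entry under the old qualified name and leave the new
+  // name cold until it is scanned.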
+  test("expire FileStatusCache when rename Table") {
+    val table = "hive.default.tbl_partition"
+    val identifier = Identifier.of(Array("default"), "tbl_partition")
+    withTable(table) {
+      spark.sql(s"create table $table (age int) partitioned by (city string) stored as orc").collect()
+      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
+      val location = hiveCatalogTable.catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      val df1 = spark.sql(s"select * from $table")
+      assert(df1.count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+
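+      // After the rename, neither the old nor the new name should have a cached listing.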
+      val newTable = "hive.default.tbl_partition_1"
+      spark.sql(s"ALTER TABLE $table RENAME TO $newTable").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+      assert(HiveFileStatusCache.getOrCreate(spark, newTable)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+    }
+  }
+}