Commit a78a733

yangyx committed: HiveFileStatusCache unit test

1 parent 9ba7cfc

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala

Lines changed: 68 additions & 53 deletions
@@ -85,90 +85,92 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest {
     }
   }
 
-
   private def newCatalog(): HiveTableCatalog = {
     val catalog = new HiveTableCatalog
-    val catalogName = "hive"
     val properties = Maps.newHashMap[String, String]()
     properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
     properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
-    catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties))
+    catalog.initialize(super.catalogName, new CaseInsensitiveStringMap(properties))
     catalog
   }
 
   test("expire FileStatusCache when insert into") {
-    val table = "hive.default.tbl_partition"
-    val identifier = Identifier.of(Array("default"), "tbl_partition")
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
     withTable(table) {
       spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect()
-      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
-      val location = hiveCatalogTable.catalogTable.location.toString
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
 
       spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
       assert(HiveFileStatusCache.getOrCreate(spark, table)
         .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
 
-      val df1 = spark.sql(s"select * from $table")
-      assert(df1.count() === 5)
+      assert(spark.sql(s"select * from $table").count() === 5)
       assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
 
       spark.sql(s"insert into $table partition(city='ct') values(11),(21),(31),(41),(51)").collect()
       assert(HiveFileStatusCache.getOrCreate(spark, table)
         .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
 
-      val df2 = spark.sql(s"select * from $table")
-      assert(df2.count() === 10)
+      assert(spark.sql(s"select * from $table").count() === 10)
       assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).size === 2)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 2)
     }
   }
 
   test("expire FileStatusCache when insert overwrite") {
-    val table = "hive.default.tbl_partition"
-    val identifier = Identifier.of(Array("default"), "tbl_partition")
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
     withTable(table) {
       spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect()
-      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
-      val location = hiveCatalogTable.catalogTable.location.toString
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
 
       spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
       assert(HiveFileStatusCache.getOrCreate(spark, table)
         .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
 
-      val df1 = spark.sql(s"select * from $table")
-      assert(df1.count() === 5)
+      assert(spark.sql(s"select * from $table").count() === 5)
       assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
 
       spark.sql(s"insert overwrite $table partition(city='ct') values(11),(21),(31),(41),(51)")
         .collect()
       assert(HiveFileStatusCache.getOrCreate(spark, table)
         .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
 
-      val df2 = spark.sql(s"select * from $table")
-      assert(df2.count() === 5)
+      assert(spark.sql(s"select * from $table").count() === 5)
       assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
     }
   }
 
   test("expire FileStatusCache when alter Table") {
-    val table = "hive.default.tbl_partition"
-    val identifier = Identifier.of(Array("default"), "tbl_partition")
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
     withTable(table) {
       spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect()
-      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
-      val location = hiveCatalogTable.catalogTable.location.toString
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
 
       spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      spark.sql(s"select * from $table").collect()
       assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
-
-      val df1 = spark.sql(s"select * from $table")
-      assert(df1.count() === 5)
-      assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
 
       spark.sql(s"ALTER TABLE $table ADD COLUMNS (name string)").collect()
       assert(HiveFileStatusCache.getOrCreate(spark, table)
@@ -177,28 +179,41 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest {
   }
 
   test("expire FileStatusCache when rename Table") {
-    val table = "hive.default.tbl_partition"
-    val identifier = Identifier.of(Array("default"), "tbl_partition")
-    withTable(table) {
-      spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect()
-      val hiveCatalogTable = newCatalog().loadTable(identifier).asInstanceOf[HiveTable]
-      val location = hiveCatalogTable.catalogTable.location.toString
-
-      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
-      assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
-
-      val df1 = spark.sql(s"select * from $table")
-      assert(df1.count() === 5)
-      assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).size === 1)
+    val dbName = "default"
+    val oldTbName = "tbl_partition"
+    val newTbName = "tbl_partition_new"
+    val oldTable = s"${catalogName}.${dbName}.${oldTbName}"
+    val newTable = s"${catalogName}.${dbName}.${newTbName}"
+
+    withTable(newTable) {
+      spark.sql(s"create table ${oldTable} (age int)partitioned by(city string) stored as orc")
+        .collect()
+      spark.sql(s"insert into $oldTable partition(city='ct') values(10),(20),(30),(40),(50)")
+        .collect()
+      spark.sql(s"select * from $oldTable").collect()
+
+      val oldLocation = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), oldTbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+      assert(HiveFileStatusCache.getOrCreate(spark, oldTable)
+        .getLeafFiles(new Path(s"$oldLocation/city=ct")).get.length === 1)
+
+      spark.sql(s"DROP TABLE IF EXISTS ${newTable}").collect()
+      spark.sql(s"use ${catalogName}.${dbName}").collect()
+      spark.sql(s"ALTER TABLE $oldTbName RENAME TO $newTbName").collect()
+      val newLocation = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), newTbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      assert(HiveFileStatusCache.getOrCreate(spark, oldTable)
+        .getLeafFiles(new Path(s"$oldLocation/city=ct"))
+        .isEmpty)
 
-      val newTable = "hive.default.tbl_partition_1"
-      spark.sql(s"ALTER TABLE $table RENAME TO $newTable").collect()
-      assert(HiveFileStatusCache.getOrCreate(spark, table)
-        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
       assert(HiveFileStatusCache.getOrCreate(spark, newTable)
-        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+        .getLeafFiles(new Path(s"$newLocation/city=ct"))
+        .isEmpty)
     }
   }
 }
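
All four tests exercise the same contract: a write (insert into, insert overwrite) or a DDL statement (alter, rename) evicts the cache entry for the table, and a subsequent scan repopulates it. The snippet below is a hypothetical recap of that pattern, assuming only the API surface visible in this diff: getOrCreate keyed by the Spark session and the fully qualified table name, and getLeafFiles apparently returning an Option of the cached leaf-file listing after this change (which would explain the switch from .size === 1 to .get.length === 1 above). The values spark, table and location stand in for the setup done in each test.

import org.apache.hadoop.fs.Path
import org.apache.kyuubi.spark.connector.hive.HiveFileStatusCache

// Hypothetical recap of the cache behavior the suite verifies;
// spark, table and location are as prepared in the tests above.
val partitionPath = new Path(s"$location/city=ct")

// Any write or DDL on the table invalidates its cache entry,
// so a fresh lookup finds nothing cached for the partition path.
spark.sql(s"insert into $table partition(city='ct') values(1)").collect()
assert(HiveFileStatusCache.getOrCreate(spark, table)
  .getLeafFiles(partitionPath).isEmpty)

// A scan lists the partition directory and populates the cache...
spark.sql(s"select * from $table").count()

// ...so the same lookup now returns the cached file statuses.
assert(HiveFileStatusCache.getOrCreate(spark, table)
  .getLeafFiles(partitionPath).get.nonEmpty)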
