Skip to content

Commit fa48c3c

Browse files
aokolnychyi authored and zifeif2 committed
[SPARK-51771][SQL][FOLLOWUP] Rename currentVersion to version in DSv2 Table
### What changes were proposed in this pull request? This PR renames `currentVersion` to `version` in DSv2 `Table`. This method is supposed to be a simple getter and must not trigger a refresh of the underlying table state. ### Why are the changes needed? These changes are needed to avoid ambiguity in the about-to-be-released API in 4.1. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53118 from aokolnychyi/spark-51771-followup. Authored-by: Anton Okolnychyi <aokolnychyi@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent d463a6c commit fa48c3c

File tree

9 files changed

+33
-31
lines changed

9 files changed

+33
-31
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,10 @@ default Map<String, String> properties() {
104104
default Constraint[] constraints() { return new Constraint[0]; }
105105

106106
/**
107-
* Returns the current table version if implementation supports versioning.
108-
* If the table is not versioned, null can be returned here.
107+
* Returns the version of this table if versioning is supported, null otherwise.
108+
* <p>
109+
* This method must not trigger a refresh of the table metadata. It should return
110+
* the version that corresponds to the current state of this table instance.
109111
*/
110-
default String currentVersion() { return null; }
112+
default String version() { return null; }
111113
}

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ case class DataSourceV2Relation(
134134
def autoSchemaEvolution(): Boolean =
135135
table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
136136

137-
def isVersioned: Boolean = table.currentVersion != null
137+
def isVersioned: Boolean = table.version != null
138138
}
139139

140140
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
4242
case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
4343
if r.isVersioned && r.timeTravelSpec.isEmpty =>
4444
val tableName = V2TableUtil.toQualifiedName(catalog, ident)
45-
val version = r.table.currentVersion
45+
val version = r.table.version
4646
logDebug(s"Pinning table version for $tableName to $version")
4747
r.copy(timeTravelSpec = Some(AsOfVersion(version)))
4848
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,24 +1357,24 @@ class CatalogSuite extends SparkFunSuite {
13571357
intercept[NoSuchFunctionException](catalog.loadFunction(Identifier.of(Array("ns1"), "func")))
13581358
}
13591359

1360-
test("currentVersion") {
1360+
test("version") {
13611361
val catalog = newCatalog()
13621362

13631363
val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
13641364
.asInstanceOf[InMemoryTable]
1365-
assert(table.currentVersion() == "0")
1365+
assert(table.version() == "0")
13661366
table.withData(Array(
13671367
BufferedRows("3", table.columns()).withRow(InternalRow(0, "abc", "3")),
13681368
BufferedRows("4", table.columns()).withRow(InternalRow(1, "def", "4"))))
1369-
assert(table.currentVersion() == "1")
1369+
assert(table.version() == "1")
13701370

13711371
table.truncateTable()
1372-
assert(catalog.loadTable(testIdent).currentVersion() == "2")
1372+
assert(catalog.loadTable(testIdent).version() == "2")
13731373

13741374
catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
1375-
assert(catalog.loadTable(testIdent).currentVersion() == "3")
1375+
assert(catalog.loadTable(testIdent).version() == "3")
13761376

13771377
catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3"))
1378-
assert(catalog.loadTable(testIdent).currentVersion() == "4")
1378+
assert(catalog.loadTable(testIdent).version() == "4")
13791379
}
13801380
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ abstract class InMemoryBaseTable(
6666
extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {
6767

6868
// Tracks the current version number of the table.
69-
protected var currentTableVersion: Int = 0
69+
protected var tableVersion: Int = 0
7070

7171
// Stores the table version validated during the last `ALTER TABLE ... ADD CONSTRAINT` operation.
7272
private var validatedTableVersion: String = null
@@ -75,14 +75,14 @@ abstract class InMemoryBaseTable(
7575

7676
override def columns(): Array[Column] = tableColumns
7777

78-
override def currentVersion(): String = currentTableVersion.toString
78+
override def version(): String = tableVersion.toString
7979

80-
def setCurrentVersion(version: String): Unit = {
81-
currentTableVersion = version.toInt
80+
def setVersion(version: String): Unit = {
81+
tableVersion = version.toInt
8282
}
8383

84-
def increaseCurrentVersion(): Unit = {
85-
currentTableVersion += 1
84+
def increaseVersion(): Unit = {
85+
tableVersion += 1
8686
}
8787

8888
def validatedVersion(): String = {

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class InMemoryTable(
6969
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
7070
dataMap --= InMemoryTable
7171
.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters)
72-
increaseCurrentVersion()
72+
increaseVersion()
7373
}
7474

7575
override def withData(data: Array[BufferedRows]): InMemoryTable = {
@@ -109,7 +109,7 @@ class InMemoryTable(
109109
row.getInt(0) == InMemoryTable.uncommittableValue()))) {
110110
throw new IllegalArgumentException(s"Test only mock write failure")
111111
}
112-
increaseCurrentVersion()
112+
increaseVersion()
113113
this
114114
}
115115
}
@@ -155,7 +155,7 @@ class InMemoryTable(
155155

156156
copiedTable.commits ++= commits.map(_.copy())
157157

158-
copiedTable.setCurrentVersion(currentVersion())
158+
copiedTable.setVersion(version())
159159
if (validatedVersion() != null) {
160160
copiedTable.setValidatedVersion(validatedVersion())
161161
}
@@ -165,12 +165,12 @@ class InMemoryTable(
165165

166166
override def equals(other: Any): Boolean = other match {
167167
case that: InMemoryTable =>
168-
this.id == that.id && this.currentVersion() == that.currentVersion()
168+
this.id == that.id && this.version() == that.version()
169169
case _ => false
170170
}
171171

172172
override def hashCode(): Int = {
173-
Objects.hash(id, currentVersion())
173+
Objects.hash(id, version())
174174
}
175175

176176
class InMemoryWriterBuilderWithOverWrite(override val info: LogicalWriteInfo)

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
179179
throw new IllegalArgumentException(s"Cannot drop all fields")
180180
}
181181

182-
table.increaseCurrentVersion()
183-
val currentVersion = table.currentVersion()
182+
table.increaseVersion()
183+
val currentVersion = table.version()
184184
val newTable = new InMemoryTable(
185185
name = table.name,
186186
columns = CatalogV2Util.structTypeToV2Columns(schema),
@@ -189,7 +189,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
189189
constraints = constraints,
190190
id = table.id)
191191
.alterTableWithData(table.data, schema)
192-
newTable.setCurrentVersion(currentVersion)
192+
newTable.setVersion(currentVersion)
193193
changes.foreach {
194194
case a: TableChange.AddConstraint =>
195195
newTable.setValidatedVersion(a.validatedTableVersion())
@@ -209,7 +209,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
209209

210210
Option(tables.remove(oldIdent)) match {
211211
case Some(table: InMemoryBaseTable) =>
212-
table.increaseCurrentVersion()
212+
table.increaseVersion()
213213
tables.put(newIdent, table)
214214
case _ =>
215215
throw new NoSuchTableException(oldIdent.asMultipartIdentifier)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
571571
val condition = a.checkConstraint.condition
572572
val change = TableChange.addConstraint(
573573
check.toV2Constraint,
574-
d.relation.table.currentVersion)
574+
d.relation.table.version)
575575
ResolveTableConstraints.validateCatalogForTableChange(Seq(change), catalog, ident)
576576
AddCheckConstraintExec(catalog, ident, change, condition, planLater(a.child)) :: Nil
577577

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
207207
sql(s"INSERT INTO $t VALUES (1, 'a'), (null, 'b')")
208208
sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0) $characteristic")
209209
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
210-
assert(table.currentVersion() == "2")
210+
assert(table.version() == "2")
211211
assert(table.validatedVersion() == "1")
212212
val constraint = getCheckConstraint(table)
213213
assert(constraint.name() == "c1")
@@ -254,7 +254,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
254254
// Add a valid check constraint
255255
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_positive_num CHECK (s.num >= -1)")
256256
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
257-
assert(table.currentVersion() == "2")
257+
assert(table.version() == "2")
258258
assert(table.validatedVersion() == "1")
259259
val constraint = getCheckConstraint(table)
260260
assert(constraint.name() == "valid_positive_num")
@@ -284,7 +284,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
284284
// Add a valid check constraint
285285
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_map_val CHECK (m['a'] >= -1)")
286286
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
287-
assert(table.currentVersion() == "2")
287+
assert(table.version() == "2")
288288
assert(table.validatedVersion() == "1")
289289
val constraint = getCheckConstraint(table)
290290
assert(constraint.name() == "valid_map_val")
@@ -312,7 +312,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
312312
// Add a valid check constraint
313313
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_array CHECK (a[1] >= -2)")
314314
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
315-
assert(table.currentVersion() == "2")
315+
assert(table.version() == "2")
316316
assert(table.validatedVersion() == "1")
317317
val constraint = getCheckConstraint(table)
318318
assert(constraint.name() == "valid_array")

0 commit comments

Comments (0)