Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ default Map<String, String> properties() {
default Constraint[] constraints() { return new Constraint[0]; }

/**
* Returns the current table version if implementation supports versioning.
* Returns this table version without refreshing state if implementation supports versioning.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can you explain more why to specify specially on "without refreshing state"? Does currentVersion implicitly refresh state possibly? I suppose currentVersion just returns the current version, and don't expect it will refresh state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, without refreshing state is confusing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

/**
 * Returns the current table version if the implementation supports versioning.
 * The returned value is not automatically refreshed.
 * If the table is not versioned, this method may return null.
 */

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned value -> This table instance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the whole purpose of the rename is to make sure connectors DO NOT automatically refresh the underlying table state. In other words, it must act like a guidance for connectors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this, folks?

/**
 * Returns the version of this table if versioning is supported, null otherwise.
 * <p>
 * This method must not trigger a refresh of the table metadata. It should return
 * the version that corresponds to the current state of this table instance.
 */

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to avoid the word current as this may not be current and can be behind the version in the metastore.

* If the table is not versioned, null can be returned here.
*/
default String currentVersion() { return null; }
default String version() { return null; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ case class DataSourceV2Relation(
def autoSchemaEvolution(): Boolean =
table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)

def isVersioned: Boolean = table.currentVersion != null
def isVersioned: Boolean = table.version != null
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
if r.isVersioned && r.timeTravelSpec.isEmpty =>
val tableName = V2TableUtil.toQualifiedName(catalog, ident)
val version = r.table.currentVersion
val version = r.table.version
logDebug(s"Pinning table version for $tableName to $version")
r.copy(timeTravelSpec = Some(AsOfVersion(version)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,24 +1357,24 @@ class CatalogSuite extends SparkFunSuite {
intercept[NoSuchFunctionException](catalog.loadFunction(Identifier.of(Array("ns1"), "func")))
}

test("currentVersion") {
test("version") {
val catalog = newCatalog()

val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
.asInstanceOf[InMemoryTable]
assert(table.currentVersion() == "0")
assert(table.version() == "0")
table.withData(Array(
BufferedRows("3", table.columns()).withRow(InternalRow(0, "abc", "3")),
BufferedRows("4", table.columns()).withRow(InternalRow(1, "def", "4"))))
assert(table.currentVersion() == "1")
assert(table.version() == "1")

table.truncateTable()
assert(catalog.loadTable(testIdent).currentVersion() == "2")
assert(catalog.loadTable(testIdent).version() == "2")

catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
assert(catalog.loadTable(testIdent).currentVersion() == "3")
assert(catalog.loadTable(testIdent).version() == "3")

catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3"))
assert(catalog.loadTable(testIdent).currentVersion() == "4")
assert(catalog.loadTable(testIdent).version() == "4")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class InMemoryBaseTable(
extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {

// Tracks the current version number of the table.
protected var currentTableVersion: Int = 0
protected var tableVersion: Int = 0

// Stores the table version validated during the last `ALTER TABLE ... ADD CONSTRAINT` operation.
private var validatedTableVersion: String = null
Expand All @@ -75,14 +75,14 @@ abstract class InMemoryBaseTable(

override def columns(): Array[Column] = tableColumns

override def currentVersion(): String = currentTableVersion.toString
override def version(): String = tableVersion.toString

def setCurrentVersion(version: String): Unit = {
currentTableVersion = version.toInt
def setVersion(version: String): Unit = {
tableVersion = version.toInt
}

def increaseCurrentVersion(): Unit = {
currentTableVersion += 1
def increaseVersion(): Unit = {
tableVersion += 1
}

def validatedVersion(): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class InMemoryTable(
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
dataMap --= InMemoryTable
.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters)
increaseCurrentVersion()
increaseVersion()
}

override def withData(data: Array[BufferedRows]): InMemoryTable = {
Expand Down Expand Up @@ -109,7 +109,7 @@ class InMemoryTable(
row.getInt(0) == InMemoryTable.uncommittableValue()))) {
throw new IllegalArgumentException(s"Test only mock write failure")
}
increaseCurrentVersion()
increaseVersion()
this
}
}
Expand Down Expand Up @@ -155,7 +155,7 @@ class InMemoryTable(

copiedTable.commits ++= commits.map(_.copy())

copiedTable.setCurrentVersion(currentVersion())
copiedTable.setVersion(version())
if (validatedVersion() != null) {
copiedTable.setValidatedVersion(validatedVersion())
}
Expand All @@ -165,12 +165,12 @@ class InMemoryTable(

override def equals(other: Any): Boolean = other match {
case that: InMemoryTable =>
this.id == that.id && this.currentVersion() == that.currentVersion()
this.id == that.id && this.version() == that.version()
case _ => false
}

override def hashCode(): Int = {
Objects.hash(id, currentVersion())
Objects.hash(id, version())
}

class InMemoryWriterBuilderWithOverWrite(override val info: LogicalWriteInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
throw new IllegalArgumentException(s"Cannot drop all fields")
}

table.increaseCurrentVersion()
val currentVersion = table.currentVersion()
table.increaseVersion()
val currentVersion = table.version()
val newTable = new InMemoryTable(
name = table.name,
columns = CatalogV2Util.structTypeToV2Columns(schema),
Expand All @@ -189,7 +189,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
constraints = constraints,
id = table.id)
.alterTableWithData(table.data, schema)
newTable.setCurrentVersion(currentVersion)
newTable.setVersion(currentVersion)
changes.foreach {
case a: TableChange.AddConstraint =>
newTable.setValidatedVersion(a.validatedTableVersion())
Expand All @@ -209,7 +209,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {

Option(tables.remove(oldIdent)) match {
case Some(table: InMemoryBaseTable) =>
table.increaseCurrentVersion()
table.increaseVersion()
tables.put(newIdent, table)
case _ =>
throw new NoSuchTableException(oldIdent.asMultipartIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val condition = a.checkConstraint.condition
val change = TableChange.addConstraint(
check.toV2Constraint,
d.relation.table.currentVersion)
d.relation.table.version)
ResolveTableConstraints.validateCatalogForTableChange(Seq(change), catalog, ident)
AddCheckConstraintExec(catalog, ident, change, condition, planLater(a.child)) :: Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
sql(s"INSERT INTO $t VALUES (1, 'a'), (null, 'b')")
sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0) $characteristic")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "c1")
Expand Down Expand Up @@ -254,7 +254,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
// Add a valid check constraint
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_positive_num CHECK (s.num >= -1)")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "valid_positive_num")
Expand Down Expand Up @@ -284,7 +284,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
// Add a valid check constraint
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_map_val CHECK (m['a'] >= -1)")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "valid_map_val")
Expand Down Expand Up @@ -312,7 +312,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
// Add a valid check constraint
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_array CHECK (a[1] >= -2)")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "valid_array")
Expand Down