184 changes: 116 additions & 68 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -322,54 +322,77 @@ def _catalog_storage(self):
self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat # pylint: disable=protected-access
)

@staticmethod
def _get_entity_storage_locations(table_metadata):
"""Obtain the entityStorageLocations property for table metadata, if the property is present."""
# This is needed because:
# - DBR 16.0 introduced entityStorageLocations as a property on table metadata, and this is required
# as a constructor parameter for CatalogTable.
# - We need to be compatible with earlier versions of DBR.
# - The normal hasattr() check does not work with Py4J-based objects: it always returns True and non-existent
# methods will be automatically created on the proxy but fail when invoked.
# Instead the only approach is to use dir() to check if the method exists _prior_ to trying to access it.
# (After trying to access it, dir() will also include it even though it doesn't exist.)
return table_metadata.entityStorageLocations() if 'entityStorageLocations' in dir(table_metadata) else None

def _convert_hms_table_to_external(self, src_table: Table) -> bool:
"""Converts a Hive metastore table to external using Spark JVM methods."""
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
inventory_table = self._tables_crawler.full_name
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
Contributor: This can be retained under the try right?

try:
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
table_identifier = self._table_identifier(src_table.name, database)
old_table = self._catalog.getTableMetadata(table_identifier)
entity_storage_locations = self._get_entity_storage_locations(old_table)
new_table = self._catalog_table(
old_table.identifier(),
self._catalog_type('EXTERNAL'),
old_table.storage(),
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
# From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
# (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
# is present on the source table then the argument is needed.)
*([entity_storage_locations] if entity_storage_locations is not None else []),
)
# Two alternative ways to create the new_table object: one for DBR 15 or older, one for DBR 16+.
# Since we can't detect the DBR version from code, we try to detect whether the collation accessor
# is present on the source table metadata object.
# This is needed because:
# - DBR 16.0 introduced collation and entityStorageLocations as properties on table metadata, and these
#   are required as constructor parameters for CatalogTable.
# - We need to be compatible with earlier versions of DBR.
# - The normal hasattr() check does not work with Py4J-based objects: it always returns True, and
#   non-existent methods are automatically created on the proxy but fail when invoked.
# Instead the only approach is to use dir() to check if the method exists _prior_ to trying to access it.
# (After trying to access it, dir() will also include it even though it doesn't exist.)
if 'collation' in dir(old_table):
logger.debug("Detected Collation property on table metadata, assuming DBR16+")
new_table = self._catalog_table(
old_table.identifier(),
self._catalog_type('EXTERNAL'),
old_table.storage(),
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.collation(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
old_table.entityStorageLocations(),
old_table.resourceName(),
)
else:
logger.debug("No Collation property on table metadata, assuming DBR15 or older")
new_table = self._catalog_table(
old_table.identifier(),
self._catalog_type('EXTERNAL'),
old_table.storage(),
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
old_table.entityStorageLocations(),
)
self._catalog.alterTable(new_table)
self._update_table_status(src_table, inventory_table)
except Exception as e: # pylint: disable=broad-exception-caught
@@ -392,7 +415,6 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
return False
try:
old_table = self._catalog.getTableMetadata(table_identifier)
entity_storage_locations = self._get_entity_storage_locations(old_table)
table_location = old_table.storage()
new_location = self._catalog_storage(
self._spark._jvm.scala.Some( # pylint: disable=protected-access
@@ -404,32 +426,58 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
table_location.compressed(),
table_location.properties(),
)
new_table = self._catalog_table(
old_table.identifier(),
old_table.tableType(),
new_location,
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
# From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
# (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
# is present on the source table then the argument is needed.)
*([entity_storage_locations] if entity_storage_locations is not None else []),
)
if 'collation' in dir(old_table):
logger.debug("Detected Collation property on table metadata, assuming DBR16+")
new_table = self._catalog_table(
old_table.identifier(),
old_table.tableType(),
new_location,
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.collation(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
old_table.entityStorageLocations(),
old_table.resourceName(),
)
else:
logger.debug("No Collation property on table metadata, assuming DBR15 or older")
new_table = self._catalog_table(
old_table.identifier(),
old_table.tableType(),
new_location,
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
old_table.entityStorageLocations(),
)
self._catalog.alterTable(new_table)
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning(f"Error converting HMS table {src_table.name} to abfss: {e}", exc_info=True)
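
For readers less familiar with Py4J, below is a minimal sketch of the feature-detection pattern the new code relies on. The helper names (supports_collation, catalog_table_args) are illustrative only and do not exist in the codebase; the argument ordering mirrors the two constructor calls in the diff above.

# Illustrative sketch (not part of this PR) of why the diff probes capabilities with dir()
# rather than hasattr() on a Py4J proxy such as the CatalogTable metadata from getTableMetadata().

def supports_collation(table_metadata) -> bool:
    """Return True when the JVM-side CatalogTable exposes a collation() accessor (assumed DBR 16+)."""
    # hasattr(table_metadata, "collation") is always True on a Py4J proxy: attribute access merely
    # builds a callable stub, and the failure only surfaces when the stub is invoked.
    # dir() is reliable here as long as it is consulted *before* the attribute is first touched.
    return "collation" in dir(table_metadata)


def catalog_table_args(old_table, table_type) -> list:
    """Positional arguments for the CatalogTable constructor, mirroring the branching in the diff."""
    args = [
        old_table.identifier(), table_type, old_table.storage(), old_table.schema(),
        old_table.provider(), old_table.partitionColumnNames(), old_table.bucketSpec(),
        old_table.owner(), old_table.createTime(), old_table.lastAccessTime(),
        old_table.createVersion(), old_table.properties(), old_table.stats(),
        old_table.viewText(), old_table.comment(),
    ]
    if supports_collation(old_table):
        # DBR 16+ signature: collation comes directly after comment ...
        args.append(old_table.collation())
    args += [
        old_table.unsupportedFeatures(), old_table.tracksPartitionsInCatalog(),
        old_table.schemaPreservesCase(), old_table.ignoredProperties(),
        old_table.viewOriginalText(), old_table.entityStorageLocations(),
    ]
    if supports_collation(old_table):
        # ... and resourceName is appended at the end.
        args.append(old_table.resourceName())
    return args
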
119 changes: 116 additions & 3 deletions tests/unit/hive_metastore/test_table_migrate.py
@@ -50,6 +50,81 @@
def mock_pyspark(mocker):
pyspark_sql_session = mocker.Mock()
sys.modules["pyspark.sql.session"] = pyspark_sql_session
yield pyspark_sql_session


@pytest.fixture
def mock_table_location(mocker):
table_location = mocker.Mock()
table_location.inputFormat.return_value = "org.apache.hadoop.mapred.TextInputFormat"
table_location.outputFormat.return_value = "org.apache.hadoop.mapred.TextOutputFormat"
table_location.serde.return_value = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
table_location.compressed.return_value = False
table_location.properties.return_value = {
"serialization.format": "1",
"field.delim": ",",
}
return table_location


@pytest.fixture
def mock_hms_table_dbr15(mocker, mock_table_location):
table_metadata = mocker.Mock()
table_metadata.identifier.return_value = "hive_metastore.db1_src.managed_dbfs"
table_metadata.tableType.return_value = "MANAGED"
table_metadata.storage.return_value = mock_table_location
table_metadata.schema.return_value = "db1_src"
table_metadata.provider.return_value = "DELTA"
table_metadata.partitionColumnNames.return_value = []
table_metadata.bucketSpec.return_value = None
table_metadata.owner.return_value = "owner"
table_metadata.createTime.return_value = 1600000000
table_metadata.lastAccessTime.return_value = None
table_metadata.createVersion.return_value = None
table_metadata.properties.return_value = {}
table_metadata.stats.return_value = None
table_metadata.viewText.return_value = None
table_metadata.comment.return_value = None
table_metadata.unsupportedFeatures.return_value = []
table_metadata.tracksPartitionsInCatalog.return_value = False
table_metadata.schemaPreservesCase.return_value = False
table_metadata.ignoredProperties.return_value = []
table_metadata.viewOriginalText.return_value = None
table_metadata.entityStorageLocations.return_value = {}
mock_catalog = mocker.Mock()
mock_catalog.getTableMetadata.return_value = table_metadata
return mock_catalog


@pytest.fixture
def mock_hms_table_dbr16(mocker, mock_table_location):
table_metadata = mocker.Mock()
table_metadata.identifier.return_value = "hive_metastore.db1_src.managed_mnt"
table_metadata.tableType.return_value = "MANAGED"
table_metadata.storage.return_value = mock_table_location
table_metadata.schema.return_value = "db1_src"
table_metadata.provider.return_value = "DELTA"
table_metadata.partitionColumnNames.return_value = ["dt"]
table_metadata.bucketSpec.return_value = None
table_metadata.owner.return_value = "owner2"
table_metadata.createTime.return_value = 1610000000
table_metadata.lastAccessTime.return_value = None
table_metadata.createVersion.return_value = None
table_metadata.properties.return_value = {"some": "prop"}
table_metadata.stats.return_value = None
table_metadata.viewText.return_value = None
table_metadata.comment.return_value = "test comment"
table_metadata.collation.return_value = None
table_metadata.unsupportedFeatures.return_value = []
table_metadata.tracksPartitionsInCatalog.return_value = True
table_metadata.schemaPreservesCase.return_value = False
table_metadata.ignoredProperties.return_value = []
table_metadata.viewOriginalText.return_value = None
table_metadata.entityStorageLocations.return_value = {"default": "s3://some_location/table"}
table_metadata.resourceName.return_value = "hive_metastore.db1_src.managed_mnt"
mock_catalog = mocker.Mock()
mock_catalog.getTableMetadata.return_value = table_metadata
return mock_catalog


def test_migrate_dbfs_root_tables_should_produce_proper_queries(ws, mock_pyspark):
@@ -204,7 +279,10 @@ def test_migrate_external_tables_should_produce_proper_queries(ws, mock_pyspark)
]


def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspark):
@pytest.mark.parametrize("dbr_16", [True, False])
def test_migrate_managed_table_as_external_tables_with_conversion(
ws, mock_pyspark, mock_hms_table_dbr15, mock_hms_table_dbr16, dbr_16, caplog, mocker
):
errors = {}
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
crawler_backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -217,16 +295,47 @@ def test_migrate_managed_table_as_external_tables_with_conversion(
table_migrate = TablesMigrator(
table_crawler, ws, backend, table_mapping, migration_status_refresher, migrate_grants, external_locations
)
table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")

caplog.set_level(logging.DEBUG)

# Mock Spark session to return different table metadata based on DBR version
mock_session = mocker.Mock()
mock_session_state = mocker.Mock()
if dbr_16:
mock_session_state.catalog.return_value = mock_hms_table_dbr16
else:
mock_session_state.catalog.return_value = mock_hms_table_dbr15
mock_session._jsparkSession.sessionState.return_value = mock_session_state # pylint: disable=protected-access
mock_pyspark.SparkSession.builder.getOrCreate.return_value = mock_session
table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")
external_locations.resolve_mount.assert_not_called()
migrate_grants.apply.assert_not_called()
assert backend.queries == [
"UPDATE `hive_metastore`.`inventory_database`.`tables` SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='db1_src' AND name='managed_other';"
]
if dbr_16:
assert "DBR16" in caplog.text
else:
assert "DBR15" in caplog.text


@pytest.mark.parametrize("dbr_16", [True, False])
def test_convert_wasbs_to_adls_gen2(
ws, mock_pyspark, caplog, mock_hms_table_dbr15, mock_hms_table_dbr16, dbr_16, mocker
):
caplog.set_level(logging.DEBUG)

# Mock Spark session to return different table metadata based on DBR version
mock_session = mocker.Mock()
mock_session_state = mocker.Mock()
if dbr_16:
mock_session_state.catalog.return_value = mock_hms_table_dbr16
else:
mock_session_state.catalog.return_value = mock_hms_table_dbr15

mock_session._jsparkSession.sessionState.return_value = mock_session_state # pylint: disable=protected-access
mock_pyspark.SparkSession.builder.getOrCreate.return_value = mock_session

def test_convert_wasbs_to_adls_gen2(ws, mock_pyspark):
errors = {}
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
crawler_backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -245,6 +354,10 @@ def test_convert_wasbs_to_adls_gen2(ws, mock_pyspark):
assert backend.queries == [
"UPDATE `hive_metastore`.`inventory_database`.`tables` SET location = 'abfss://bucket/test/table1' WHERE catalog='hive_metastore' AND database='db1_src' AND name='wasbs_src';"
]
if dbr_16:
assert "DBR16" in caplog.text
else:
assert "DBR15" in caplog.text


def test_migrate_managed_table_as_external_tables_without_conversion(ws, mock_pyspark):
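
The fixture pair above can drive the dir()-based branch because unittest.mock only lists configured (or previously accessed) attributes in dir(mock): the DBR 16 fixture sets collation.return_value, the DBR 15 fixture never touches collation. A short, self-contained illustration of that behaviour (standard library only, not part of the test suite):

from unittest.mock import Mock

dbr15_meta = Mock()
dbr15_meta.comment.return_value = None     # configured accessors appear in dir()

dbr16_meta = Mock()
dbr16_meta.comment.return_value = None
dbr16_meta.collation.return_value = None   # only the DBR 16 fixture configures collation

assert "collation" not in dir(dbr15_meta)  # so the code under test takes the DBR 15 path
assert "collation" in dir(dbr16_meta)      # and the DBR 16+ path here

dbr15_meta.collation()                     # touching the attribute auto-creates it...
assert "collation" in dir(dbr15_meta)      # ...which mirrors the Py4J hasattr() trap
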