diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
index 740de3901b..d0fcafbcbe 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -322,54 +322,77 @@ def _catalog_storage(self):
             self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat  # pylint: disable=protected-access
         )

-    @staticmethod
-    def _get_entity_storage_locations(table_metadata):
-        """Obtain the entityStorageLocations property for table metadata, if the property is present."""
-        # This is needed because:
-        #  - DBR 16.0 introduced entityStorageLocations as a property on table metadata, and this is required for
-        #    as a constructor parameter for CatalogTable.
-        #  - We need to be compatible with earlier versions of DBR.
-        #  - The normal hasattr() check does not work with Py4J-based objects: it always returns True and non-existent
-        #    methods will be automatically created on the proxy but fail when invoked.
-        #    Instead the only approach is to use dir() to check if the method exists _prior_ to trying to access it.
-        #    (After trying to access it, dir() will also include it even though it doesn't exist.)
-        return table_metadata.entityStorageLocations() if 'entityStorageLocations' in dir(table_metadata) else None
-
     def _convert_hms_table_to_external(self, src_table: Table) -> bool:
         """Converts a Hive metastore table to external using Spark JVM methods."""
         logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
         inventory_table = self._tables_crawler.full_name
+        database = self._spark._jvm.scala.Some(src_table.database)  # pylint: disable=protected-access
         try:
-            database = self._spark._jvm.scala.Some(src_table.database)  # pylint: disable=protected-access
             table_identifier = self._table_identifier(src_table.name, database)
             old_table = self._catalog.getTableMetadata(table_identifier)
-            entity_storage_locations = self._get_entity_storage_locations(old_table)
-            new_table = self._catalog_table(
-                old_table.identifier(),
-                self._catalog_type('EXTERNAL'),
-                old_table.storage(),
-                old_table.schema(),
-                old_table.provider(),
-                old_table.partitionColumnNames(),
-                old_table.bucketSpec(),
-                old_table.owner(),
-                old_table.createTime(),
-                old_table.lastAccessTime(),
-                old_table.createVersion(),
-                old_table.properties(),
-                old_table.stats(),
-                old_table.viewText(),
-                old_table.comment(),
-                old_table.unsupportedFeatures(),
-                old_table.tracksPartitionsInCatalog(),
-                old_table.schemaPreservesCase(),
-                old_table.ignoredProperties(),
-                old_table.viewOriginalText(),
-                # From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
-                # (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
-                # is present on the source table then the argument is needed.)
-                *([entity_storage_locations] if entity_storage_locations is not None else []),
-            )
+            # There are two alternative ways to create the new_table object: one for DBR 15 or older, one for DBR 16+.
+            # Since we can't detect the DBR version from code, we check whether the collation accessor is present
+            # on the source table metadata object.
+            # This is needed because:
+            #  - DBR 16.0 introduced entityStorageLocations as a property on table metadata, and this is required
+            #    as a constructor parameter for CatalogTable.
+            #  - We need to be compatible with earlier versions of DBR.
+            #  - The normal hasattr() check does not work with Py4J-based objects: it always returns True and non-existent
+            #    methods will be automatically created on the proxy but fail when invoked.
+            #    Instead the only approach is to use dir() to check if the method exists _prior_ to trying to access it.
+            #    (After trying to access it, dir() will also include it even though it doesn't exist.)
+            if 'collation' in dir(old_table):
+                logger.debug("Detected Collation property on table metadata, assuming DBR16+")
+                new_table = self._catalog_table(
+                    old_table.identifier(),
+                    self._catalog_type('EXTERNAL'),
+                    old_table.storage(),
+                    old_table.schema(),
+                    old_table.provider(),
+                    old_table.partitionColumnNames(),
+                    old_table.bucketSpec(),
+                    old_table.owner(),
+                    old_table.createTime(),
+                    old_table.lastAccessTime(),
+                    old_table.createVersion(),
+                    old_table.properties(),
+                    old_table.stats(),
+                    old_table.viewText(),
+                    old_table.comment(),
+                    old_table.collation(),
+                    old_table.unsupportedFeatures(),
+                    old_table.tracksPartitionsInCatalog(),
+                    old_table.schemaPreservesCase(),
+                    old_table.ignoredProperties(),
+                    old_table.viewOriginalText(),
+                    old_table.entityStorageLocations(),
+                    old_table.resourceName(),
+                )
+            else:
+                logger.debug("No Collation property on table metadata, assuming DBR15 or older")
+                new_table = self._catalog_table(
+                    old_table.identifier(),
+                    self._catalog_type('EXTERNAL'),
+                    old_table.storage(),
+                    old_table.schema(),
+                    old_table.provider(),
+                    old_table.partitionColumnNames(),
+                    old_table.bucketSpec(),
+                    old_table.owner(),
+                    old_table.createTime(),
+                    old_table.lastAccessTime(),
+                    old_table.createVersion(),
+                    old_table.properties(),
+                    old_table.stats(),
+                    old_table.viewText(),
+                    old_table.comment(),
+                    old_table.unsupportedFeatures(),
+                    old_table.tracksPartitionsInCatalog(),
+                    old_table.schemaPreservesCase(),
+                    old_table.ignoredProperties(),
+                    old_table.viewOriginalText(),
+                    old_table.entityStorageLocations(),
+                )
             self._catalog.alterTable(new_table)
             self._update_table_status(src_table, inventory_table)
         except Exception as e:  # pylint: disable=broad-exception-caught
@@ -392,7 +415,6 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
             return False
         try:
             old_table = self._catalog.getTableMetadata(table_identifier)
-            entity_storage_locations = self._get_entity_storage_locations(old_table)
             table_location = old_table.storage()
             new_location = self._catalog_storage(
                 self._spark._jvm.scala.Some(  # pylint: disable=protected-access
@@ -404,32 +426,58 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
                 table_location.compressed(),
                 table_location.properties(),
             )
-            new_table = self._catalog_table(
-                old_table.identifier(),
-                old_table.tableType(),
-                new_location,
-                old_table.schema(),
-                old_table.provider(),
-                old_table.partitionColumnNames(),
-                old_table.bucketSpec(),
-                old_table.owner(),
-                old_table.createTime(),
-                old_table.lastAccessTime(),
-                old_table.createVersion(),
-                old_table.properties(),
-                old_table.stats(),
-                old_table.viewText(),
-                old_table.comment(),
-                old_table.unsupportedFeatures(),
-                old_table.tracksPartitionsInCatalog(),
-                old_table.schemaPreservesCase(),
-                old_table.ignoredProperties(),
-                old_table.viewOriginalText(),
-                # From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
-                # (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
-                # is present on the source table then the argument is needed.)
-                *([entity_storage_locations] if entity_storage_locations is not None else []),
-            )
+            if 'collation' in dir(old_table):
+                logger.debug("Detected Collation property on table metadata, assuming DBR16+")
+                new_table = self._catalog_table(
+                    old_table.identifier(),
+                    old_table.tableType(),
+                    new_location,
+                    old_table.schema(),
+                    old_table.provider(),
+                    old_table.partitionColumnNames(),
+                    old_table.bucketSpec(),
+                    old_table.owner(),
+                    old_table.createTime(),
+                    old_table.lastAccessTime(),
+                    old_table.createVersion(),
+                    old_table.properties(),
+                    old_table.stats(),
+                    old_table.viewText(),
+                    old_table.comment(),
+                    old_table.collation(),
+                    old_table.unsupportedFeatures(),
+                    old_table.tracksPartitionsInCatalog(),
+                    old_table.schemaPreservesCase(),
+                    old_table.ignoredProperties(),
+                    old_table.viewOriginalText(),
+                    old_table.entityStorageLocations(),
+                    old_table.resourceName(),
+                )
+            else:
+                logger.debug("No Collation property on table metadata, assuming DBR15 or older")
+                new_table = self._catalog_table(
+                    old_table.identifier(),
+                    old_table.tableType(),
+                    new_location,
+                    old_table.schema(),
+                    old_table.provider(),
+                    old_table.partitionColumnNames(),
+                    old_table.bucketSpec(),
+                    old_table.owner(),
+                    old_table.createTime(),
+                    old_table.lastAccessTime(),
+                    old_table.createVersion(),
+                    old_table.properties(),
+                    old_table.stats(),
+                    old_table.viewText(),
+                    old_table.comment(),
+                    old_table.unsupportedFeatures(),
+                    old_table.tracksPartitionsInCatalog(),
+                    old_table.schemaPreservesCase(),
+                    old_table.ignoredProperties(),
+                    old_table.viewOriginalText(),
+                    old_table.entityStorageLocations(),
+                )
             self._catalog.alterTable(new_table)
         except Exception as e:  # pylint: disable=broad-exception-caught
             logger.warning(f"Error converting HMS table {src_table.name} to abfss: {e}", exc_info=True)
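Reviewer note on the branch selection above: hasattr() cannot be trusted on Py4J proxies, so both conversion paths key off `'collation' in dir(old_table)`. Below is a minimal, self-contained sketch of that detection pattern; it uses a plain Python object in place of a real Py4J proxy, and `_FakeTableMetadata` / `is_dbr16_style_metadata` are illustrative names that do not exist in the UCX codebase.

```python
# Minimal sketch of the dir()-based feature detection described in the diff's comments.
# Assumption (taken from those comments): on a Py4J proxy, hasattr() always returns True
# because missing methods are created lazily and only fail when invoked, so dir() must
# be consulted *before* the attribute is ever accessed.


class _FakeTableMetadata:
    """Stand-in for a CatalogTable metadata proxy (illustrative only)."""

    def __init__(self, with_collation: bool) -> None:
        if with_collation:
            # Presence of the accessor is what marks "DBR 16+ style" metadata here.
            self.collation = lambda: None


def is_dbr16_style_metadata(table_metadata) -> bool:
    # dir() only lists attributes that actually exist on the object,
    # which is why the production code uses it instead of hasattr().
    return "collation" in dir(table_metadata)


assert is_dbr16_style_metadata(_FakeTableMetadata(with_collation=True))
assert not is_dbr16_style_metadata(_FakeTableMetadata(with_collation=False))
```

The same property is what makes the DBR 15 and DBR 16 fixtures in the test diff below work: attributes configured on a unittest.mock Mock show up in dir(), so each fixture steers the code into the intended branch.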
diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py
index bc5cbcb2bd..fb74f73742 100644
--- a/tests/unit/hive_metastore/test_table_migrate.py
+++ b/tests/unit/hive_metastore/test_table_migrate.py
@@ -50,6 +50,81 @@ def mock_pyspark(mocker):
     pyspark_sql_session = mocker.Mock()
     sys.modules["pyspark.sql.session"] = pyspark_sql_session

+    yield pyspark_sql_session
+
+
+@pytest.fixture
+def mock_table_location(mocker):
+    table_location = mocker.Mock()
+    table_location.inputFormat.return_value = "org.apache.hadoop.mapred.TextInputFormat"
+    table_location.outputFormat.return_value = "org.apache.hadoop.mapred.TextOutputFormat"
+    table_location.serde.return_value = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
+    table_location.compressed.return_value = False
+    table_location.properties.return_value = {
+        "serialization.format": "1",
+        "field.delim": ",",
+    }
+    return table_location
+
+
+@pytest.fixture
+def mock_hms_table_dbr15(mocker, mock_table_location):
+    table_metadata = mocker.Mock()
+    table_metadata.identifier.return_value = "hive_metastore.db1_src.managed_dbfs"
+    table_metadata.tableType.return_value = "MANAGED"
+    table_metadata.storage.return_value = mock_table_location
+    table_metadata.schema.return_value = "db1_src"
+    table_metadata.provider.return_value = "DELTA"
+    table_metadata.partitionColumnNames.return_value = []
+    table_metadata.bucketSpec.return_value = None
+    table_metadata.owner.return_value = "owner"
+    table_metadata.createTime.return_value = 1600000000
+    table_metadata.lastAccessTime.return_value = None
+    table_metadata.createVersion.return_value = None
+    table_metadata.properties.return_value = {}
+    table_metadata.stats.return_value = None
+    table_metadata.viewText.return_value = None
+    table_metadata.comment.return_value = None
+    table_metadata.unsupportedFeatures.return_value = []
+    table_metadata.tracksPartitionsInCatalog.return_value = False
+    table_metadata.schemaPreservesCase.return_value = False
+    table_metadata.ignoredProperties.return_value = []
+    table_metadata.viewOriginalText.return_value = None
+    table_metadata.entityStorageLocations.return_value = {}
+    mock_catalog = mocker.Mock()
+    mock_catalog.getTableMetadata.return_value = table_metadata
+    return mock_catalog
+
+
+@pytest.fixture
+def mock_hms_table_dbr16(mocker, mock_table_location):
+    table_metadata = mocker.Mock()
+    table_metadata.identifier.return_value = "hive_metastore.db1_src.managed_mnt"
+    table_metadata.tableType.return_value = "MANAGED"
+    table_metadata.storage.return_value = mock_table_location
+    table_metadata.schema.return_value = "db1_src"
+    table_metadata.provider.return_value = "DELTA"
+    table_metadata.partitionColumnNames.return_value = ["dt"]
+    table_metadata.bucketSpec.return_value = None
+    table_metadata.owner.return_value = "owner2"
+    table_metadata.createTime.return_value = 1610000000
+    table_metadata.lastAccessTime.return_value = None
+    table_metadata.createVersion.return_value = None
+    table_metadata.properties.return_value = {"some": "prop"}
+    table_metadata.stats.return_value = None
+    table_metadata.viewText.return_value = None
+    table_metadata.comment.return_value = "test comment"
+    table_metadata.collation.return_value = None
+    table_metadata.unsupportedFeatures.return_value = []
+    table_metadata.tracksPartitionsInCatalog.return_value = True
+    table_metadata.schemaPreservesCase.return_value = False
+    table_metadata.ignoredProperties.return_value = []
+    table_metadata.viewOriginalText.return_value = None
+    table_metadata.entityStorageLocations.return_value = {"default": "s3://some_location/table"}
+    table_metadata.resourceName.return_value = "hive_metastore.db1_src.managed_mnt"
+    mock_catalog = mocker.Mock()
+    mock_catalog.getTableMetadata.return_value = table_metadata
+    return mock_catalog


 def test_migrate_dbfs_root_tables_should_produce_proper_queries(ws, mock_pyspark):
@@ -204,7 +279,10 @@ def test_migrate_external_tables_should_produce_proper_queries(ws, mock_pyspark)
     ]


-def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspark):
+@pytest.mark.parametrize("dbr_16", [True, False])
+def test_migrate_managed_table_as_external_tables_with_conversion(
+    ws, mock_pyspark, mock_hms_table_dbr15, mock_hms_table_dbr16, dbr_16, caplog, mocker
+):
     errors = {}
     rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
     crawler_backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -217,16 +295,47 @@ def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspa
     table_migrate = TablesMigrator(
         table_crawler, ws, backend, table_mapping, migration_status_refresher, migrate_grants, external_locations
     )
-    table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")
+    caplog.set_level(logging.DEBUG)
+
+    # Mock Spark session to return different table metadata based on DBR version
+    mock_session = mocker.Mock()
+    mock_session_state = mocker.Mock()
+    if dbr_16:
+        mock_session_state.catalog.return_value = mock_hms_table_dbr16
+    else:
+        mock_session_state.catalog.return_value = mock_hms_table_dbr15
+    mock_session._jsparkSession.sessionState.return_value = mock_session_state  # pylint: disable=protected-access
+    mock_pyspark.SparkSession.builder.getOrCreate.return_value = mock_session
+    table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")
     external_locations.resolve_mount.assert_not_called()
     migrate_grants.apply.assert_not_called()
     assert backend.queries == [
         "UPDATE `hive_metastore`.`inventory_database`.`tables` SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='db1_src' AND name='managed_other';"
     ]
+    if dbr_16:
+        assert "DBR16" in caplog.text
+    else:
+        assert "DBR15" in caplog.text
+
+
+@pytest.mark.parametrize("dbr_16", [True, False])
+def test_convert_wasbs_to_adls_gen2(
+    ws, mock_pyspark, caplog, mock_hms_table_dbr15, mock_hms_table_dbr16, dbr_16, mocker
+):
+    caplog.set_level(logging.DEBUG)
+
+    # Mock Spark session to return different table metadata based on DBR version
+    mock_session = mocker.Mock()
+    mock_session_state = mocker.Mock()
+    if dbr_16:
+        mock_session_state.catalog.return_value = mock_hms_table_dbr16
+    else:
+        mock_session_state.catalog.return_value = mock_hms_table_dbr15
+    mock_session._jsparkSession.sessionState.return_value = mock_session_state  # pylint: disable=protected-access
+    mock_pyspark.SparkSession.builder.getOrCreate.return_value = mock_session

-def test_convert_wasbs_to_adls_gen2(ws, mock_pyspark):
     errors = {}
     rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
     crawler_backend = MockBackend(fails_on_first=errors, rows=rows)
@@ -245,6 +354,10 @@ def test_convert_wasbs_to_adls_gen2(ws, mock_pyspark):
     assert backend.queries == [
         "UPDATE `hive_metastore`.`inventory_database`.`tables` SET location = 'abfss://bucket/test/table1' WHERE catalog='hive_metastore' AND database='db1_src' AND name='wasbs_src';"
     ]
+    if dbr_16:
+        assert "DBR16" in caplog.text
+    else:
+        assert "DBR15" in caplog.text


 def test_migrate_managed_table_as_external_tables_without_conversion(ws, mock_pyspark):
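Reviewer note on the test wiring: the parametrized tests patch the Spark session chain so that `getTableMetadata` returns one of the two fixtures, and then only assert on the "DBR16" / "DBR15" debug messages. Below is a standalone sketch of that wiring, runnable outside pytest; it assumes the production code resolves the JVM catalog roughly as `spark._jsparkSession.sessionState().catalog()` (inferred from the mocks in the diff, not verified against table_migrate.py), and all names here are illustrative.

```python
# Standalone reproduction of the mock chain used by the parametrized tests above.
# Assumption: the code under test reaches the catalog via
# spark._jsparkSession.sessionState().catalog() (inferred from the mocks, not verified).
from unittest.mock import MagicMock

# "DBR 16 style" metadata: configuring .collation makes it appear in dir(table_metadata),
# which is what the `'collation' in dir(...)` check in table_migrate.py keys off.
table_metadata = MagicMock()
table_metadata.collation.return_value = None

mock_catalog = MagicMock()
mock_catalog.getTableMetadata.return_value = table_metadata

mock_session_state = MagicMock()
mock_session_state.catalog.return_value = mock_catalog

mock_session = MagicMock()
mock_session._jsparkSession.sessionState.return_value = mock_session_state

# What the code under test would then observe:
catalog = mock_session._jsparkSession.sessionState().catalog()
assert catalog.getTableMetadata("any_identifier") is table_metadata
assert "collation" in dir(catalog.getTableMetadata("any_identifier"))
```

Because the branch choice is surfaced through the logger.debug messages, the caplog assertions give the tests an observable signal without inspecting the constructed CatalogTable arguments.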