diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index da05aaf5e4e177..cb37cb6e390d7b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -527,19 +527,32 @@ def _process_schema( ) elif self.store_table_refs: # Need table_refs to calculate lineage and usage + logger.debug( + f"Lightweight table discovery for dataset {dataset_name} in project {project_id}" + ) for table_item in self.schema_api.list_tables(dataset_name, project_id): + table_type = getattr(table_item, "table_type", "UNKNOWN") + identifier = BigqueryTableIdentifier( project_id=project_id, dataset=dataset_name, table=table_item.table_id, ) + + logger.debug(f"Processing {table_type}: {identifier.raw_table_name()}") + if not self.config.table_pattern.allowed(identifier.raw_table_name()): + logger.debug( + f"Dropped by table_pattern: {identifier.raw_table_name()}" + ) self.report.report_dropped(identifier.raw_table_name()) continue try: - self.table_refs.add( - str(BigQueryTableRef(identifier).get_sanitized_table_ref()) + table_ref = str( + BigQueryTableRef(identifier).get_sanitized_table_ref() ) + self.table_refs.add(table_ref) + logger.debug(f"Added to table_refs: {table_ref}") except Exception as e: logger.warning( f"Could not create table ref for {table_item.path}: {e}" @@ -628,14 +641,24 @@ def _process_table( table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name) self.report.report_entity_scanned(table_identifier.raw_table_name()) + logger.debug( + f"Full schema processing - Scanning TABLE: {table_identifier.raw_table_name()}" + ) if not self.config.table_pattern.allowed(table_identifier.raw_table_name()): + logger.debug( + f"Full schema processing - Dropped TABLE by table_pattern: {table_identifier.raw_table_name()}" + ) self.report.report_dropped(table_identifier.raw_table_name()) return if self.store_table_refs: - self.table_refs.add( - str(BigQueryTableRef(table_identifier).get_sanitized_table_ref()) + table_ref = str( + BigQueryTableRef(table_identifier).get_sanitized_table_ref() + ) + self.table_refs.add(table_ref) + logger.debug( + f"Full schema processing - Added TABLE to table_refs: {table_ref}" ) table.column_count = len(columns) @@ -668,13 +691,20 @@ def _process_view( table_identifier = BigqueryTableIdentifier(project_id, dataset_name, view.name) self.report.report_entity_scanned(table_identifier.raw_table_name(), "view") + logger.debug( + f"Full schema processing - Scanning VIEW: {table_identifier.raw_table_name()}" + ) if not self.config.view_pattern.allowed(table_identifier.raw_table_name()): + logger.debug( + f"Full schema processing - Dropped VIEW by view_pattern: {table_identifier.raw_table_name()}" + ) self.report.report_dropped(table_identifier.raw_table_name()) return table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref()) self.table_refs.add(table_ref) + logger.debug(f"Full schema processing - Added VIEW to table_refs: {table_ref}") if view.view_definition: self.view_refs_by_project[project_id].add(table_ref) self.view_definitions[table_ref] = view.view_definition @@ -720,6 +750,9 @@ def _process_snapshot( table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref()) self.table_refs.add(table_ref) + logger.debug( + f"Full schema processing - Added SNAPSHOT to table_refs: {table_ref}" + ) if snapshot.base_table_identifier: self.snapshot_refs_by_project[project_id].add(table_ref) self.snapshots_by_ref[table_ref] = snapshot diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index 8097890d38afcf..fcd7e8b7491450 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -245,12 +245,18 @@ def is_temp_table(self, name: str) -> bool: # 1. this name would be allowed by the dataset patterns, and # 2. we have a list of discovered tables, and # 3. it's not in the discovered tables list + if ( self.filters.is_allowed(table) and self.discovered_tables - and str(BigQueryTableRef(table)) not in self.discovered_tables + and self.identifiers.standardize_identifier_case( + str(BigQueryTableRef(table)) + ) + not in self.discovered_tables ): - logger.debug(f"inferred as temp table {name}") + logger.debug( + f"Inferred as temp table {name} (is_allowed?{self.filters.is_allowed(table)})" + ) self.report.inferred_temp_tables.add(name) return True @@ -263,7 +269,10 @@ def is_allowed_table(self, name: str) -> bool: table = BigqueryTableIdentifier.from_string_name(name) if ( self.discovered_tables - and str(BigQueryTableRef(table)) not in self.discovered_tables + and self.identifiers.standardize_identifier_case( + str(BigQueryTableRef(table)) + ) + not in self.discovered_tables ): logger.debug(f"not allowed table {name}") return False