Skip to content

Commit bcabe9f

Browse files
fix(bigquery): apply case normalization consistently for temp table inference (#15252)
1 parent 6358bbc commit bcabe9f

File tree

2 files changed

+49
-7
lines changed

2 files changed

+49
-7
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -527,19 +527,32 @@ def _process_schema(
527527
)
528528
elif self.store_table_refs:
529529
# Need table_refs to calculate lineage and usage
530+
logger.debug(
531+
f"Lightweight table discovery for dataset {dataset_name} in project {project_id}"
532+
)
530533
for table_item in self.schema_api.list_tables(dataset_name, project_id):
534+
table_type = getattr(table_item, "table_type", "UNKNOWN")
535+
531536
identifier = BigqueryTableIdentifier(
532537
project_id=project_id,
533538
dataset=dataset_name,
534539
table=table_item.table_id,
535540
)
541+
542+
logger.debug(f"Processing {table_type}: {identifier.raw_table_name()}")
543+
536544
if not self.config.table_pattern.allowed(identifier.raw_table_name()):
545+
logger.debug(
546+
f"Dropped by table_pattern: {identifier.raw_table_name()}"
547+
)
537548
self.report.report_dropped(identifier.raw_table_name())
538549
continue
539550
try:
540-
self.table_refs.add(
541-
str(BigQueryTableRef(identifier).get_sanitized_table_ref())
551+
table_ref = str(
552+
BigQueryTableRef(identifier).get_sanitized_table_ref()
542553
)
554+
self.table_refs.add(table_ref)
555+
logger.debug(f"Added to table_refs: {table_ref}")
543556
except Exception as e:
544557
logger.warning(
545558
f"Could not create table ref for {table_item.path}: {e}"
@@ -628,14 +641,24 @@ def _process_table(
628641
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name)
629642

630643
self.report.report_entity_scanned(table_identifier.raw_table_name())
644+
logger.debug(
645+
f"Full schema processing - Scanning TABLE: {table_identifier.raw_table_name()}"
646+
)
631647

632648
if not self.config.table_pattern.allowed(table_identifier.raw_table_name()):
649+
logger.debug(
650+
f"Full schema processing - Dropped TABLE by table_pattern: {table_identifier.raw_table_name()}"
651+
)
633652
self.report.report_dropped(table_identifier.raw_table_name())
634653
return
635654

636655
if self.store_table_refs:
637-
self.table_refs.add(
638-
str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
656+
table_ref = str(
657+
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
658+
)
659+
self.table_refs.add(table_ref)
660+
logger.debug(
661+
f"Full schema processing - Added TABLE to table_refs: {table_ref}"
639662
)
640663
table.column_count = len(columns)
641664

@@ -668,13 +691,20 @@ def _process_view(
668691
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, view.name)
669692

670693
self.report.report_entity_scanned(table_identifier.raw_table_name(), "view")
694+
logger.debug(
695+
f"Full schema processing - Scanning VIEW: {table_identifier.raw_table_name()}"
696+
)
671697

672698
if not self.config.view_pattern.allowed(table_identifier.raw_table_name()):
699+
logger.debug(
700+
f"Full schema processing - Dropped VIEW by view_pattern: {table_identifier.raw_table_name()}"
701+
)
673702
self.report.report_dropped(table_identifier.raw_table_name())
674703
return
675704

676705
table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
677706
self.table_refs.add(table_ref)
707+
logger.debug(f"Full schema processing - Added VIEW to table_refs: {table_ref}")
678708
if view.view_definition:
679709
self.view_refs_by_project[project_id].add(table_ref)
680710
self.view_definitions[table_ref] = view.view_definition
@@ -720,6 +750,9 @@ def _process_snapshot(
720750

721751
table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
722752
self.table_refs.add(table_ref)
753+
logger.debug(
754+
f"Full schema processing - Added SNAPSHOT to table_refs: {table_ref}"
755+
)
723756
if snapshot.base_table_identifier:
724757
self.snapshot_refs_by_project[project_id].add(table_ref)
725758
self.snapshots_by_ref[table_ref] = snapshot

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,12 +245,18 @@ def is_temp_table(self, name: str) -> bool:
245245
# 1. this name would be allowed by the dataset patterns, and
246246
# 2. we have a list of discovered tables, and
247247
# 3. it's not in the discovered tables list
248+
248249
if (
249250
self.filters.is_allowed(table)
250251
and self.discovered_tables
251-
and str(BigQueryTableRef(table)) not in self.discovered_tables
252+
and self.identifiers.standardize_identifier_case(
253+
str(BigQueryTableRef(table))
254+
)
255+
not in self.discovered_tables
252256
):
253-
logger.debug(f"inferred as temp table {name}")
257+
logger.debug(
258+
f"Inferred as temp table {name} (is_allowed?{self.filters.is_allowed(table)})"
259+
)
254260
self.report.inferred_temp_tables.add(name)
255261
return True
256262

@@ -263,7 +269,10 @@ def is_allowed_table(self, name: str) -> bool:
263269
table = BigqueryTableIdentifier.from_string_name(name)
264270
if (
265271
self.discovered_tables
266-
and str(BigQueryTableRef(table)) not in self.discovered_tables
272+
and self.identifiers.standardize_identifier_case(
273+
str(BigQueryTableRef(table))
274+
)
275+
not in self.discovered_tables
267276
):
268277
logger.debug(f"not allowed table {name}")
269278
return False

0 commit comments

Comments
 (0)