Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -527,19 +527,32 @@ def _process_schema(
)
elif self.store_table_refs:
# Need table_refs to calculate lineage and usage
logger.debug(
f"Lightweight table discovery for dataset {dataset_name} in project {project_id}"
)
for table_item in self.schema_api.list_tables(dataset_name, project_id):
table_type = getattr(table_item, "table_type", "UNKNOWN")

identifier = BigqueryTableIdentifier(
project_id=project_id,
dataset=dataset_name,
table=table_item.table_id,
)

logger.debug(f"Processing {table_type}: {identifier.raw_table_name()}")

if not self.config.table_pattern.allowed(identifier.raw_table_name()):
logger.debug(
f"Dropped by table_pattern: {identifier.raw_table_name()}"
)
self.report.report_dropped(identifier.raw_table_name())
continue
try:
self.table_refs.add(
str(BigQueryTableRef(identifier).get_sanitized_table_ref())
table_ref = str(
BigQueryTableRef(identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
logger.debug(f"Added to table_refs: {table_ref}")
except Exception as e:
logger.warning(
f"Could not create table ref for {table_item.path}: {e}"
Expand Down Expand Up @@ -628,14 +641,24 @@ def _process_table(
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name)

self.report.report_entity_scanned(table_identifier.raw_table_name())
logger.debug(
f"Full schema processing - Scanning TABLE: {table_identifier.raw_table_name()}"
)

if not self.config.table_pattern.allowed(table_identifier.raw_table_name()):
logger.debug(
f"Full schema processing - Dropped TABLE by table_pattern: {table_identifier.raw_table_name()}"
)
self.report.report_dropped(table_identifier.raw_table_name())
return

if self.store_table_refs:
self.table_refs.add(
str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
table_ref = str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
logger.debug(
f"Full schema processing - Added TABLE to table_refs: {table_ref}"
)
table.column_count = len(columns)

Expand Down Expand Up @@ -668,13 +691,20 @@ def _process_view(
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, view.name)

self.report.report_entity_scanned(table_identifier.raw_table_name(), "view")
logger.debug(
f"Full schema processing - Scanning VIEW: {table_identifier.raw_table_name()}"
)

if not self.config.view_pattern.allowed(table_identifier.raw_table_name()):
logger.debug(
f"Full schema processing - Dropped VIEW by view_pattern: {table_identifier.raw_table_name()}"
)
self.report.report_dropped(table_identifier.raw_table_name())
return

table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
self.table_refs.add(table_ref)
logger.debug(f"Full schema processing - Added VIEW to table_refs: {table_ref}")
if view.view_definition:
self.view_refs_by_project[project_id].add(table_ref)
self.view_definitions[table_ref] = view.view_definition
Expand Down Expand Up @@ -720,6 +750,9 @@ def _process_snapshot(

table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
self.table_refs.add(table_ref)
logger.debug(
f"Full schema processing - Added SNAPSHOT to table_refs: {table_ref}"
)
if snapshot.base_table_identifier:
self.snapshot_refs_by_project[project_id].add(table_ref)
self.snapshots_by_ref[table_ref] = snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,18 @@ def is_temp_table(self, name: str) -> bool:
# 1. this name would be allowed by the dataset patterns, and
# 2. we have a list of discovered tables, and
# 3. it's not in the discovered tables list

if (
self.filters.is_allowed(table)
and self.discovered_tables
and str(BigQueryTableRef(table)) not in self.discovered_tables
and self.identifiers.standardize_identifier_case(
str(BigQueryTableRef(table))
)
not in self.discovered_tables
):
logger.debug(f"inferred as temp table {name}")
logger.debug(
f"Inferred as temp table {name} (is_allowed?{self.filters.is_allowed(table)})"
)
self.report.inferred_temp_tables.add(name)
return True

Expand All @@ -263,7 +269,10 @@ def is_allowed_table(self, name: str) -> bool:
table = BigqueryTableIdentifier.from_string_name(name)
if (
self.discovered_tables
and str(BigQueryTableRef(table)) not in self.discovered_tables
and self.identifiers.standardize_identifier_case(
str(BigQueryTableRef(table))
)
not in self.discovered_tables
):
logger.debug(f"not allowed table {name}")
return False
Expand Down
Loading