Skip to content

Commit 474a880

Browse files
authored
feat(lookml): Use Looker API to get fields and generate SQL using field chunks with ability to emit partial results (#15060)
1 parent 600e463 commit 474a880

File tree

15 files changed

+2994
-176
lines changed

15 files changed

+2994
-176
lines changed

metadata-ingestion/docs/sources/looker/lookml_post.md

Lines changed: 397 additions & 31 deletions
Large diffs are not rendered by default.

metadata-ingestion/docs/sources/looker/lookml_pre.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,13 @@ jobs:
122122
client_id: ${LOOKER_CLIENT_ID}
123123
client_secret: ${LOOKER_CLIENT_SECRET}
124124
base_url: ${LOOKER_BASE_URL}
125+
# Enable API-based lineage extraction (required for field splitting features)
126+
use_api_for_view_lineage: true
127+
# Optional: Large view handling configuration
128+
# field_threshold_for_splitting: 100
129+
# allow_partial_lineage_results: true
130+
# enable_individual_field_fallback: true
131+
# max_workers_for_parallel_processing: 10
125132
sink:
126133
type: datahub-rest
127134
config:

metadata-ingestion/docs/sources/looker/lookml_recipe.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,30 @@ source:
1818
client_id: ${LOOKER_CLIENT_ID}
1919
client_secret: ${LOOKER_CLIENT_SECRET}
2020

21+
# Enable API-based lineage extraction (REQUIRED for field splitting features)
22+
# When enabled, uses Looker API to get SQL representation of views for lineage parsing
23+
# This enables field splitting, parallel processing, and individual field fallback for large views
24+
# NOTE: Only works for "reachable views" - views that are referenced by explores in model files
25+
# Unreachable views will fall back to regex-based parsing
26+
use_api_for_view_lineage: true
27+
28+
# Control whether unreachable views are processed
29+
# If true (default), only views referenced by explores are processed
30+
# If false, all views are processed, but unreachable ones use regex parsing instead of API
31+
# emit_reachable_views_only: true
32+
33+
# Optional: Enable API caching for better performance
34+
# use_api_cache_for_view_lineage: true
35+
36+
# Large View Handling (for views with 100+ fields)
37+
# These options help handle large views with many fields by splitting them into chunks
38+
# and processing in parallel for better performance and reliability
39+
# NOTE: Requires 'api' configuration and 'use_api_for_view_lineage: true' to work
40+
# field_threshold_for_splitting: 100 # Split views with more than this many fields (default: 100)
41+
# allow_partial_lineage_results: true # Return partial lineage if some chunks fail (default: true)
42+
# enable_individual_field_fallback: true # Process fields individually if chunk fails (default: true)
43+
# max_workers_for_parallel_processing: 10 # Parallel workers for processing (default: 10, max: 100)
44+
2145
# Alternative to API section above if you want a purely file-based ingestion with no api calls to Looker or if you want to provide platform_instance ids for your connections
2246
# project_name: PROJECT_NAME # See (https://docs.looker.com/data-modeling/getting-started/how-project-works) to understand what is your project name
2347
# connection_to_platform_map:

metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,18 @@ def all_lookml_models(self) -> Sequence[LookmlModel]:
228228
transport_options=self.transport_options,
229229
)
230230

231-
def lookml_model_explore(self, model: str, explore_name: str) -> LookmlModelExplore:
231+
def lookml_model_explore(
    self,
    model: str,
    explore_name: str,
    fields: Optional[List[str]] = None,
) -> LookmlModelExplore:
    """Fetch a LookML explore definition through the Looker SDK client.

    Args:
        model: Name of the LookML model that contains the explore.
        explore_name: Name of the explore to fetch.
        fields: Optional subset of response fields to request; when given,
            it is passed through the wrapper's field mapper before the call.

    Returns:
        The LookmlModelExplore object returned by the Looker API.
    """
    # Count every explore lookup for client-usage reporting.
    self.client_stats.explore_calls += 1
    requested_fields = self.__fields_mapper(fields) if fields else None
    return self.client.lookml_model_explore(
        model,
        explore_name,
        fields=requested_fields,
        transport_options=self.transport_options,
    )
236244

237245
@lru_cache(maxsize=1000)

metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Any, Dict, Literal, Optional, Union
66

77
import pydantic
8-
from pydantic import model_validator
8+
from pydantic import field_validator, model_validator
99
from pydantic.fields import Field
1010

1111
from datahub.configuration.common import AllowDenyPattern
@@ -211,6 +211,48 @@ class LookMLSourceConfig(
211211
"All if comments are evaluated to true for configured looker_environment value",
212212
)
213213

214+
# --- Large-view handling options ---
# These settings control how views with many fields are split into chunks
# for API-based lineage extraction.
# NOTE(review): per the recipe docs in this commit, these only take effect
# when `use_api_for_view_lineage` is enabled — confirm against the source.

# Above this field count, fields are fetched in multiple chunked API calls.
field_threshold_for_splitting: int = Field(
    100,
    description="When the total number of fields returned by Looker API exceeds this threshold, "
    "the fields will be split into multiple API calls to avoid SQL parsing failures. "
    "This helps provide partial column and table lineage when dealing with large field sets.",
)

# Emit whatever lineage was extracted even if some chunks fail to parse.
allow_partial_lineage_results: bool = Field(
    True,
    description="When enabled, allows partial lineage results to be returned even when some field chunks "
    "fail or when there are SQL parsing errors. This provides better resilience for large field sets "
    "and ensures some lineage information is available rather than complete failure.",
)

# On chunk failure, retry each field of the chunk one at a time to isolate
# the problematic field(s) while keeping lineage for the rest.
enable_individual_field_fallback: bool = Field(
    True,
    description="When enabled, if a field chunk fails, the system will attempt to process each field "
    "individually to maximize information and isolate problematic fields. This helps identify "
    "which specific fields are causing issues while still getting lineage for working fields.",
)

# Thread-pool size for chunk/field processing; clamped to [1, 100] by
# the validate_max_workers validator below.
max_workers_for_parallel_processing: int = Field(
    10,
    description="Maximum number of worker threads to use for parallel processing of field chunks and individual fields. "
    "Set to 1 to process everything sequentially. Higher values can improve performance but may increase memory usage. "
    "Maximum allowed value is 100 to prevent resource exhaustion.",
)
241+
242+
@field_validator("max_workers_for_parallel_processing")
def validate_max_workers(cls, v: int) -> int:
    """Validate the worker count: reject values below 1, clamp values above 100.

    Raises:
        ValueError: if the configured value is less than 1.
    """
    if v < 1:
        raise ValueError(
            f"max_workers_for_parallel_processing must be at least 1, got {v}"
        )
    # In-range values pass through untouched.
    if v <= 100:
        return v
    # Oversized values are clamped (not rejected) so existing recipes keep
    # working; the warning tells the operator what actually happened.
    logger.warning(
        f"max_workers_for_parallel_processing is set to {v}, which exceeds the recommended maximum of 100. "
        f"This may cause resource exhaustion. Using 100 instead."
    )
    return 100
255+
214256
@model_validator(mode="before")
215257
@classmethod
216258
def convert_string_to_connection_def(cls, values: Dict[str, Any]) -> Dict[str, Any]:

metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import pathlib
33
import tempfile
4-
from collections import OrderedDict
4+
from collections import OrderedDict, defaultdict
55
from dataclasses import dataclass
66
from datetime import datetime, timezone
77
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
@@ -709,10 +709,14 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
709709
# Value: Tuple(model file name, connection name)
710710
view_connection_map: Dict[str, Tuple[str, str]] = {}
711711

712-
# Map of view name to explore name for API-based view lineage
713-
# A view can be referenced by multiple explores, we only need one of the explores to use Looker Query API
714-
# Key: view_name, Value: explore_name
715-
view_to_explore_map: Dict[str, str] = {}
712+
# Map of view name to all possible explores for API-based view lineage
713+
# A view can be referenced by multiple explores, we'll optimize the assignment
714+
# Key: view_name, Value: set of explore_names
715+
view_to_explores: Dict[str, Set[str]] = defaultdict(set)
716+
717+
# Temporary map to keep track of the views in an explore
718+
# Key: explore_name, Value: set of view_names
719+
explore_to_views: Dict[str, Set[str]] = defaultdict(set)
716720

717721
# The ** means "this directory and all subdirectories", and hence should
718722
# include all the files we want.
@@ -789,8 +793,9 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
789793
for view_name in explore.upstream_views:
790794
if self.source_config.emit_reachable_views_only:
791795
explore_reachable_views.add(view_name.include)
792-
# Build view to explore mapping for API-based view lineage
793-
view_to_explore_map[view_name.include] = explore.name
796+
797+
view_to_explores[view_name.include].add(explore.name)
798+
explore_to_views[explore.name].add(view_name.include)
794799
except Exception as e:
795800
self.reporter.report_warning(
796801
title="Failed to process explores",
@@ -804,6 +809,16 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
804809
model.connection, set()
805810
)
806811

812+
view_to_explore_map = {}
813+
if view_to_explores and explore_to_views:
814+
view_to_explore_map = self._optimize_views_by_common_explore(
815+
view_to_explores, explore_to_views
816+
)
817+
else:
818+
logger.warning(
819+
f"Either view_to_explores: {view_to_explores} or explore_to_views: {explore_to_views} is empty"
820+
)
821+
807822
project_name = self.get_project_name(model_name)
808823

809824
looker_view_id_cache: LookerViewIdCache = LookerViewIdCache(
@@ -888,9 +903,7 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
888903
config=self.source_config,
889904
ctx=self.ctx,
890905
looker_client=self.looker_client,
891-
view_to_explore_map=view_to_explore_map
892-
if view_to_explore_map
893-
else None,
906+
view_to_explore_map=view_to_explore_map,
894907
)
895908
except Exception as e:
896909
self.reporter.report_warning(
@@ -1040,5 +1053,61 @@ def report_skipped_unreachable_views(
10401053
context=(f"Project: {project}, View File Path: {path}"),
10411054
)
10421055

1056+
def _optimize_views_by_common_explore(
1057+
self,
1058+
view_to_explores: Dict[str, Set[str]],
1059+
explore_to_views: Dict[str, Set[str]],
1060+
) -> Dict[str, str]:
1061+
"""
1062+
Optimize view-to-explore mapping by grouping views to minimize API calls.
1063+
1064+
This uses a greedy algorithm that prioritizes explores that appear in the most views,
1065+
maximizing the number of views assigned to the same explore.
1066+
1067+
Args:
1068+
view_to_explores: Dict mapping view_name -> set of explore_names
1069+
explore_to_views: Dict mapping explore_name -> set of view_names
1070+
1071+
Returns:
1072+
Dict mapping view_name -> explore_name (optimized assignment)
1073+
"""
1074+
1075+
# Pre-compute explore sizes
1076+
explore_sizes = {
1077+
explore: len(views) for explore, views in explore_to_views.items()
1078+
}
1079+
1080+
# Build view-to-explore mapping using dynamic programming approach
1081+
view_to_explore: Dict[str, str] = {}
1082+
1083+
# For each view, find the explore with maximum size that contains it
1084+
for view_name, candidate_explores in view_to_explores.items():
1085+
if candidate_explores:
1086+
# Find explore with maximum size using max() with key function
1087+
# This assings the view to the explore with the most views that contains it
1088+
best_explore = max(
1089+
candidate_explores, key=lambda explore: explore_sizes[explore]
1090+
)
1091+
view_to_explore[view_name] = best_explore
1092+
1093+
# Log optimization results
1094+
unique_explores_used = len(set(view_to_explore.values()))
1095+
total_views = len(view_to_explore)
1096+
total_explores = len(explore_to_views)
1097+
1098+
if total_explores > 0:
1099+
efficiency = (1 - unique_explores_used / total_explores) * 100
1100+
logger.info(
1101+
f"View-explore optimization: Using {unique_explores_used}/{total_explores} "
1102+
f"explores for {total_views} views (efficiency: {efficiency:.1f}% savings)"
1103+
)
1104+
else:
1105+
logger.info(
1106+
f"View-explore optimization: No explores to optimize for {total_views} views"
1107+
)
1108+
1109+
logger.debug(f"Final View-to-explore mapping: {view_to_explore}")
1110+
return view_to_explore
1111+
10431112
def get_report(self):
10441113
return self.reporter

0 commit comments

Comments
 (0)