From 6b0d2cb0b0916d5b40a010601a6efa3577238cf9 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 10 Nov 2025 16:26:08 +0000 Subject: [PATCH 1/2] feat(metabase): Add comprehensive lineage extraction with tags and models - Dashboard-level lineage (table-to-dashboard via datasetEdges) - Native SQL query lineage parsing - Query builder lineage extraction - Nested query lineage (card-to-card references) - Collection tags on dashboards, charts, and models - Metabase Models as Dataset entities - Comprehensive unit and integration tests --- .../src/datahub/ingestion/source/metabase.py | 526 ++++++++++++- .../metabase/metabase_mces_golden.json | 92 +++ .../tests/unit/test_metabase_source.py | 704 ++++++++++++++++++ 3 files changed, 1296 insertions(+), 26 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/metabase.py b/metadata-ingestion/src/datahub/ingestion/source/metabase.py index d9a0c0b249ab32..a9a459e5aaa53c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metabase.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metabase.py @@ -44,6 +44,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( ChartSnapshot, DashboardSnapshot, + DatasetSnapshot, ) from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( @@ -52,10 +53,17 @@ ChartQueryTypeClass, ChartTypeClass, DashboardInfoClass, + DatasetPropertiesClass, EdgeClass, + GlobalTagsClass, OwnerClass, OwnershipClass, OwnershipTypeClass, + SubTypesClass, + TagAssociationClass, + UpstreamClass, + UpstreamLineageClass, + ViewPropertiesClass, ) from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result from datahub.utilities import config_clean @@ -113,11 +121,21 @@ class MetabaseConfig( default=False, description="Flag that if true, exclude other user collections", ) + extract_collections_as_tags: bool = Field( + default=True, + description="Extract Metabase collections as tags on dashboards and charts", + ) + extract_models: bool = Field( + default=True, + description="Extract Metabase models (saved questions used as data sources) as datasets", + ) stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None @field_validator("connect_uri", "display_uri", mode="after") @classmethod - def remove_trailing_slash(cls, v): + def remove_trailing_slash(cls, v: Optional[str]) -> Optional[str]: + if v is None: + return None return config_clean.remove_trailing_slashes(v) @model_validator(mode="after") @@ -136,48 +154,157 @@ class MetabaseReport(StaleEntityRemovalSourceReport): @config_class(MetabaseConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") -@capability(SourceCapability.LINEAGE_COARSE, "Supported by default") +@capability( + SourceCapability.LINEAGE_COARSE, "Supported by default for charts and dashboards" +) class MetabaseSource(StatefulIngestionSourceBase): """ - This plugin extracts Charts, dashboards, and associated metadata. This plugin is in beta and has only been tested - on PostgreSQL and H2 database. - - ### Collection + This plugin extracts Charts, Dashboards, Models, and their associated metadata from Metabase. + It supports comprehensive lineage extraction, including table-to-chart, table-to-dashboard, and nested query lineage. - [/api/collection](https://www.metabase.com/docs/latest/api/collection) endpoint is used to - retrieve the available collections. 
+ ### Entities Extracted - [/api/collection//items?models=dashboard](https://www.metabase.com/docs/latest/api/collection#get-apicollectioniditems) endpoint is used to retrieve a given collection and list their dashboards. + The connector extracts the following Metabase entities into DataHub: + - **Dashboards** → DataHub Dashboards + - **Charts (Cards/Questions)** → DataHub Charts + - **Models** → DataHub Datasets (with "Model" and "View" subtypes) + - **Collections** → DataHub Tags - ### Dashboard + ### Dashboard Extraction - [/api/dashboard/](https://www.metabase.com/docs/latest/api/dashboard) endpoint is used to retrieve a given Dashboard and grab its information. + Dashboards are extracted using the [/api/dashboard](https://www.metabase.com/docs/latest/api/dashboard) endpoint. + **Extracted Information:** - Title and description - - Last edited by - - Owner + - Owner and last modified user - Link to the dashboard in Metabase - - Associated charts + - Associated charts (via chartEdges) + - **Lineage to upstream database tables** (via datasetEdges) + - Collection tags for organization - ### Chart + **Dashboard Lineage:** The connector automatically aggregates table dependencies from all charts within a dashboard, + creating direct table-to-dashboard lineage. This enables: + - Impact analysis: See which dashboards are affected when a table changes + - Data discovery: Find dashboards consuming specific datasets + - Dependency tracking: Understand the complete data flow - [/api/card](https://www.metabase.com/docs/latest/api-documentation.html#card) endpoint is used to - retrieve the following information. + ### Chart Extraction + Charts (Metabase Cards/Questions) are extracted using the [/api/card](https://www.metabase.com/docs/latest/api-documentation.html#card) endpoint. + + **Extracted Information:** - Title and description - - Last edited by - - Owner + - Owner and last modified user - Link to the chart in Metabase - - Datasource and lineage + - Chart type (bar, line, table, pie, etc.) + - Source dataset lineage + - SQL query (for native queries) + - Collection tags - The following properties for a chart are ingested in DataHub. + **Custom Properties:** - | Name | Description | + | Property | Description | | ------------- | ----------------------------------------------- | - | `Dimensions` | Column names | + | `Dimensions` | Column names used in the visualization | | `Filters` | Any filters applied to the chart | - | `Metrics` | All columns that are being used for aggregation | - + | `Metrics` | Columns used for aggregation (COUNT, SUM, etc.) | + + ### Lineage Extraction + + The connector provides comprehensive lineage extraction with support for multiple query types: + + #### 1. Native SQL Query Lineage + - Parses SQL queries using DataHub's SQL parser + - Extracts table references from SELECT, JOIN, and subqueries + - Handles Metabase template variables (`{{variable}}`) and optional clauses (`[[WHERE ...]]`) + - Creates lineage from source tables to charts + + #### 2. Query Builder Lineage + - Extracts source tables from Metabase's visual query builder + - Maps `source-table` IDs to actual database tables + - Creates lineage from tables to charts + + #### 3. Nested Query Lineage + - Handles charts built on top of other charts (using `card__` references) + - Recursively resolves the chain to find ultimate source tables + - Useful for multi-layered analysis workflows + + #### 4. 
Dashboard-Level Lineage
+    - Aggregates lineage from all charts in a dashboard
+    - Creates direct table-to-dashboard edges (via `datasetEdges`)
+    - Automatically deduplicates tables referenced by multiple charts
+    - Enables dashboard-level impact analysis
+
+    #### 5. Model Lineage
+    - Extracts upstream table dependencies for Metabase Models
+    - Models appear as Dataset entities with lineage to their source tables
+    - Supports both SQL-based and query builder-based models
+
+    ### Collection Tags
+
+    Metabase Collections are mapped to DataHub tags for better organization and discoverability:
+    - Collection names are automatically converted to tags (e.g., "Sales Dashboard" → `metabase_collection_sales_dashboard`)
+    - Tags are applied to dashboards, charts, and models within that collection
+    - Enables filtering by collection in DataHub
+    - Helps organize assets by team, department, or project
+
+    **Configuration:** Set `extract_collections_as_tags: false` to disable this feature.
+
+    ### Metabase Models
+
+    Metabase Models (introduced in v0.41) are saved questions that can be used as data sources for other questions.
+    The connector extracts Models as DataHub Dataset entities (see the sketch after this section):
+
+    **Models as Datasets:**
+    - URN format: `urn:li:dataset:(urn:li:dataPlatform:metabase,model.{model_id},PROD)`
+    - SubTypes: `["Model", "View"]` to indicate their virtual, view-like nature
+    - ViewProperties: Preserves the underlying SQL query
+    - Upstream lineage: Connects models to their source tables
+    - Collection tags: Applied based on the model's collection membership
+
+    **Why Extract Models:**
+    - **Lineage tracking**: See dependencies between models and source tables
+    - **Impact analysis**: Understand which models are affected by table changes
+    - **Documentation**: Model queries and descriptions are preserved
+    - **Discovery**: Find reusable data models in your organization
+
+    **Configuration:** Set `extract_models: false` to disable model extraction.
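+
+    For illustration, a model with id `7` in a collection named "Analytics"
+    (both hypothetical) would surface roughly as the following aspects. This is
+    a simplified sketch of the emitted metadata, not the exact serialized
+    output:
+
+    ```yaml
+    urn: urn:li:dataset:(urn:li:dataPlatform:metabase,model.7,PROD)
+    subTypes: ["Model", "View"]
+    viewProperties:
+      viewLanguage: SQL
+      viewLogic: "SELECT customer_id, SUM(revenue) FROM orders GROUP BY customer_id"
+    upstreamLineage:
+      upstreams:
+        - urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.orders,PROD)
+    globalTags:
+      - urn:li:tag:metabase_collection_analytics
+    ```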
+
+    ### Collections
+
+    The [/api/collection](https://www.metabase.com/docs/latest/api/collection) endpoint is used to:
+    - Retrieve available collections
+    - List dashboards within each collection via [/api/collection/{id}/items?models=dashboard](https://www.metabase.com/docs/latest/api/collection#get-apicollectioniditems)
+    - Map collections to tags on assets
+
+    ### Configuration Examples
+
+    ```yaml
+    # Enable all features (default)
+    source:
+      type: metabase
+      config:
+        connect_uri: https://metabase.company.com
+        username: datahub_user
+        password: secure_password
+        extract_collections_as_tags: true # Map collections to tags
+        extract_models: true # Extract Metabase Models as datasets
+
+    ---
+    # Disable optional features
+    source:
+      type: metabase
+      config:
+        connect_uri: https://metabase.company.com
+        api_key: your-api-key # Recommended over username/password
+        extract_collections_as_tags: false # Don't create tags
+        extract_models: false # Don't extract models
+    ```
+
+    ### Compatibility
+
+    - Tested with Metabase v0.41+ (the minimum version that supports Models)
+    - Works with various database backends: PostgreSQL, MySQL, BigQuery, Snowflake, Redshift, and more
+    - Supports both username/password and API key authentication (API key recommended)
     """
 
@@ -185,7 +312,7 @@ class MetabaseSource(StatefulIngestionSourceBase):
     report: MetabaseReport
     platform = "metabase"
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return id(self)
 
     def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
@@ -354,6 +481,11 @@ def construct_dashboard_from_api_data(
             )
         )
 
+        # Lineage - extract table dependencies from dashboard charts
+        dataset_edges = self.construct_dashboard_lineage(
+            dashboard_details, last_modified.lastModified
+        )
+
         dashboard_info_class = DashboardInfoClass(
             description=description,
             title=title,
@@ -361,6 +493,7 @@ def construct_dashboard_from_api_data(
             lastModified=last_modified,
             dashboardUrl=f"{self.config.display_uri}/dashboard/{dashboard_id}",
             customProperties={},
+            datasetEdges=dataset_edges if dataset_edges else None,
         )
         dashboard_snapshot.aspects.append(dashboard_info_class)
 
@@ -369,8 +502,170 @@ def construct_dashboard_from_api_data(
         if ownership is not None:
             dashboard_snapshot.aspects.append(ownership)
 
+        # Tags - extract from collection
+        tags = self._get_tags_from_collection(dashboard_details.get("collection_id"))
+        if tags is not None:
+            dashboard_snapshot.aspects.append(tags)
+
         return dashboard_snapshot
 
+    def construct_dashboard_lineage(
+        self, dashboard_details: dict, last_modified: AuditStamp
+    ) -> Optional[List[EdgeClass]]:
+        """
+        Construct dashboard lineage by extracting table dependencies from all charts in the dashboard.
+        This creates lineage from database tables to the dashboard using datasetEdges.
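+
+        For example (hypothetical tables), a dashboard whose two charts query
+        `public.users` and `public.orders` yields two EdgeClass entries, one per
+        unique upstream table URN; a table referenced by several charts is
+        emitted only once.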
+        """
+        upstream_tables = []
+        cards_data = dashboard_details.get("dashcards", [])
+
+        for card_info in cards_data:
+            card_id = card_info.get("card", {}).get("id")
+            if not card_id:
+                continue
+
+            # Get detailed card information
+            card_details = self.get_card_details_by_id(card_id)
+            if not card_details:
+                continue
+
+            # Extract table URNs from the card
+            table_urns = self._get_table_urns_from_card(card_details)
+            if table_urns:
+                upstream_tables.extend(table_urns)
+
+        # Remove duplicates; sort for deterministic output ordering
+        unique_table_urns = sorted(set(upstream_tables))
+
+        if not unique_table_urns:
+            return None
+
+        # Create dataset edges for dashboard lineage
+        dataset_edges = [
+            EdgeClass(
+                destinationUrn=table_urn,
+                lastModified=last_modified,
+            )
+            for table_urn in unique_table_urns
+        ]
+
+        return dataset_edges
+
+    def _get_table_urns_from_card(self, card_details: dict) -> List[str]:
+        """
+        Extract table URNs from a card (question/chart) by analyzing its query.
+        Supports both native SQL queries and query builder queries.
+        """
+        table_urns = []
+
+        query_type = card_details.get("dataset_query", {}).get("type")
+
+        if query_type == "native":
+            # Handle native SQL queries
+            table_urns = self._get_table_urns_from_native_query(card_details)
+        elif query_type == "query":
+            # Handle query builder queries
+            table_urns = self._get_table_urns_from_query_builder(card_details)
+
+        return table_urns
+
+    def _get_table_urns_from_native_query(self, card_details: dict) -> List[str]:
+        """
+        Extract table URNs from a native SQL query by parsing the SQL.
+        """
+        datasource_id = card_details.get("database_id")
+        if not datasource_id:
+            return []
+
+        (
+            platform,
+            database_name,
+            database_schema,
+            platform_instance,
+        ) = self.get_datasource_from_id(datasource_id)
+
+        if not platform:
+            return []
+
+        raw_query = (
+            card_details.get("dataset_query", {}).get("native", {}).get("query", "")
+        )
+
+        if not raw_query:
+            return []
+
+        # Strip template expressions before parsing
+        raw_query_stripped = self.strip_template_expressions(raw_query)
+
+        # Parse SQL to extract table references
+        result = create_lineage_sql_parsed_result(
+            query=raw_query_stripped,
+            default_db=database_name,
+            default_schema=database_schema or self.config.default_schema,
+            platform=platform,
+            platform_instance=platform_instance,
+            env=self.config.env,
+            graph=self.ctx.graph,
+        )
+
+        if result.debug_info.table_error:
+            logger.debug(
+                f"Failed to parse lineage from query: {result.debug_info.table_error}"
+            )
+
+        return result.in_tables if result.in_tables else []
+
+    def _get_table_urns_from_query_builder(self, card_details: dict) -> List[str]:
+        """
+        Extract table URNs from a query builder query by getting the source table ID.
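+
+        For example (hypothetical IDs), a query with `{"source-table": 42}`
+        resolves through the table metadata to a URN such as
+        `urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.products,PROD)`,
+        while `{"source-table": "card__123"}` is resolved recursively through
+        the query of card 123.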
+ """ + source_table_id = ( + card_details.get("dataset_query", {}).get("query", {}).get("source-table") + ) + + if not source_table_id: + return [] + + # Check if this is a nested query (references another card) + if str(source_table_id).startswith("card__"): + # Recursively get the source table from the referenced card + referenced_card_id = source_table_id.replace("card__", "") + referenced_card = self.get_card_details_by_id(referenced_card_id) + if referenced_card: + return self._get_table_urns_from_card(referenced_card) + return [] + + # Get table details + datasource_id = card_details.get("database_id") + if not datasource_id: + return [] + + ( + platform, + database_name, + database_schema, + platform_instance, + ) = self.get_datasource_from_id(datasource_id) + + if not platform: + return [] + + schema_name, table_name = self.get_source_table_from_id(source_table_id) + + if not table_name: + return [] + + # Build the dataset URN + name_components = [database_name, schema_name, table_name] + table_urn = builder.make_dataset_urn_with_platform_instance( + platform=platform, + name=".".join([v for v in name_components if v]), + platform_instance=platform_instance, + env=self.config.env, + ) + + return [table_urn] + @lru_cache(maxsize=None) def _get_ownership(self, creator_id: int) -> Optional[OwnershipClass]: user_info_url = f"{self.config.connect_uri}/api/user/{creator_id}" @@ -411,6 +706,48 @@ def _get_ownership(self, creator_id: int) -> Optional[OwnershipClass]: return None + def _get_tags_from_collection( + self, collection_id: Optional[Union[int, str]] + ) -> Optional[GlobalTagsClass]: + """ + Extract tags from a Metabase collection. + Maps collection names to DataHub tags for better organization and searchability. + """ + if not self.config.extract_collections_as_tags or not collection_id: + return None + + # Find the collection by ID + collection = None + try: + collections_response = self.session.get( + f"{self.config.connect_uri}/api/collection/" + f"?exclude-other-user-collections={json.dumps(self.config.exclude_other_user_collections)}" + ) + collections_response.raise_for_status() + collections = collections_response.json() + + # Find matching collection + for coll in collections: + if str(coll.get("id")) == str(collection_id): + collection = coll + break + + except HTTPError as http_error: + logger.debug( + f"Failed to retrieve collection {collection_id}: {str(http_error)}" + ) + return None + + if not collection: + return None + + # Create tag URN from collection name + # Sanitize collection name for use in tag + collection_name = collection.get("name", "").replace(" ", "_").lower() + tag_urn = builder.make_tag_urn(f"metabase_collection_{collection_name}") + + return GlobalTagsClass(tags=[TagAssociationClass(tag=tag_urn)]) + def emit_card_mces(self) -> Iterable[MetadataWorkUnit]: try: card_response = self.session.get(f"{self.config.connect_uri}/api/card") @@ -533,6 +870,11 @@ def construct_card_from_api_data(self, card_data: dict) -> Optional[ChartSnapsho if ownership is not None: chart_snapshot.aspects.append(ownership) + # Tags - extract from collection + tags = self._get_tags_from_collection(card_details.get("collection_id")) + if tags is not None: + chart_snapshot.aspects.append(tags) + return chart_snapshot def _get_chart_type(self, card_id: int, display_type: str) -> Optional[str]: @@ -845,9 +1187,141 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ).workunit_processor, ] + def _is_metabase_model(self, card_details: dict) -> bool: + 
""" + Check if a card is a Metabase Model. + Models are special saved questions that can be used as data sources. + Introduced in Metabase v0.41+ + """ + return card_details.get("type") == "model" + + def emit_model_mces(self) -> Iterable[MetadataWorkUnit]: + """ + Emit Metabase Models as Dataset entities. + Models are saved questions that can be used as data sources for other questions. + """ + if not self.config.extract_models: + return + + try: + card_response = self.session.get(f"{self.config.connect_uri}/api/card") + card_response.raise_for_status() + cards = card_response.json() + + for card_info in cards: + card_id = card_info.get("id") + if card_id is None: + continue + + # Get full card details + card_details = self.get_card_details_by_id(card_id) + if not card_details: + continue + + # Only process if it's a model + if not self._is_metabase_model(card_details): + continue + + dataset_snapshot = self.construct_model_from_api_data(card_details) + if dataset_snapshot is not None: + mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) + yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce) + + except HTTPError as http_error: + self.report.report_failure( + title="Unable to Retrieve Models", + message="Request to retrieve models from Metabase failed.", + context=f"Error: {str(http_error)}", + ) + + def construct_model_from_api_data( + self, card_details: dict + ) -> Optional[DatasetSnapshot]: + """ + Construct a Dataset entity from a Metabase Model. + Models are virtual datasets created from saved questions. + """ + card_id = card_details.get("id") + if card_id is None: + return None + + # Create dataset URN for the model + # Use a special platform name to distinguish models from real tables + model_urn = builder.make_dataset_urn( + platform="metabase", + name=f"model.{card_id}", + env=self.config.env, + ) + + dataset_snapshot = DatasetSnapshot( + urn=model_urn, + aspects=[], + ) + + # Dataset properties + dataset_properties = DatasetPropertiesClass( + name=card_details.get("name", ""), + description=card_details.get("description", ""), + customProperties={ + "model_id": str(card_id), + "display_type": card_details.get("display", ""), + "metabase_url": f"{self.config.display_uri}/model/{card_id}", + }, + ) + dataset_snapshot.aspects.append(dataset_properties) + + # Subtype to indicate this is a model/view + subtypes = SubTypesClass(typeNames=["Model", "View"]) + dataset_snapshot.aspects.append(subtypes) # type: ignore + + # View properties with the underlying query + if card_details.get("dataset_query"): + raw_query = None + if card_details.get("query_type") == "native": + raw_query = ( + card_details.get("dataset_query", {}) + .get("native", {}) + .get("query", "") + ) + + if raw_query: + view_properties = ViewPropertiesClass( + materialized=False, + viewLogic=raw_query, + viewLanguage="SQL", + ) + dataset_snapshot.aspects.append(view_properties) + + # Upstream lineage - tables this model depends on + table_urns = self._get_table_urns_from_card(card_details) + if table_urns: + upstream_lineage = UpstreamLineageClass( + upstreams=[ + UpstreamClass( + dataset=table_urn, + type="TRANSFORMED", + ) + for table_urn in table_urns + ] + ) + dataset_snapshot.aspects.append(upstream_lineage) + + # Ownership + ownership = self._get_ownership(card_details.get("creator_id", "")) + if ownership is not None: + dataset_snapshot.aspects.append(ownership) + + # Tags from collection + tags = self._get_tags_from_collection(card_details.get("collection_id")) + if tags is not None: + 
dataset_snapshot.aspects.append(tags) + + return dataset_snapshot + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from self.emit_card_mces() yield from self.emit_dashboard_mces() + yield from self.emit_model_mces() def get_report(self) -> SourceReport: return self.report diff --git a/metadata-ingestion/tests/integration/metabase/metabase_mces_golden.json b/metadata-ingestion/tests/integration/metabase/metabase_mces_golden.json index d947d5d1f1cf90..7e2276b7474d67 100644 --- a/metadata-ingestion/tests/integration/metabase/metabase_mces_golden.json +++ b/metadata-ingestion/tests/integration/metabase/metabase_mces_golden.json @@ -240,6 +240,29 @@ } ], "datasets": [], + "datasetEdges": [ + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.customer,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.payment,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.public.film,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + } + ], "dashboards": [], "lastModified": { "created": { @@ -314,6 +337,29 @@ } ], "datasets": [], + "datasetEdges": [ + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.customer,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.payment,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.public.film,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + } + ], "dashboards": [], "lastModified": { "created": { @@ -388,6 +434,29 @@ } ], "datasets": [], + "datasetEdges": [ + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.customer,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.payment,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.public.film,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + } + ], "dashboards": [], "lastModified": { "created": { @@ -462,6 +531,29 @@ } ], "datasets": [], + "datasetEdges": [ + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.customer,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,acryl-data.public.payment,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + }, + { + "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.public.film,PROD)", + "lastModified": { + "time": 1705398694904, + "actor": "urn:li:corpuser:admin@metabase.com" + } + } + ], "dashboards": [], 
"lastModified": { "created": { diff --git a/metadata-ingestion/tests/unit/test_metabase_source.py b/metadata-ingestion/tests/unit/test_metabase_source.py index 065cda0f8402a4..e1af83a0bb4ab7 100644 --- a/metadata-ingestion/tests/unit/test_metabase_source.py +++ b/metadata-ingestion/tests/unit/test_metabase_source.py @@ -8,6 +8,11 @@ MetabaseReport, MetabaseSource, ) +from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp +from datahub.metadata.schema_classes import ( + GlobalTagsClass, + UpstreamLineageClass, +) class TestMetabaseSource(MetabaseSource): @@ -135,3 +140,702 @@ def test_fail_session_delete(mock_post, mock_get, mock_delete): metabase_source.close() mock_report.report_failure.assert_called_once() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_get_table_urns_from_native_query(mock_post, mock_get, mock_delete): + """Test extraction of table URNs from native SQL queries""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + ctx.graph = None + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + # Mock get_datasource_from_id + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + + # Test with a simple SQL query + card_details = { + "database_id": "1", + "dataset_query": { + "native": { + "query": "SELECT * FROM users JOIN orders ON users.id = orders.user_id" + } + }, + } + + table_urns = metabase_source._get_table_urns_from_native_query(card_details) + + # Should extract both tables + assert len(table_urns) == 2 + assert any("users" in urn for urn in table_urns) + assert any("orders" in urn for urn in table_urns) + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_get_table_urns_from_query_builder(mock_post, mock_get, mock_delete): + """Test extraction of table URNs from query builder queries""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + # Mock dependencies + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + metabase_source.get_source_table_from_id = MagicMock( + return_value=("public", "products") + ) + + # Test with query builder query + card_details = { + "database_id": "1", + "dataset_query": {"query": {"source-table": "42"}}, + } + + table_urns = metabase_source._get_table_urns_from_query_builder(card_details) + + # Should extract one table + assert len(table_urns) == 1 + assert "products" in table_urns[0] + assert "postgres" in table_urns[0] + + metabase_source.close() + + +@patch("requests.delete") 
+@patch("requests.Session.get") +@patch("requests.post") +def test_get_table_urns_from_nested_query(mock_post, mock_get, mock_delete): + """Test extraction of table URNs from nested queries (card referencing another card)""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + # Mock dependencies + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + metabase_source.get_source_table_from_id = MagicMock( + return_value=("public", "products") + ) + + # The referenced card + referenced_card = { + "database_id": "1", + "dataset_query": {"type": "query", "query": {"source-table": "42"}}, + } + + # Mock get_card_details_by_id to return the referenced card + metabase_source.get_card_details_by_id = MagicMock(return_value=referenced_card) # type: ignore[method-assign] + + # Test with nested query (card referencing another card) + card_details = { + "database_id": "1", + "dataset_query": {"query": {"source-table": "card__123"}}, + } + + table_urns = metabase_source._get_table_urns_from_query_builder(card_details) + + # Should recursively resolve to the underlying table + assert len(table_urns) == 1 + assert "products" in table_urns[0] + + # Verify that get_card_details_by_id was called with the correct ID + metabase_source.get_card_details_by_id.assert_called_with("123") + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_construct_dashboard_lineage(mock_post, mock_get, mock_delete): + """Test construction of dashboard-level lineage""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + ctx.graph = None + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + # Mock dependencies + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + metabase_source.get_source_table_from_id = MagicMock( + return_value=("public", "products") + ) + + # Define cards that will be returned + card1 = { + "database_id": "1", + "dataset_query": { + "type": "native", + "native": {"query": "SELECT * FROM users"}, + }, + } + + card2 = { + "database_id": "1", + "dataset_query": {"type": "query", "query": {"source-table": "42"}}, + } + + def mock_get_card_details(card_id): + if card_id == "1": + return card1 + elif card_id == "2": + return card2 + return None + + metabase_source.get_card_details_by_id = MagicMock( # type: ignore[method-assign] + side_effect=mock_get_card_details + ) + + # Test dashboard with multiple cards + dashboard_details = { + "id": "100", + "name": "Test Dashboard", + "dashcards": [{"card": {"id": "1"}}, {"card": {"id": "2"}}], + } + + 
# Create a mock AuditStamp for last_modified + last_modified = AuditStamp(time=0, actor="urn:li:corpuser:test") + + dataset_edges = metabase_source.construct_dashboard_lineage( + dashboard_details, last_modified + ) + + # Should create dataset edges + assert dataset_edges is not None + assert isinstance(dataset_edges, list) + assert len(dataset_edges) >= 1 # At least one dataset edge + + # All edges should have destination URNs + for edge in dataset_edges: + assert edge.destinationUrn is not None + assert edge.lastModified is not None + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_construct_dashboard_lineage_empty_dashcards(mock_post, mock_get, mock_delete): + """Test dashboard lineage construction with no cards""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + # Test dashboard with no cards + dashboard_details = { + "id": "100", + "name": "Empty Dashboard", + "dashcards": [], + } + + # Create a mock AuditStamp for last_modified + last_modified = AuditStamp(time=0, actor="urn:li:corpuser:test") + + dataset_edges = metabase_source.construct_dashboard_lineage( + dashboard_details, last_modified + ) + + # Should return None for empty dashboard + assert dataset_edges is None + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_construct_dashboard_lineage_deduplication(mock_post, mock_get, mock_delete): + """Test that dashboard lineage deduplicates table URNs""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + ctx.graph = None + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + # Mock dependencies + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + metabase_source.get_source_table_from_id = MagicMock( + return_value=("public", "users") + ) + + # Both cards reference the same table + card1 = { + "database_id": "1", + "dataset_query": { + "type": "native", + "native": {"query": "SELECT count(*) FROM users"}, + }, + } + + card2 = { + "database_id": "1", + "dataset_query": { + "type": "native", + "native": {"query": "SELECT * FROM users WHERE active = true"}, + }, + } + + def mock_get_card_details(card_id): + if card_id == "1": + return card1 + elif card_id == "2": + return card2 + return None + + metabase_source.get_card_details_by_id = MagicMock( # type: ignore[method-assign] + side_effect=mock_get_card_details + ) + + # Dashboard with two cards referencing the same table + dashboard_details = { + "id": "100", + "name": "Test Dashboard", + "dashcards": [{"card": {"id": "1"}}, {"card": 
{"id": "2"}}], + } + + # Create a mock AuditStamp for last_modified + last_modified = AuditStamp(time=0, actor="urn:li:corpuser:test") + + dataset_edges = metabase_source.construct_dashboard_lineage( + dashboard_details, last_modified + ) + + # Should deduplicate to only one dataset edge + assert dataset_edges is not None + assert len(dataset_edges) == 1 + assert "users" in dataset_edges[0].destinationUrn + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_get_table_urns_handles_missing_database_id(mock_post, mock_get, mock_delete): + """Test that missing database_id is handled gracefully""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + # Test with missing database_id + card_details = { + "dataset_query": { + "type": "native", + "native": {"query": "SELECT * FROM users"}, + } + } + + table_urns = metabase_source._get_table_urns_from_native_query(card_details) + + # Should return empty list + assert table_urns == [] + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_get_table_urns_handles_missing_query(mock_post, mock_get, mock_delete): + """Test that missing query is handled gracefully""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + + # Mock successful authentication + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + + # Test with missing query + card_details = { + "database_id": "1", + "dataset_query": {"native": {}}, + } + + table_urns = metabase_source._get_table_urns_from_native_query(card_details) + + # Should return empty list + assert table_urns == [] + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_extract_tags_from_collection(mock_post, mock_get, mock_delete): + """Test that tags are extracted from collections""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + extract_collections_as_tags=True, + ) + ctx = PipelineContext(run_id="metabase-test") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + collections_response = MagicMock() + collections_response.status_code = 200 + collections_response.json.return_value = [ + {"id": "1", "name": 
"Sales Dashboard"}, + ] + + def mock_get_collections(url): + if "/api/collection/" in url: + return collections_response + return mock_response + + metabase_source.session.get = MagicMock(side_effect=mock_get_collections) # type: ignore[method-assign] + tags = metabase_source._get_tags_from_collection("1") + + assert tags is not None + assert isinstance(tags, GlobalTagsClass) + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_is_metabase_model(mock_post, mock_get, mock_delete): + """Test model detection""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + model_card = {"id": "123", "type": "model", "name": "Sales Model"} + assert metabase_source._is_metabase_model(model_card) is True + + question_card = {"id": "456", "type": "question", "name": "Sales Query"} + assert metabase_source._is_metabase_model(question_card) is False + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_construct_model_from_api_data(mock_post, mock_get, mock_delete): + """Test construction of DatasetSnapshot for Metabase Models""" + from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( + DatasetSnapshot, + ) + + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + ctx.graph = None + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + + # Mock collections API response + collections_response = MagicMock() + collections_response.status_code = 200 + collections_response.json.return_value = [ + {"id": "42", "name": "Analytics"}, + ] + + def mock_get_collections(url): + if "/api/collection/" in url: + return collections_response + return mock_response + + metabase_source.session.get = MagicMock(side_effect=mock_get_collections) # type: ignore[method-assign] + + model_card = { + "id": 123, + "name": "Customer Revenue Model", + "description": "A model for customer revenue analysis", + "type": "model", + "database_id": 1, + "query_type": "native", # This triggers ViewPropertiesClass + "dataset_query": { + "type": "native", + "native": { + "query": "SELECT customer_id, SUM(revenue) FROM orders GROUP BY customer_id" + }, + }, + "collection_id": 42, + "creator_id": 1, + "created_at": "2024-01-15T10:00:00Z", + } + + dataset_snapshot = metabase_source.construct_model_from_api_data(model_card) + + assert dataset_snapshot is not None + assert isinstance(dataset_snapshot, DatasetSnapshot) + assert "metabase" in dataset_snapshot.urn + assert "123" in dataset_snapshot.urn + + aspect_types = [type(aspect).__name__ for aspect in 
dataset_snapshot.aspects] + assert "DatasetPropertiesClass" in aspect_types + assert "SubTypesClass" in aspect_types + assert "ViewPropertiesClass" in aspect_types + assert "OwnershipClass" in aspect_types + assert "GlobalTagsClass" in aspect_types # Tags from collection + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_construct_model_with_lineage(mock_post, mock_get, mock_delete): + """Test that models include lineage to source tables""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + ) + ctx = PipelineContext(run_id="metabase-test") + ctx.graph = None + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + metabase_source.get_datasource_from_id = MagicMock( + return_value=("postgres", "mydb", "public", None) + ) + + model_card = { + "id": 456, + "name": "Sales Model", + "type": "model", + "database_id": 1, + "dataset_query": { + "type": "native", + "native": {"query": "SELECT * FROM sales WHERE amount > 100"}, + }, + "collection_id": None, + "creator_id": 1, + } + + dataset_snapshot = metabase_source.construct_model_from_api_data(model_card) + + assert dataset_snapshot is not None + upstream_lineage = None + for aspect in dataset_snapshot.aspects: + if isinstance(aspect, UpstreamLineageClass): + upstream_lineage = aspect + break + + assert upstream_lineage is not None + assert len(upstream_lineage.upstreams) > 0 + assert any("sales" in upstream.dataset for upstream in upstream_lineage.upstreams) + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_extract_models_config_disabled(mock_post, mock_get, mock_delete): + """Test that models are not extracted when config is disabled""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + extract_models=False, + ) + ctx = PipelineContext(run_id="metabase-test") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": "session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + card_response = MagicMock() + card_response.status_code = 200 + card_response.json.return_value = [{"id": 1, "name": "Test Model", "type": "model"}] + metabase_source.session.get = MagicMock(return_value=card_response) # type: ignore[method-assign] + + workunits = list(metabase_source.emit_model_mces()) + assert len(workunits) == 0 + + metabase_source.close() + + +@patch("requests.delete") +@patch("requests.Session.get") +@patch("requests.post") +def test_extract_collections_as_tags_config_disabled(mock_post, mock_get, mock_delete): + """Test that tags are not extracted when config is disabled""" + metabase_config = MetabaseConfig( + connect_uri="http://localhost:3000", + username="test", + password=pydantic.SecretStr("pwd"), + extract_collections_as_tags=False, + ) + ctx = PipelineContext(run_id="metabase-test") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"id": 
"session-token"} + mock_get.return_value = mock_response + mock_post.return_value = mock_response + mock_delete.return_value = mock_response + + metabase_source = MetabaseSource(ctx, metabase_config) + + tags = metabase_source._get_tags_from_collection("42") + assert tags is None + + metabase_source.close() From b14800f7e46337478c1f268b3294db7c396f7ea1 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 10 Nov 2025 16:35:54 +0000 Subject: [PATCH 2/2] Update capability_summary.json --- .../datahub/ingestion/autogenerated/capability_summary.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json b/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json index ecf0ee955e7f61..e14f6d16f1202f 100644 --- a/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json +++ b/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json @@ -1,5 +1,5 @@ { - "generated_at": "2025-10-23T14:26:01.879155+00:00", + "generated_at": "2025-11-10T16:34:14.990322+00:00", "generated_by": "metadata-ingestion/scripts/capability_summary.py", "plugin_details": { "abs": { @@ -1745,7 +1745,7 @@ }, { "capability": "LINEAGE_COARSE", - "description": "Supported by default", + "description": "Supported by default for charts and dashboards", "subtype_modifier": null, "supported": true }