
Commit b686d2b

feat(ingest/kafka-connect): Add Confluent Cloud connector and transform pipeline support (#14575)
1 parent 88091ec commit b686d2b

29 files changed (+12351, −1181 lines)

metadata-ingestion/KAFKA_CONNECT_LINEAGE.md

Lines changed: 526 additions & 0 deletions
Large diffs are not rendered by default.

metadata-ingestion/docs/sources/kafka-connect/README.md

Lines changed: 165 additions & 4 deletions
@@ -6,6 +6,38 @@ This plugin extracts the following:

- For Source connectors - Data Jobs to represent lineage information from source dataset to Kafka topic per `{connector_name}:{source_dataset}` combination
- For Sink connectors - Data Jobs to represent lineage information from Kafka topic to destination dataset per `{connector_name}:{topic}` combination

### Requirements

**Java Runtime Dependency:**

This source requires Java to be installed and available on the system for transform pipeline support (RegexRouter, etc.). The Java runtime is accessed via JPype to enable Java regex pattern matching that is compatible with Kafka Connect transforms.

- **Python installations**: Install Java separately (e.g., `apt-get install openjdk-11-jre-headless` on Debian/Ubuntu)
- **Docker deployments**: Ensure your DataHub ingestion Docker image includes a Java runtime. The official DataHub images include Java by default.
- **Impact**: Without Java, transform pipeline features are disabled and lineage accuracy may be reduced for connectors that use transforms

**Note for Docker users**: If you build custom Docker images for DataHub ingestion, ensure a Java Runtime Environment (JRE) is included in your image to support full transform pipeline functionality.
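Before running ingestion, you can verify that a Java runtime is actually on the `PATH`. A minimal sketch (the `java_available` helper is illustrative, not part of the source):

```python
import shutil

def java_available() -> bool:
    # JPype needs a Java runtime; shutil.which checks the PATH for the binary.
    return shutil.which("java") is not None

if not java_available():
    print("WARNING: Java not found; transform pipeline features will be disabled")
```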
### Environment Support

DataHub's Kafka Connect source supports both **self-hosted** and **Confluent Cloud** environments, with automatic detection and environment-specific topic retrieval strategies:

#### Self-hosted Kafka Connect

- **Topic Discovery**: Uses the runtime `/connectors/{name}/topics` API endpoint
- **Accuracy**: Returns the actual topics that connectors are currently reading from or writing to
- **Benefits**: Most accurate topic information, as it reflects the actual runtime state
- **Requirements**: Standard Kafka Connect REST API access
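The self-hosted discovery endpoint can be addressed as in the following sketch (the helper name is illustrative; only URL construction is shown, not the actual HTTP call):

```python
def topics_url(connect_uri: str, connector: str) -> str:
    # GET {connect_uri}/connectors/{name}/topics returns the topics a
    # connector is actively reading from or writing to.
    return f"{connect_uri.rstrip('/')}/connectors/{connector}/topics"

print(topics_url("http://localhost:8083/", "my-jdbc-source"))
# http://localhost:8083/connectors/my-jdbc-source/topics
```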
#### Confluent Cloud

- **Topic Discovery**: Uses the comprehensive Kafka REST API v3 for optimal transform pipeline support, with a config-based fallback
- **Method**: Gets all topics from the Kafka cluster via the REST API, then applies the reverse transform pipeline for accurate mappings
- **Transform Support**: Full support for complex transform pipelines via the reverse pipeline strategy using actual cluster topics
- **Fallback**: Falls back to config-based derivation if the Kafka API is unavailable

**Environment Detection**: The environment is detected automatically based on `connect_uri` patterns containing `confluent.cloud`.
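As a sketch of the detection rule described above (the function name is illustrative, not the actual implementation):

```python
def is_confluent_cloud(connect_uri: str) -> bool:
    # Confluent Cloud Connect endpoints contain "confluent.cloud" in the host.
    return "confluent.cloud" in connect_uri

print(is_confluent_cloud("https://api.confluent.cloud"))  # True
print(is_confluent_cloud("http://localhost:8083"))  # False
```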
### Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

@@ -16,9 +48,138 @@ This ingestion source maps the following Source System Concepts to DataHub Conce

| [Connector](https://kafka.apache.org/documentation/#connect_connectorsandtasks) | [DataFlow](https://docs.datahub.com/docs/generated/metamodel/entities/dataflow/) | |
| Kafka Topic | [Dataset](https://docs.datahub.com/docs/generated/metamodel/entities/dataset/) | |
## Supported Connectors and Lineage Extraction

DataHub supports different connector types with varying levels of lineage extraction, depending on the environment (self-hosted vs. Confluent Cloud):

### Source Connectors

| Connector Type | Self-hosted Support | Confluent Cloud Support | Topic Discovery Method | Lineage Extraction |
| --- | --- | --- | --- | --- |
| **Platform JDBC Source**<br/>`io.confluent.connect.jdbc.JdbcSourceConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| **Cloud PostgreSQL CDC**<br/>`PostgresCdcSource` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| **Cloud PostgreSQL CDC V2**<br/>`PostgresCdcSourceV2` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| **Cloud MySQL Source**<br/>`MySqlSource` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| **Cloud MySQL CDC**<br/>`MySqlCdcSource` | ✅ Full | ✅ Full | Runtime API / Config-based | Table → Topic mapping |
| **Debezium MySQL**<br/>`io.debezium.connector.mysql.MySqlConnector` | ✅ Full | ⚠️ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| **Debezium PostgreSQL**<br/>`io.debezium.connector.postgresql.PostgresConnector` | ✅ Full | ⚠️ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| **Debezium SQL Server**<br/>`io.debezium.connector.sqlserver.SqlServerConnector` | ✅ Full | ⚠️ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| **Debezium Oracle**<br/>`io.debezium.connector.oracle.OracleConnector` | ✅ Full | ⚠️ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| **Debezium DB2**<br/>`io.debezium.connector.db2.Db2Connector` | ✅ Full | ⚠️ Partial | Runtime API / Config-based | Database → Topic CDC mapping |
| **Debezium MongoDB**<br/>`io.debezium.connector.mongodb.MongoDbConnector` | ✅ Full | ⚠️ Partial | Runtime API / Config-based | Collection → Topic CDC mapping |
| **Debezium Vitess**<br/>`io.debezium.connector.vitess.VitessConnector` | ✅ Full | ⚠️ Partial | Runtime API / Config-based | Table → Topic CDC mapping |
| **MongoDB Source**<br/>`com.mongodb.kafka.connect.MongoSourceConnector` | ✅ Full | 🔧 Config Required | Runtime API / Manual config | Collection → Topic mapping |
| **Generic Connectors** | 🔧 Config Required | 🔧 Config Required | User-defined mapping | Custom lineage mapping |

### Sink Connectors

| Connector Type | Self-hosted Support | Confluent Cloud Support | Topic Discovery Method | Lineage Extraction |
| --- | --- | --- | --- | --- |
| **BigQuery Sink**<br/>`com.wepay.kafka.connect.bigquery.BigQuerySinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **S3 Sink**<br/>`io.confluent.connect.s3.S3SinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → S3 object mapping |
| **Snowflake Sink**<br/>`com.snowflake.kafka.connector.SnowflakeSinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **Cloud PostgreSQL Sink**<br/>`PostgresSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **Cloud MySQL Sink**<br/>`MySqlSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **Cloud Snowflake Sink**<br/>`SnowflakeSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
**Legend:**

- ✅ **Full**: Complete lineage extraction with accurate topic discovery
- ⚠️ **Partial**: Lineage extraction supported, but topic discovery may be limited (config-based only)
- 🔧 **Config Required**: Requires `generic_connectors` configuration for lineage mapping
### Supported Transforms

DataHub uses an **advanced transform pipeline strategy** that handles complex transform chains automatically by applying the complete pipeline to all topics and checking whether the results exist. This provides robust support for any combination of transforms.

#### Topic Routing Transforms

- **RegexRouter**: `org.apache.kafka.connect.transforms.RegexRouter`
- **Cloud RegexRouter**: `io.confluent.connect.cloud.transforms.TopicRegexRouter`
- **Debezium EventRouter**: `io.debezium.transforms.outbox.EventRouter` (Outbox pattern)

#### Non-Topic Routing Transforms

DataHub recognizes but passes through these transforms (they do not affect lineage):

- InsertField, ReplaceField, MaskField, ValueToKey, HoistField, ExtractField
- SetSchemaMetadata, Flatten, Cast, HeadersFrom, TimestampConverter
- Filter, InsertHeader, DropHeaders, Drop, TombstoneHandler

#### Transform Pipeline Strategy

DataHub uses an improved **reverse transform pipeline approach** that:

1. **Takes all actual topics** from the connector manifest/Kafka cluster
2. **Applies the complete transform pipeline** to each topic
3. **Checks if transformed results exist** in the actual topic list
4. **Creates lineage mappings** only for successful matches
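The steps above can be sketched as follows. This is a simplified illustration that uses Python's `re` module as a stand-in for the Java regex engine the source actually uses, and models only RegexRouter-style steps (helper names are hypothetical):

```python
import re

def apply_router(topic: str, regex: str, replacement: str) -> str:
    # RegexRouter semantics, approximated: on a full match, substitute;
    # otherwise pass the topic through unchanged.
    m = re.fullmatch(regex, topic)
    return m.expand(replacement) if m else topic

def map_lineage(actual_topics, pipeline):
    # pipeline: list of (regex, replacement) routing steps applied in order.
    mappings = {}
    for topic in actual_topics:
        routed = topic
        for regex, replacement in pipeline:
            routed = apply_router(routed, regex, replacement)
        if routed in actual_topics:  # keep only results that really exist
            mappings[topic] = routed
    return mappings
```

For example, with actual topics `["orders.raw", "orders"]` and the step `(r"(.*)\.raw", r"\1")`, `map_lineage` maps `"orders.raw"` to `"orders"` because the routed name exists in the cluster.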
**Benefits:**

- **Works with any transform combination** (single or chained transforms)
- **Handles complex scenarios** like EventRouter + RegexRouter chains
- **Uses actual topics as the source of truth** (no prediction needed)
- **Future-proof** for new transform types
- **Works identically** for both self-hosted and Confluent Cloud environments
## Capabilities and Limitations

### Transform Pipeline Support

**✅ Fully Supported:**

- **Any combination of transforms**: RegexRouter, EventRouter, and non-routing transforms
- **Complex transform chains**: Multiple chained transforms are handled automatically
- **Both environments**: Self-hosted and Confluent Cloud work identically
- **Future-proof**: New transform types are supported automatically

**⚠️ Considerations:**

- For connectors not listed in the supported connector tables above, use the `generic_connectors` configuration to provide explicit lineage mappings
- Some advanced connector-specific features may not be fully supported

### Environment-Specific Behavior

#### Self-hosted Kafka Connect

- **Topic Discovery**: Uses the runtime `/connectors/{name}/topics` API endpoint for maximum accuracy
- **Requirements**: Standard Kafka Connect REST API access
- **Fallback**: If the runtime API fails, falls back to config-based derivation

#### Confluent Cloud

- **Topic Discovery**: Uses the comprehensive Kafka REST API v3 to get all topics, with automatic credential reuse
- **Transform Support**: Full support for all transform combinations via the reverse pipeline strategy using actual cluster topics
- **Auto-derivation**: Automatically derives the Kafka REST endpoint from connector configurations

### Configuration Control

The `use_connect_topics_api` flag controls topic retrieval behavior:

- **When `true` (default)**: Uses environment-specific topic discovery with full transform support
- **When `false`**: Disables all topic discovery, for air-gapped environments or performance optimization
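For example, a recipe that disables topic discovery might look like the following sketch (only `use_connect_topics_api` comes from this document; the URI is a placeholder):

```yaml
source:
  type: kafka-connect
  config:
    connect_uri: "http://localhost:8083" # placeholder endpoint
    use_connect_topics_api: false # air-gapped: skip topic discovery
```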
163+
### Advanced Scenarios
164+
165+
**Complex Transform Chains:**
166+
The new reverse transform pipeline strategy handles complex scenarios automatically:
167+
168+
```yaml
169+
# Example: EventRouter + RegexRouter chain
170+
transforms: EventRouter,RegexRouter
171+
transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
172+
transforms.RegexRouter.type: org.apache.kafka.connect.transforms.RegexRouter
173+
transforms.RegexRouter.regex: "outbox\\.event\\.(.*)"
174+
transforms.RegexRouter.replacement: "events.$1"
175+
```
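The RegexRouter step in the chain above can be checked with Python's `re` module as a rough stand-in for the Java regex engine (note that Java's `$1` replacement syntax becomes `\1` in Python):

```python
import re

# RegexRouter from the example above, approximated in Python.
regex = r"outbox\.event\.(.*)"
replacement = r"events.\1"  # Java replacement "events.$1"

routed = re.fullmatch(regex, "outbox.event.order").expand(replacement)
print(routed)  # events.order
```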
**Fallback Options:**

- If the transform pipeline cannot determine mappings, DataHub falls back to simple topic-based lineage
- For unsupported connector types or complex custom scenarios, use the `generic_connectors` configuration
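A `generic_connectors` entry might look like the following sketch; the field names are an assumption based on the DataHub Kafka Connect source configuration, and all values are placeholders:

```yaml
# Sketch: explicit lineage for a connector DataHub cannot infer.
generic_connectors:
  - connector_name: my_custom_connector # placeholder
    source_platform: postgres # placeholder
    source_dataset: my_db.public.orders # placeholder
```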

**Performance Optimization:**

- Set `use_connect_topics_api: false` to disable topic discovery in air-gapped environments
- Transform pipeline processing adds minimal overhead and improves lineage accuracy
