From faf6d93c9c6c58f50110e76deb14c9ebd2ca2ede Mon Sep 17 00:00:00 2001
From: Anton Rubin
Date: Thu, 6 Nov 2025 11:55:06 +0000
Subject: [PATCH 1/4] expanding on kafka data prepper docs

Signed-off-by: Anton Rubin
---
 .../pipelines/configuration/sources/kafka.md  | 289 ++++++++++++++++--
 1 file changed, 261 insertions(+), 28 deletions(-)

diff --git a/_data-prepper/pipelines/configuration/sources/kafka.md b/_data-prepper/pipelines/configuration/sources/kafka.md
index bdcfcbf5492..afe80b85b2b 100644
--- a/_data-prepper/pipelines/configuration/sources/kafka.md
+++ b/_data-prepper/pipelines/configuration/sources/kafka.md
@@ -14,17 +14,19 @@ You can use the Apache Kafka source (`kafka`) in OpenSearch Data Prepper to read

The following example shows the `kafka` source in a Data Prepper pipeline:

-```json
+```yaml
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
-        - 127.0.0.1:9093
+        - 127.0.0.1:9092
      topics:
        - name: Topic1
          group_id: groupID1
        - name: Topic2
          group_id: groupID1
+  sink:
+    - stdout: {}
```

## Configuration
@@ -44,7 +46,7 @@ Option | Required | Type | Description

### Topics

-Use the following options in the `topics` array.
+Use the following options in the `topics` array (per topic).

Option | Required | Type | Description
:--- | :--- | :--- | :---
@@ -65,55 +67,102 @@ Option | Required | Type | Description
`retry_backoff` | No | Integer | The amount of time to wait before attempting to retry a failed request to a given topic partition. Default is `10s`.
`max_poll_interval` | No | Integer | The maximum delay between invocations of a `poll()` when using group management through Kafka's `max.poll.interval.ms` option. Default is `300s`.
`consumer_max_poll_records` | No | Integer | The maximum number of records returned in a single `poll()` call through Kafka's `max.poll.records` setting. Default is `500`.
-`key_mode` | No | String | Indicates how the key field of the Kafka message should be handled. The default setting is `include_as_field`, which includes the key in the `kafka_key` event. The `include_as_metadata` setting includes the key in the event's metadata. The `discard` setting discards the key.
+`key_mode` | No | String | Indicates how the key field of the Kafka message should be handled. The default setting is `include_as_field`, which includes the key in the `kafka_key` event. The `include_as_metadata` setting includes the key in the event's metadata. The `discard` setting discards the key.

### Schema

-The following option is required inside the `schema` configuration.
+The `schema` configuration has the following configuration options.

-Option | Type | Description
-:--- | :--- | :---
-`type` | String | Sets the type of schema based on your registry, either the AWS Glue Schema Registry, `aws_glue`, or the Confluent Schema Registry, `confluent`. When using the `aws_glue` registry, set any [AWS](#aws) configuration options.
+Option | Type | Required | Description
+:--- | :--- | :--- | :---
+`type` | String | Yes | Sets the type of schema based on your registry, either the AWS Glue Schema Registry, `aws_glue`, or the Confluent Schema Registry, `confluent`. When using the `aws_glue` registry, set any [AWS](#aws) configuration options.
+`basic_auth_credentials_source` | String | No | Where Schema Registry credentials come from. Use `USER_INFO` when providing `api_key/api_secret`. Other valid values are `URL` and `SASL_INHERIT`. Default typically aligns with the underlying client.

The following configuration options are only required when using a `confluent` registry.

Option | Type | Description
:--- | :--- | :---
-`registry_url` | String | Deserializes a record value from a `bytearray` into a string. Default is `org.apache.kafka.common.serialization.StringDeserializer`.
-`version` | String | Deserializes a record key from a `bytearray` into a string. Default is `org.apache.kafka.common.serialization.StringDeserializer`.
-`schema_registry_api_key` | String | The schema registry API key.
-`schema_registry_api_secret` | String | The schema registry API secret.
+`registry_url` | String | Base URL of the Schema Registry, for example, `http://schema-registry:8081` or `https://sr.example.com`.
+`version` | String | Schema version to use per subject. Use an integer or `"latest"`.
+`api_key` | String | The schema registry API key.
+`api_secret` | String | The schema registry API secret.
+
+The following example shows how to configure the schema registry:
+
+```yaml
+schema:
+  type: confluent
+  registry_url: "http://schema-registry:8081"
+  api_key: ""
+  api_secret: ""
+  version: "latest"
+```
+{% include copy.html %}

-### Authentication
+#### Schema Registry over TLS

-The following option is required inside the `authentication` object.
+The Kafka source uses the JVM truststore when connecting to Schema Registry over `https`. If Schema Registry is signed by a custom CA, add that CA to the Data Prepper JVM truststore or provide a custom truststore via environment variables. The following commands can be used to configure this in docker-compose deployment.

-Option | Type | Description
-:--- | :--- | :---
-`sasl` | JSON object | The Simple Authentication and Security Layer (SASL) authentication configuration.
+Build a truststore with your CA:

-### SASL
+```bash
+keytool -importcert -noprompt -alias sr-ca -file sr-ca.pem -keystore /usr/share/data-prepper/certs/sr.truststore.jks -storepass changeit
+```
+{% include copy.html %}

-Use one of the following options when configuring SASL authentication.
+Configure Data Prepper using `JAVA_TOOL_OPTIONS`:

+```bash
+JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
+```
+{% include copy.html %}

-Option | Type | Description
-:--- | :--- | :---
-`plaintext` | JSON object | The [PLAINTEXT](#sasl-plaintext) authentication configuration.
-`aws_msk_iam` | String | The Amazon MSK AWS Identity and Access Management (IAM) configuration. If set to `role`, the `sts_role_arm` set in the `aws` configuration is used. Default is `default`.
+The following configuration can be used to configure Data Prepper in Docker Compose:
+
+```yaml
+environment:
+  - JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
+volumes:
+  - ./certs:/usr/share/data-prepper/certs:ro
+```
+{% include copy.html %}
+
+### Authentication
+
+The `authentication` section configures SASL authentication, as shown in the following example.
+
+```yaml
+authentication:
+  sasl:
+    plain:
+      username: alice
+      password: secret
+```
+{% include copy.html %}

-#### SASL PLAINTEXT
+Option | Type | Description
+:--- | :--- | :---
+`sasl` | Object | The SASL configuration.

-The following options are required when using the [SASL PLAINTEXT](https://kafka.apache.org/10/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html) protocol.
+#### SASL
+
+Use one of the following options when configuring SASL authentication.

Option | Type | Description
:--- | :--- | :---
-`username` | String | The username for the PLAINTEXT auth.
-`password` | String | The password for the PLAINTEXT auth.
+`plain` | JSON object | The plaintext authentication configuration. See [SASL/PLAIN](#sasl-plaintext) for further details.
+`aws_msk_iam` | String | The Amazon MSK AWS Identity and Access Management (IAM) configuration. If set to `role`, the `sts_role_arn` set in the `aws` configuration is used. Default is `default`.
+
+##### SASL plaintext

-#### Encryption
+The following options are required when using the [SASL/PLAIN](https://kafka.apache.org/10/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html) protocol.
+
+Option | Type | Description
+:--- | :--- | :---
+`username` | String | The SASL/PLAIN username.
+`password` | String | The SASL/PLAIN password.
+
+### Encryption

Use the following options when setting SSL encryption.

@@ -122,6 +171,35 @@ Option | Required | Type | Description
`type` | No | String | The encryption type. Use `none` to disable encryption. Default is `ssl`.
`insecure` | No | Boolean | A Boolean flag used to turn off SSL certificate verification. If set to `true`, certificate authority (CA) certificate verification is turned off and insecure HTTP requests are sent. Default is `false`.

+```yaml
+encryption:
+  type: ssl
+  # With public CA: no extra config needed.
+  # With private CA: trust via JVM truststore.
+```
+{% include copy.html %}
+
+**Trusting a private CA for brokers**
+
+As with Schema Registry, the Kafka client inside Data Prepper uses the JVM truststore. Provide a truststore containing your Kafka broker CA:
+
+```bash
+keytool -importcert -noprompt -alias kafka-ca -file kafka-ca.pem -keystore /usr/share/data-prepper/certs/kafka.truststore.jks -storepass changeit
+```
+{% include copy.html %}
+
+See following docker-compose excerpt:
+
+```yaml
+services:
+  data-prepper:
+    image: opensearchproject/data-prepper:latest
+    environment:
+      - JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/kafka.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
+    volumes:
+      - ./certs:/usr/share/data-prepper/certs:ro
+```
+{% include copy.html %}

#### AWS

@@ -140,5 +218,160 @@ Use the following options inside the `msk` object.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`arn` | Yes | String | The [MSK ARN](https://docs.aws.amazon.com/msk/1.0/apireference/configurations-arn.html) to use.
-`broker_connection_type` No | String | The type of connector to use with the MSK broker, either `public`, `single_vpc`, or `multip_vpc`. Default is `single_vpc`.
+`broker_connection_type` | No | String | The type of connector to use with the MSK broker, either `public`, `single_vpc`, or `multi_vpc`. Default is `single_vpc`.
+
+## Configuration examples
+
+### Basic Kafka source
+
+The following example pipeline reads JSON messages from a single plaintext Kafka topic with multiple consumer workers, parses them, and indexes the results into OpenSearch:
+
+```yaml
+kafka-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - localhost:9092
+      topics:
+        - name: my-topic
+          group_id: data-prepper-group
+          workers: 4
+  processor:
+    - parse_json:
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin
+        index: kafka-data
+```
+{% include copy.html %}
+
+### Kafka source with SSL encryption
+
+The following example pipeline connects to a Kafka broker over TLS, consumes from a secure topic, and writes the results to OpenSearch:
+
+```yaml
+kafka-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - kafka-broker.example.com:9093
+      topics:
+        - name: secure-topic
+          group_id: secure-group
+      encryption:
+        type: ssl
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin
+        index: secure-kafka-data
+```
+{% include copy.html %}
+
+### Kafka source with SASL PLAIN authentication
+
+The following example pipeline authenticates to Kafka using SASL/PLAIN over TLS, consumes from the topic, and indexes the events into OpenSearch:
+
+```yaml
+kafka-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - kafka-broker.example.com:9094
+      topics:
+        - name: authenticated-topic
+          group_id: auth-group
+      encryption:
+        type: ssl
+      authentication:
+        sasl:
+          plain:
+            username: kafka-user
+            password: kafka-password
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin
+        index: authenticated-kafka-data
+```
+{% include copy.html %}
+
+### Amazon MSK with AWS Glue Schema Registry
+
+The following example configures Amazon MSK with AWS Glue Schema Registry, consumes from an MSK cluster using AWS settings, deserializes payload using AWS Glue Schema Registry, normalizes timestamps, and writes to an Amazon OpenSearch domain:
+
+```yaml
+msk-pipeline:
+  source:
+    kafka:
+      acknowledgments: true
+      topics:
+        - name: my-msk-topic
+          group_id: msk-consumer-group
+          auto_offset_reset: earliest
+      aws:
+        region: us-east-1
+        sts_role_arn: arn:aws:iam::123456789012:role/data-prepper-role
+        msk:
+          arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster-name/uuid
+      schema:
+        type: aws_glue
+        registry_name: my-glue-registry
+  processor:
+    - date:
+        match:
+          - key: timestamp
+            patterns: ["epoch_milli"]
+        destination: "@timestamp"
+  sink:
+    - opensearch:
+        hosts: ["https://search-my-domain.us-east-1.opensearch.amazonaws.com"]
+        aws:
+          region: us-east-1
+          sts_role_arn: arn:aws:iam::123456789012:role/opensearch-role
+        index: msk-data
+        index_type: custom
+```
+{% include copy.html %}
+
+### Confluent Kafka with Schema Registry
+
+The following example configures Confluent Kafka with Schema Registry, connects to Confluent Cloud over TLS with SASL and Confluent Schema Registry credentials, decodes payloads, and indexes them into OpenSearch:
+
+```yaml
+confluent-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
+      topics:
+        - name: confluent-topic
+          group_id: confluent-group
+          auto_offset_reset: earliest
+      encryption:
+        type: ssl
+      authentication:
+        sasl:
+          plain:
+            username: confluent-api-key
+            password: confluent-api-secret
+      schema:
+        type: confluent
+        registry_url: https://psrc-xxxxx.us-east-1.aws.confluent.cloud
+        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
+        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
+        basic_auth_credentials_source: USER_INFO
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin_password
+        index_type: custom
+        index: confluent-data
+```
+{% include copy.html %}

From 1d7f51868ffa9625c3476bf1383a6bcde4f7dc5f Mon Sep 17 00:00:00 2001
From: Anton Rubin
Date: Thu, 6 Nov 2025 12:11:19 +0000
Subject: [PATCH 2/4] expanding on kafka data prepper docs

Signed-off-by: Anton Rubin
---
 .../pipelines/configuration/sources/kafka.md | 36 +++++--------------
 1 file changed, 8 insertions(+), 28 deletions(-)

diff --git a/_data-prepper/pipelines/configuration/sources/kafka.md b/_data-prepper/pipelines/configuration/sources/kafka.md
index afe80b85b2b..8bd99fdf47f 100644
--- a/_data-prepper/pipelines/configuration/sources/kafka.md
+++ b/_data-prepper/pipelines/configuration/sources/kafka.md
@@ -46,7 +46,7 @@ Option | Required | Type | Description

### Topics

-Use the following options in the `topics` array (per topic).
+Use the following options in the `topics` array for each topic.

Option | Required | Type | Description
:--- | :--- | :--- | :---
@@ -71,7 +71,7 @@ Option | Required | Type | Description

### Schema

-The `schema` configuration has the following configuration options.
+The `schema` configuration has the following options.

Option | Type | Required | Description
:--- | :--- | :--- | :---
@@ -101,23 +101,23 @@ schema:
```
{% include copy.html %}

#### Schema Registry over TLS

-The Kafka source uses the JVM truststore when connecting to Schema Registry over `https`. If Schema Registry is signed by a custom CA, add that CA to the Data Prepper JVM truststore or provide a custom truststore via environment variables. The following commands can be used to configure this in docker-compose deployment.
+The Kafka source uses the JVM truststore when connecting to Schema Registry over `https`. If Schema Registry is signed by a custom CA, add that CA to the Data Prepper JVM truststore or provide a custom truststore via environment variables.

-Build a truststore with your CA:
+You can use the following command to build a truststore with your CA certificate:

```bash
keytool -importcert -noprompt -alias sr-ca -file sr-ca.pem -keystore /usr/share/data-prepper/certs/sr.truststore.jks -storepass changeit
```
{% include copy.html %}

-Configure Data Prepper using `JAVA_TOOL_OPTIONS`:
+The following command configures Data Prepper using `JAVA_TOOL_OPTIONS`:

```bash
JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
```
{% include copy.html %}

-The following configuration can be used to configure Data Prepper in Docker Compose:
+You can configure Data Prepper in `docker-compose.yaml` as follows:

```yaml
environment:
@@ -179,28 +179,6 @@ encryption:
```
{% include copy.html %}

-**Trusting a private CA for brokers**
-
-As with Schema Registry, the Kafka client inside Data Prepper uses the JVM truststore. Provide a truststore containing your Kafka broker CA:
-
-```bash
-keytool -importcert -noprompt -alias kafka-ca -file kafka-ca.pem -keystore /usr/share/data-prepper/certs/kafka.truststore.jks -storepass changeit
-```
-{% include copy.html %}
-
-See following docker-compose excerpt:
-
-```yaml
-services:
-  data-prepper:
-    image: opensearchproject/data-prepper:latest
-    environment:
-      - JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/kafka.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
-    volumes:
-      - ./certs:/usr/share/data-prepper/certs:ro
-```
-{% include copy.html %}
-
#### AWS

Use the following options when setting up authentication for `aws` services.
@@ -222,6 +200,8 @@ Option | Required | Type | Description

## Configuration examples

+The following examples demonstrate common `kafka` source pipeline configurations.
+
### Basic Kafka source

The following example pipeline reads JSON messages from a single plaintext Kafka topic with multiple consumer workers, parses them, and indexes the results into OpenSearch:

From 9c9aba53d29107d64577b3161d65c2c2c9e3e0c0 Mon Sep 17 00:00:00 2001
From: Anton Rubin
Date: Thu, 6 Nov 2025 12:42:50 +0000
Subject: [PATCH 3/4] expanding on kafka data prepper docs

Signed-off-by: Anton Rubin
---
 .../pipelines/configuration/sources/kafka.md | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/_data-prepper/pipelines/configuration/sources/kafka.md b/_data-prepper/pipelines/configuration/sources/kafka.md
index 8bd99fdf47f..1388cefc4cb 100644
--- a/_data-prepper/pipelines/configuration/sources/kafka.md
+++ b/_data-prepper/pipelines/configuration/sources/kafka.md
@@ -75,14 +75,14 @@ The `schema` configuration has the following options.

Option | Type | Required | Description
:--- | :--- | :--- | :---
-`type` | String | Yes | Sets the type of schema based on your registry, either the AWS Glue Schema Registry, `aws_glue`, or the Confluent Schema Registry, `confluent`. When using the `aws_glue` registry, set any [AWS](#aws) configuration options.
-`basic_auth_credentials_source` | String | No | Where Schema Registry credentials come from. Use `USER_INFO` when providing `api_key/api_secret`. Other valid values are `URL` and `SASL_INHERIT`. Default typically aligns with the underlying client.
+`type` | String | Yes | Sets the type of schema based on your registry, either the AWS Glue schema registry, `aws_glue`, or the Confluent schema registry, `confluent`. When using the `aws_glue` registry, set any [AWS](#aws) configuration options.
+`basic_auth_credentials_source` | String | No | The source of the schema registry credentials. Use `USER_INFO` when providing `api_key` and `api_secret`. Other valid values are `URL` and `SASL_INHERIT`. If not set, the client's default source (`URL`) is used.

The following configuration options are only required when using a `confluent` registry.

Option | Type | Description
:--- | :--- | :---
-`registry_url` | String | Base URL of the Schema Registry, for example, `http://schema-registry:8081` or `https://sr.example.com`.
+`registry_url` | String | Base URL of the schema registry, for example, `http://schema-registry:8081` or `https://sr.example.com`.
`version` | String | Schema version to use per subject. Use an integer or `"latest"`.
`api_key` | String | The schema registry API key.
`api_secret` | String | The schema registry API secret.
@@ -99,9 +99,9 @@ schema:
```
{% include copy.html %}

-#### Schema Registry over TLS
+#### Schema registry over TLS

-The Kafka source uses the JVM truststore when connecting to Schema Registry over `https`. If Schema Registry is signed by a custom CA, add that CA to the Data Prepper JVM truststore or provide a custom truststore via environment variables.
+The Kafka source uses the JVM truststore when connecting to the schema registry over `https`. If the schema registry certificate is signed by a custom CA, add that CA to the Data Prepper JVM truststore or provide a custom truststore using environment variables.

You can use the following command to build a truststore with your CA certificate:

@@ -175,7 +175,7 @@ Option | Required | Type | Description
encryption:
  type: ssl
  # With public CA: no extra config needed.
-  # With private CA: trust via JVM truststore.
+  # With private CA: trust using JVM truststore.
```
{% include copy.html %}

@@ -280,9 +280,9 @@ kafka-pipeline:
```
{% include copy.html %}

-### Amazon MSK with AWS Glue Schema Registry
+### Amazon MSK with AWS Glue schema registry

-The following example configures Amazon MSK with AWS Glue Schema Registry, consumes from an MSK cluster using AWS settings, deserializes payload using AWS Glue Schema Registry, normalizes timestamps, and writes to an Amazon OpenSearch domain:
+The following example configures Amazon MSK with the AWS Glue schema registry, consumes from an MSK cluster using AWS settings, deserializes payloads using the AWS Glue schema registry, normalizes timestamps, and writes to an Amazon OpenSearch Service domain:

```yaml
msk-pipeline:
@@ -318,9 +318,9 @@ msk-pipeline:
```
{% include copy.html %}

-### Confluent Kafka with Schema Registry
+### Confluent Kafka with schema registry

-The following example configures Confluent Kafka with Schema Registry, connects to Confluent Cloud over TLS with SASL and Confluent Schema Registry credentials, decodes payloads, and indexes them into OpenSearch:
+The following example configures Confluent Kafka with a schema registry, connects to Confluent Cloud over TLS with SASL and Confluent schema registry credentials, decodes payloads, and indexes them into OpenSearch:

```yaml
confluent-pipeline:

From 0281a2db4830869a3211803194be588a6f6578a2 Mon Sep 17 00:00:00 2001
From: AntonEliatra
Date: Thu, 6 Nov 2025 13:48:57 +0000
Subject: [PATCH 4/4] Update kafka.md

Signed-off-by: AntonEliatra
---
 _data-prepper/pipelines/configuration/sources/kafka.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/_data-prepper/pipelines/configuration/sources/kafka.md b/_data-prepper/pipelines/configuration/sources/kafka.md
index 1388cefc4cb..9ed6586d4b4 100644
--- a/_data-prepper/pipelines/configuration/sources/kafka.md
+++ b/_data-prepper/pipelines/configuration/sources/kafka.md
@@ -222,7 +222,7 @@ kafka-pipeline:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
-        password: admin
+        password: admin_password
        index: kafka-data
```
{% include copy.html %}
@@ -246,7 +246,7 @@ kafka-pipeline:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
-        password: admin
+        password: admin_password
        index: secure-kafka-data
```
{% include copy.html %}
@@ -275,7 +275,7 @@ kafka-pipeline:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
-        password: admin
+        password: admin_password
        index: authenticated-kafka-data