diff --git a/_data-prepper/pipelines/configuration/sources/kafka.md b/_data-prepper/pipelines/configuration/sources/kafka.md index bdcfcbf549..9ed6586d4b 100644 --- a/_data-prepper/pipelines/configuration/sources/kafka.md +++ b/_data-prepper/pipelines/configuration/sources/kafka.md @@ -14,17 +14,19 @@ You can use the Apache Kafka source (`kafka`) in OpenSearch Data Prepper to read The following example shows the `kafka` source in a Data Prepper pipeline: -```json +```yaml kafka-pipeline: source: kafka: bootstrap_servers: - - 127.0.0.1:9093 + - 127.0.0.1:9092 topics: - name: Topic1 group_id: groupID1 - name: Topic2 group_id: groupID1 + sink: + - stdout: {} ``` ## Configuration @@ -44,7 +46,7 @@ Option | Required | Type | Description ### Topics -Use the following options in the `topics` array. +Use the following options in the `topics` array for each topic. Option | Required | Type | Description :--- | :--- | :--- | :--- @@ -65,55 +67,102 @@ Option | Required | Type | Description `retry_backoff` | No | Integer | The amount of time to wait before attempting to retry a failed request to a given topic partition. Default is `10s`. `max_poll_interval` | No | Integer | The maximum delay between invocations of a `poll()` when using group management through Kafka's `max.poll.interval.ms` option. Default is `300s`. `consumer_max_poll_records` | No | Integer | The maximum number of records returned in a single `poll()` call through Kafka's `max.poll.records` setting. Default is `500`. -`key_mode` | No | String | Indicates how the key field of the Kafka message should be handled. The default setting is `include_as_field`, which includes the key in the `kafka_key` event. The `include_as_metadata` setting includes the key in the event's metadata. The `discard` setting discards the key. +`key_mode` | No | String | Indicates how the key field of the Kafka message should be handled. The default setting is `include_as_field`, which includes the key in the `kafka_key` event. 
The `include_as_metadata` setting includes the key in the event's metadata. The `discard` setting discards the key.
 
 ### Schema
 
-The following option is required inside the `schema` configuration.
+The `schema` configuration has the following options.
 
-Option | Type | Description
-:--- | :--- | :---
+Option | Type | Required | Description
+:--- | :--- | :--- | :---
-`type` | String | Sets the type of schema based on your registry, either the AWS Glue Schema Registry, `aws_glue`, or the Confluent Schema Registry, `confluent`. When using the `aws_glue` registry, set any [AWS](#aws) configuration options.
+`type` | String | Yes | Sets the type of schema based on your registry, either the AWS Glue schema registry, `aws_glue`, or the Confluent schema registry, `confluent`. When using the `aws_glue` registry, set any [AWS](#aws) configuration options.
+`basic_auth_credentials_source` | String | No | The source of the schema registry credentials. Use `USER_INFO` when providing `api_key` and `api_secret`. Other valid values are `URL` and `SASL_INHERIT`.
 
 The following configuration options are only required when using a `confluent` registry.
 
 Option | Type | Description
 :--- | :--- | :---
-`registry_url` | String | Deserializes a record value from a `bytearray` into a string. Default is `org.apache.kafka.common.serialization.StringDeserializer`.
-`version` | String | Deserializes a record key from a `bytearray` into a string. Default is `org.apache.kafka.common.serialization.StringDeserializer`.
-`schema_registry_api_key` | String | The schema registry API key.
-`schema_registry_api_secret` | String | The schema registry API secret.
+`registry_url` | String | The base URL of the schema registry, for example, `http://schema-registry:8081` or `https://sr.example.com`.
+`version` | String | The schema version to use for each subject. Use an integer or `"latest"`.
+`api_key` | String | The schema registry API key.
+`api_secret` | String | The schema registry API secret.
+
+The following example shows a schema registry configuration:
+
+```yaml
+schema:
+  type: confluent
+  registry_url: "http://schema-registry:8081"
+  api_key: ""
+  api_secret: ""
+  version: "latest"
+```
+{% include copy.html %}
 
-### Authentication
+#### Schema registry over TLS
 
-The following option is required inside the `authentication` object.
+The Kafka source uses the JVM truststore when connecting to the schema registry over HTTPS. If the schema registry certificate is signed by a custom certificate authority (CA), add that CA to the Data Prepper JVM truststore or provide a custom truststore using environment variables.
 
-Option | Type | Description
-:--- | :--- | :---
-`sasl` | JSON object | The Simple Authentication and Security Layer (SASL) authentication configuration.
+You can use the following command to build a truststore from your CA certificate:
 
-### SASL
+```bash
+keytool -importcert -noprompt -alias sr-ca -file sr-ca.pem -keystore /usr/share/data-prepper/certs/sr.truststore.jks -storepass changeit
+```
+{% include copy.html %}
 
-Use one of the following options when configuring SASL authentication.
+The following environment variable configures the Data Prepper JVM to use the truststore:
+
+```bash
+JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
+```
+{% include copy.html %}
 
-Option | Type | Description
-:--- | :--- | :---
-`plaintext` | JSON object | The [PLAINTEXT](#sasl-plaintext) authentication configuration.
-`aws_msk_iam` | String | The Amazon MSK AWS Identity and Access Management (IAM) configuration. If set to `role`, the `sts_role_arm` set in the `aws` configuration is used. Default is `default`.
+You can configure Data Prepper in `docker-compose.yaml` as follows:
+
+```yaml
+environment:
+  - JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
+volumes:
+  - ./certs:/usr/share/data-prepper/certs:ro
+```
+{% include copy.html %}
+
+### Authentication
+
+The `authentication` section configures SASL authentication, as shown in the following example:
+
+```yaml
+authentication:
+  sasl:
+    plain:
+      username: alice
+      password: secret
+```
+{% include copy.html %}
+
+| Option | Type | Description |
+|:---|:---|:---|
+| `sasl` | Object | The SASL authentication configuration. |
 
-#### SASL PLAINTEXT
+#### SASL
 
-The following options are required when using the [SASL PLAINTEXT](https://kafka.apache.org/10/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html) protocol.
+Use one of the following options when configuring SASL authentication.
 
 Option | Type | Description
 :--- | :--- | :---
-`username` | String | The username for the PLAINTEXT auth.
-`password` | String | The password for the PLAINTEXT auth.
+`plain` | JSON object | The plain authentication configuration. See [SASL plaintext](#sasl-plaintext) for further details.
+`aws_msk_iam` | String | The Amazon MSK AWS Identity and Access Management (IAM) configuration. If set to `role`, the `sts_role_arn` set in the `aws` configuration is used. Default is `default`.
 
-#### Encryption
+##### SASL plaintext
+
+The following options are required when using the [SASL/PLAIN](https://kafka.apache.org/10/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html) protocol.
+
+| Option | Type | Description |
+|:---|:---|:---|
+| `username` | String | The SASL/PLAIN username. |
+| `password` | String | The SASL/PLAIN password. |
+
+### Encryption
 
 Use the following options when setting SSL encryption.
 
@@ -122,6 +171,13 @@ Option | Required | Type | Description
 `type` | No | String | The encryption type. Use `none` to disable encryption. Default is `ssl`.
`insecure` | No | Boolean | A Boolean flag used to turn off SSL certificate verification. If set to `true`, certificate authority (CA) certificate verification is turned off and insecure HTTP requests are sent. Default is `false`.
 
+```yaml
+encryption:
+  type: ssl
+  # With a public CA, no extra configuration is needed.
+  # With a private CA, trust the CA using the JVM truststore.
+```
+{% include copy.html %}
 
 #### AWS
 
@@ -140,5 +196,162 @@ Use the following options inside the `msk` object.
 
 Option | Required | Type | Description
 :--- | :--- | :--- | :---
 `arn` | Yes | String | The [MSK ARN](https://docs.aws.amazon.com/msk/1.0/apireference/configurations-arn.html) to use.
-`broker_connection_type` No | String | The type of connector to use with the MSK broker, either `public`, `single_vpc`, or `multip_vpc`. Default is `single_vpc`.
+`broker_connection_type` | No | String | The type of connector to use with the MSK broker, either `public`, `single_vpc`, or `multi_vpc`. Default is `single_vpc`.
+
+## Configuration examples
+
+The following examples show common `kafka` source pipeline configurations.
+
+### Basic Kafka source
+
+The following example pipeline reads JSON messages from a single plaintext Kafka topic with multiple consumer workers, parses them, and indexes them into OpenSearch:
+
+```yaml
+kafka-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - localhost:9092
+      topics:
+        - name: my-topic
+          group_id: data-prepper-group
+          workers: 4
+  processor:
+    - parse_json:
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin_password
+        index: kafka-data
+```
+{% include copy.html %}
+
+### Kafka source with SSL encryption
+
+The following example pipeline connects to a Kafka broker over TLS, consumes from a secure topic, and writes the results to OpenSearch:
+
+```yaml
+kafka-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - kafka-broker.example.com:9093
+      topics:
+        - name: secure-topic
+          group_id: secure-group
+      encryption:
+        type: ssl
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin_password
+        index: secure-kafka-data
+```
+{% include copy.html %}
+
+### Kafka source with SASL PLAIN authentication
+
+The following example pipeline authenticates to Kafka using SASL/PLAIN over TLS, consumes from a topic, and indexes the messages into OpenSearch:
+
+```yaml
+kafka-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - kafka-broker.example.com:9094
+      topics:
+        - name: authenticated-topic
+          group_id: auth-group
+      encryption:
+        type: ssl
+      authentication:
+        sasl:
+          plain:
+            username: kafka-user
+            password: kafka-password
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin_password
+        index: authenticated-kafka-data
+```
+{% include copy.html %}
+
+### Amazon MSK with AWS Glue schema registry
+
+The following example pipeline consumes from an Amazon MSK cluster using the AWS settings, deserializes the payload using the AWS Glue schema registry, normalizes timestamps, and writes to an Amazon OpenSearch Service domain:
+
+```yaml
+msk-pipeline:
+  source:
+    kafka:
+      acknowledgments: true
+      topics:
+        - name: my-msk-topic
+          group_id: msk-consumer-group
+          auto_offset_reset: earliest
+      aws:
+        region: us-east-1
+        sts_role_arn: arn:aws:iam::123456789012:role/data-prepper-role
+        msk:
+          arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster-name/uuid
+      schema:
+        type: aws_glue
+        registry_name: my-glue-registry
+  processor:
+    - date:
+        match:
+          - key: timestamp
+            patterns: ["epoch_milli"]
+        destination: "@timestamp"
+  sink:
+    - opensearch:
+        hosts: ["https://search-my-domain.us-east-1.opensearch.amazonaws.com"]
+        aws:
+          region: us-east-1
+          sts_role_arn: arn:aws:iam::123456789012:role/opensearch-role
+        index: msk-data
+        index_type: custom
+```
+{% include copy.html %}
+
+### Confluent Kafka with schema registry
+
+The following example pipeline connects to Confluent Cloud over TLS using SASL and Confluent schema registry credentials, decodes the payloads, and indexes them into OpenSearch:
+
+```yaml
+confluent-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
+      topics:
+        - name: confluent-topic
+          group_id: confluent-group
+          auto_offset_reset: earliest
+      encryption:
+        type: ssl
+      authentication:
+        sasl:
+          plain:
+            username: confluent-api-key
+            password: confluent-api-secret
+      schema:
+        type: confluent
+        registry_url: https://psrc-xxxxx.us-east-1.aws.confluent.cloud
+        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
+        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
+        basic_auth_credentials_source: USER_INFO
+  sink:
+    - opensearch:
+        hosts: ["https://localhost:9200"]
+        username: admin
+        password: admin_password
+        index_type: custom
+        index: confluent-data
+```
+{% include copy.html %}
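+
+### Kafka source with key handling
+
+The following minimal sketch shows how to apply the `key_mode` topic option documented previously, storing the Kafka message key in the event metadata instead of the default `kafka_key` field. The topic name and group ID are illustrative:
+
+```yaml
+kafka-pipeline:
+  source:
+    kafka:
+      bootstrap_servers:
+        - localhost:9092
+      topics:
+        - name: keyed-topic
+          group_id: keyed-group
+          # Store the Kafka message key in event metadata instead of the
+          # default kafka_key event field. Other valid values are
+          # include_as_field (default) and discard.
+          key_mode: include_as_metadata
+  sink:
+    - stdout: {}
+```
+{% include copy.html %}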