
Commit c19e181

Author: poornachandra
Merge pull request #25 from data-integrations/feature/CDAP-13280-kafka-spark2-streaming
CDAP-13280 Add Spark 2 streaming Kafka source
2 parents 5c075b3 + 1e7a9d5 commit c19e181

File tree

56 files changed: +3064 additions, -301 deletions


kafka-plugins-0.9/docs/KAFKABATCHSOURCE.md renamed to kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ Kafka batch source that emits records with a user-specified schema.

Usage Notes
-----------

- Kafka Batch Source can be used to read events from a kafka topic. It uses kafka consumer [0.9.1 apis](https://kafka.apache.org/090/documentation.html) to read events from a kafka topic. The Kafka Batch Source supports providing additional kafka properties for the kafka consumer, reading from kerberos-enabled kafka and limiting the number of records read. Kafka Batch Source converts incoming kafka events into cdap structured records which then can be used for further transformations.
+ Kafka Batch Source can be used to read events from a kafka topic. It uses the kafka consumer [0.10.2 apis](https://kafka.apache.org/0100/documentation.html) to do so. The Kafka Batch Source supports providing additional kafka properties for the kafka consumer, reading from kerberos-enabled kafka, and limiting the number of records read. Kafka Batch Source converts incoming kafka events into cdap structured records which can then be used for further transformations.

The source will read from the earliest available offset, or the initial offset specified in the config, on the first run; it remembers the last offset it read and continues from that offset on the next run.
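To make that read behavior concrete, here is a minimal, hypothetical sketch of a bounded read using the 0.10.2 consumer API; the broker, topic, starting offset, and record limit are illustrative only and are not taken from the plugin's source:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BoundedKafkaRead {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");            // hypothetical broker
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("enable.auto.commit", "false");                   // the caller tracks offsets itself
    props.put("group.id", "batch-read-example");                // hypothetical group id

    TopicPartition partition = new TopicPartition("events", 0); // hypothetical topic/partition
    long startOffset = 100L;   // e.g. the offset remembered from the previous run
    long maxRecords = 1000L;   // e.g. the configured limit on records read

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(partition));
      consumer.seek(partition, startOffset);                    // offsets are inclusive

      long read = 0;
      while (read < maxRecords) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
        if (records.isEmpty()) {
          break;                                                // no more events available
        }
        for (ConsumerRecord<byte[], byte[]> record : records) {
          if (read++ >= maxRecords) {
            break;
          }
          // here each event would be converted into a CDAP structured record
          System.out.println(record.partition() + ":" + record.offset());
        }
      }
      // the last offset read would be persisted so the next run can continue from it
    }
  }
}
```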

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
[![Build Status](https://travis-ci.org/hydrator/kafka-plugins.svg?branch=master)](https://travis-ci.org/hydrator/kafka-plugins) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

Kafka Source
===========

Kafka streaming source that emits records with a user-specified schema.

<img align="center" src="kafka-source-plugin-config.png" width="400" alt="plugin configuration" />

Usage Notes
-----------

Kafka Streaming Source can be used to read events from a kafka topic. It uses the kafka consumer [0.10.2 apis](https://kafka.apache.org/0100/documentation.html) to read events and converts each incoming kafka event into a cdap structured record that can then be used for further transformations.

The source can read from the latest offset, from the beginning, or from a provided kafka offset. The plugin relies on Spark Streaming offset [storage capabilities](https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) to manage offsets and checkpoints.
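For orientation only, the Spark Streaming kafka-0-10 integration that this source builds on is used roughly as in the sketch below; the broker, topic, group id, and checkpoint directory are hypothetical, and the plugin's actual implementation may differ:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class DirectStreamSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("kafka-direct-stream").setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    jssc.checkpoint("/tmp/kafka-stream-checkpoint");             // hypothetical checkpoint directory

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "broker1:9092");        // hypothetical broker
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "example-group");                 // hypothetical group id
    kafkaParams.put("auto.offset.reset", "latest");               // start from the latest offset
    kafkaParams.put("enable.auto.commit", false);

    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList("events"), kafkaParams));

    // each micro-batch of records would be converted into structured records downstream
    stream.foreachRDD(rdd -> rdd.foreach(record ->
        System.out.println(record.partition() + ":" + record.offset())));

    jssc.start();
    jssc.awaitTermination();
  }
}
```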
Plugin Configuration
---------------------

| Configuration | Required | Default | Description |
| :------------ | :------: | :------ | :---------- |
| **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. |
| **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. |
| **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. |
| **Default Initial Offset** | **N** | -1 | The default initial offset for all topic partitions. An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Defaults to -1. Offsets are inclusive: if an offset of 5 is used, the message at offset 5 will be read. If you wish to set different initial offsets for different partitions, use the initialPartitionOffsets property (see the sketch after this table). |
| **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, all partitions will use the same initial offset, which is determined by the defaultInitialOffset property. Any partitions specified in the partitions property but not in this property will use the defaultInitialOffset. An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Offsets are inclusive: if an offset of 5 is used, the message at offset 5 will be read. |
| **Time Field** | **N** | N/A | Optional name of the field containing the read time of the batch. If this is not set, no time field will be added to output records. If set, this field must be present in the schema property and must be a long. |
| **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. |
| **Partition Field** | **N** | N/A | Optional name of the field containing the partition the message was read from. If this is not set, no partition field will be added to output records. If set, this field must be present in the schema property and must be an int. |
| **Offset Field** | **N** | N/A | Optional name of the field containing the partition offset the message was read from. If this is not set, no offset field will be added to output records. If set, this field must be present in the schema property and must be a long. |
| **Format** | **N** | N/A | Optional format of the Kafka event message. Any format supported by CDAP is supported. For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. If no format is given, Kafka message payloads will be treated as bytes. |
| **Kerberos Principal** | **N** | N/A | The kerberos principal used for the source when kerberos security is enabled for kafka. |
| **Keytab Location** | **N** | N/A | The keytab location for the kerberos principal when kerberos security is enabled for kafka. |
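The -2/-1 sentinels above follow standard kafka consumer positioning. A minimal illustrative sketch (not the plugin's code) of how such values map onto the 0.10.2 consumer API:

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class InitialOffsetSketch {

  /**
   * Seeks a partition to a configured initial offset.
   * The consumer is assumed to already be assigned to this partition.
   */
  static void applyInitialOffset(KafkaConsumer<byte[], byte[]> consumer,
                                 TopicPartition partition, long initialOffset) {
    if (initialOffset == -2L) {
      // -2 = smallest offset still available on the broker
      consumer.seekToBeginning(Collections.singletonList(partition));
    } else if (initialOffset == -1L) {
      // -1 = latest offset; only events published after this point are read
      consumer.seekToEnd(Collections.singletonList(partition));
    } else {
      // explicit offsets are inclusive: seeking to 5 means offset 5 is the first message read
      consumer.seek(partition, initialOffset);
    }
  }
}
```

The defaultInitialOffset and initialPartitionOffsets settings describe this positioning declaratively, per partition.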
Build
-----
To build this plugin:

```
mvn clean package
```

The build will create a .jar and .json file under the ``target`` directory.
These files can be used to deploy your plugins.
Deployment
----------
You can deploy your plugins using the CDAP CLI:

> load artifact <target/kafka-plugins-<version>.jar> config-file <target/kafka-plugins-<version>.json>

For example, if your artifact is named 'kafka-plugins-<version>':

> load artifact target/kafka-plugins-<version>.jar config-file target/kafka-plugins-<version>.json
## Mailing Lists

CDAP User Group and Development Discussions:

* [cdap-user@googlegroups.com](https://groups.google.com/d/forum/cdap-user)

The *cdap-user* mailing list is primarily for users using the product to develop
applications or building plugins for applications. You can expect questions from
users, release announcements, and any other discussions that we think will be helpful
to the users.

## License and Trademarks

Copyright © 2018 Cask Data, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the specific language governing permissions
and limitations under the License.

Cask is a trademark of Cask Data, Inc. All rights reserved.

Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with
permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.

kafka-plugins-0.9/docs/KAFKAWRITER-SINK.md renamed to kafka-plugins-0.10/docs/KAFKAWRITER-SINK.md

Lines changed: 2 additions & 2 deletions
@@ -12,14 +12,14 @@ The sink also allows you to write events into kerberos-enabled kafka.

Usage Notes
-----------

- Kafka sink emits events in realtime to configured kafka topic and partition. It uses kafka producer [0.8.2 apis](https://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html) to write events into kafka.
+ Kafka sink emits events in real time to the configured kafka topic and partition. It uses the kafka producer [0.10.2 apis](https://kafka.apache.org/0100/documentation.html) to write events into kafka.

This sink can be configured to operate in synchronous or asynchronous mode. In synchronous mode, each event is sent to the broker synchronously on the thread that calls it, which is not sufficient for most high-volume environments.
In async mode, the kafka producer batches the kafka events together for greater throughput, at the cost of possibly dropping unsent events if the client machine fails. Since the kafka producer uses synchronous mode by default, this sink also uses the synchronous producer by default.

It uses a String partitioner and String serializer for key and value to write events to kafka. Optionally, if a kafka key is provided, the producer will use that key to partition events across multiple partitions in a given topic. This sink also allows compression configuration; by default compression is none.

- Kafka producer can be tuned using many properties as shown [here](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html). This sink allows user to configure any property supported by kafka 0.8.2 Producer.
+ Kafka producer can be tuned using many properties, as shown [here](https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html). This sink allows the user to configure any property supported by the kafka 0.10.2.0 producer.


Plugin Configuration
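To make the synchronous/asynchronous distinction above concrete, here is a minimal, hypothetical sketch using the 0.10.2 producer API; the broker, topic, and tuning values are illustrative only and are not taken from the plugin's source:

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncVsAsyncProducer {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // hypothetical broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // tuning properties like these are what a "kafkaProperties" style setting passes through
    props.put("acks", "all");
    props.put("compression.type", "gzip");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      ProducerRecord<String, String> record =
          new ProducerRecord<>("events", "my-key", "my-value");   // key drives partitioning

      // Synchronous: block until the broker acknowledges the write.
      RecordMetadata metadata = producer.send(record).get();
      System.out.println("wrote offset " + metadata.offset());

      // Asynchronous: hand the record off and get notified via a callback; unsent
      // batches can be lost if the client machine fails before they are flushed.
      producer.send(record, (meta, exception) -> {
        if (exception != null) {
          exception.printStackTrace();
        }
      });
      producer.flush();
    }
  }
}
```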
Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
# kafka-alert-plugin

<a href="https://cdap-users.herokuapp.com/"><img alt="Join CDAP community" src="https://cdap-users.herokuapp.com/badge.svg?t=kafka-alert-plugin"/></a> [![Build Status](https://travis-ci.org/hydrator/kafka-alert-plugin.svg?branch=master)](https://travis-ci.org/hydrator/kafka-alert-plugin) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) <img src="https://cdap-users.herokuapp.com/assets/cm-available.svg"/>

Kafka Alert Publisher allows you to publish alerts to kafka as json objects. The plugin internally uses the kafka producer apis to publish alerts. It lets you specify the kafka topic to publish to, along with any additional kafka producer properties. This plugin uses the kafka 0.10.2 java apis.
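As a rough illustration (not the plugin's code), publishing a JSON alert with the 0.10.2 producer API might look like the sketch below; the broker, topic, and alert payload are hypothetical:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PublishAlertSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // hypothetical broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // a hypothetical alert serialized as a json object
    String alertJson = "{\"pipeline\":\"demo\",\"stage\":\"parser\",\"message\":\"bad record\"}";

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("alerts", alertJson));  // hypothetical topic
      producer.flush();
    }
  }
}
```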
Build
-----
To build this plugin:

```
mvn clean package
```

The build will create a .jar and .json file under the ``target`` directory.
These files can be used to deploy your plugins.

Deployment
----------
You can deploy your plugins using the CDAP CLI:

> load artifact <target/kafka-alert-plugin-<version>.jar> config-file <target/kafka-alert-plugin-<version>.json>

For example, if your artifact is named 'kafka-alert-plugin-<version>':

> load artifact target/kafka-alert-plugin-<version>.jar config-file target/kafka-alert-plugin-<version>.json
## Mailing Lists

CDAP User Group and Development Discussions:

* [cdap-user@googlegroups.com](https://groups.google.com/d/forum/cdap-user)

The *cdap-user* mailing list is primarily for users using the product to develop
applications or building plugins for applications. You can expect questions from
users, release announcements, and any other discussions that we think will be helpful
to the users.

## IRC Channel

CDAP IRC Channel: #cdap on irc.freenode.net

## License and Trademarks

Copyright © 2018 Cask Data, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the specific language governing permissions
and limitations under the License.

Cask is a trademark of Cask Data, Inc. All rights reserved.

Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with
permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.

kafka-plugins-0.9/docs/Kafka-batchsink.md renamed to kafka-plugins-0.10/docs/Kafka-batchsink.md

Lines changed: 2 additions & 2 deletions
@@ -7,7 +7,7 @@ Kafka sink that allows you to write events into CSV or JSON to kafka.

Plugin has the capability to push the data to a Kafka topic. It can also be
configured to partition events being written to kafka based on a configurable key.
The sink can also be configured to operate in sync or async mode and apply different
- compression types to events. Kafka sink is compatible with Kafka 0.9 and 0.10
+ compression types to events. This plugin uses kafka 0.10.2 java apis.


Configuration

@@ -55,4 +55,4 @@ Additional properties like number of acknowledgements and client id can also be

        "kafkaProperties": "acks:2,client.id:myclient",
        "key": "message"
      }
-   }
+   }

kafka-plugins-0.9/docs/Kafka-batchsource.md renamed to kafka-plugins-0.10/docs/Kafka-batchsource.md

Lines changed: 1 addition & 2 deletions
@@ -7,7 +7,7 @@ Kafka batch source. Emits the record from kafka. It will emit a record based on

you use, or if no schema or format is specified, the message payload will be emitted. The source will
remember the offset it read last run and continue from that offset for the next run.
The Kafka batch source supports providing additional kafka properties for the kafka consumer,
- reading from kerberos-enabled kafka and limiting the number of records read
+ reading from kerberos-enabled kafka and limiting the number of records read. This plugin uses kafka 0.10.2 java apis.

Use Case
--------

@@ -106,4 +106,3 @@ For each Kafka message read, it will output a record with the schema:

    | count      | int    |
    | price      | double |
    +================================+
-
Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
# Kafka Streaming Source


Description
-----------
Kafka streaming source. Emits a record with the schema specified by the user. If no schema
is specified, it will emit a record with two fields: 'key' (nullable string) and 'message'
(bytes). This plugin uses kafka 0.10.2 java apis.


Use Case
--------
This source is used whenever you want to read from Kafka. For example, you may want to read messages
from Kafka and write them to a Table.


Properties
----------
**referenceName:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.

**brokers:** List of Kafka brokers specified in host1:port1,host2:port2 form. (Macro-enabled)

**topic:** The Kafka topic to read from. (Macro-enabled)

**partitions:** List of topic partitions to read from. If not specified, all partitions will be read. (Macro-enabled)

**defaultInitialOffset:** The default initial offset for all topic partitions.
An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Defaults to -1.
Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read.
If you wish to set different initial offsets for different partitions, use the initialPartitionOffsets property. (Macro-enabled)

**initialPartitionOffsets:** The initial offset for each topic partition. If this is not specified,
all partitions will use the same initial offset, which is determined by the defaultInitialOffset property.
Any partitions specified in the partitions property, but not in this property, will use the defaultInitialOffset.
An offset of -2 means the smallest offset. An offset of -1 means the latest offset.
Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. (Macro-enabled)
(See the sketch after this property list for how per-partition starting offsets are applied.)

**schema:** Output schema of the source. If you would like the output records to contain a field with the
Kafka message key, the schema must include a field of type bytes or nullable bytes, and you must set the
keyField property to that field's name. Similarly, if you would like the output records to contain a field with
the timestamp of when the record was read, the schema must include a field of type long or nullable long, and you
must set the timeField property to that field's name. Any field that is not the timeField or keyField will be used
in conjunction with the format to parse Kafka message payloads.

**format:** Optional format of the Kafka event message. Any format supported by CDAP is supported.
For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values.
If no format is given, Kafka message payloads will be treated as bytes.

**timeField:** Optional name of the field containing the read time of the batch.
If this is not set, no time field will be added to output records.
If set, this field must be present in the schema property and must be a long.

**keyField:** Optional name of the field containing the message key.
If this is not set, no key field will be added to output records.
If set, this field must be present in the schema property and must be bytes.

**partitionField:** Optional name of the field containing the partition the message was read from.
If this is not set, no partition field will be added to output records.
If set, this field must be present in the schema property and must be an int.

**offsetField:** Optional name of the field containing the partition offset the message was read from.
If this is not set, no offset field will be added to output records.
If set, this field must be present in the schema property and must be a long.

**maxRatePerPartition:** Maximum number of records to read per second per partition. Defaults to 1000.

**principal:** The kerberos principal used for the source when kerberos security is enabled for kafka.

**keytabLocation:** The keytab location for the kerberos principal when kerberos security is enabled for kafka.
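For illustration, per-partition starting offsets in the underlying Spark Streaming kafka-0-10 integration can be expressed as a map from partition to offset, roughly as sketched below; the broker, topic, partitions, and offset values are hypothetical, and the plugin's actual code may differ:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class PartitionOffsetsSketch {

  static JavaInputDStream<ConsumerRecord<byte[], byte[]>> createStream(JavaStreamingContext jssc) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "broker1:9092");        // hypothetical broker
    kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
    kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
    kafkaParams.put("group.id", "example-group");                  // hypothetical group id
    kafkaParams.put("auto.offset.reset", "latest");                // used when no explicit offset is given
    kafkaParams.put("enable.auto.commit", false);

    // explicit starting offsets for some partitions; offsets are inclusive,
    // so partition 0 starts reading at offset 5
    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
    fromOffsets.put(new TopicPartition("purchases", 0), 5L);
    fromOffsets.put(new TopicPartition("purchases", 1), 42L);

    return KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<byte[], byte[]>Subscribe(
            Arrays.asList("purchases"), kafkaParams, fromOffsets));
  }
}
```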
Example
-------
This example reads from the 'purchases' topic of a Kafka instance running
on brokers host1.example.com:9092 and host2.example.com:9092. The source will add
a time field named 'readTime' that contains a timestamp corresponding to the micro
batch when the record was read. It will also contain a field named 'key' which will have
the message key in it. It parses the Kafka messages using the 'csv' format
with 'user', 'item', 'count', and 'price' as the message schema.

    {
        "name": "Kafka",
        "type": "streamingsource",
        "properties": {
            "topics": "purchases",
            "brokers": "host1.example.com:9092,host2.example.com:9092",
            "format": "csv",
            "timeField": "readTime",
            "keyField": "key",
            "schema": "{
                \"type\":\"record\",
                \"name\":\"purchase\",
                \"fields\":[
                    {\"name\":\"readTime\",\"type\":\"long\"},
                    {\"name\":\"key\",\"type\":\"bytes\"},
                    {\"name\":\"user\",\"type\":\"string\"},
                    {\"name\":\"item\",\"type\":\"string\"},
                    {\"name\":\"count\",\"type\":\"int\"},
                    {\"name\":\"price\",\"type\":\"double\"}
                ]
            }"
        }
    }

For each Kafka message read, it will output a record with the schema:

    +================================+
    | field name | type              |
    +================================+
    | readTime   | long              |
    | key        | bytes             |
    | user       | string            |
    | item       | string            |
    | count      | int               |
    | price      | double            |
    +================================+

Note that the readTime field is not derived from the Kafka message, but from the time that the
message was read.
