
Commit fea4d79

Merge pull request #40 from data-integrations/feature/CDAP-14422-offset-file
Change to use file to store offset information instead of table
2 parents 804de80 + 59404de commit fea4d79
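
The core idea of this change is to persist the last-read Kafka offsets in a file between pipeline runs instead of in a dataset table. As a rough illustration only, a minimal sketch of such a store is below; the `Properties`-based file layout and the `FileOffsetStore` class name are hypothetical, not the plugin's actual on-disk format or API (which presumably lives in the new kafka-plugins-common module):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/**
 * Sketch of file-based offset persistence: one file per topic,
 * mapping partition -> next offset to read.
 */
public final class FileOffsetStore {
  private final Path offsetFile;

  public FileOffsetStore(String offsetDir, String topic) {
    this.offsetFile = Paths.get(offsetDir, topic + ".offsets");
  }

  /** Loads previously saved offsets, or an empty set on the first run. */
  public Properties load() throws IOException {
    Properties offsets = new Properties();
    if (Files.exists(offsetFile)) {
      try (InputStream in = Files.newInputStream(offsetFile)) {
        offsets.load(in);
      }
    }
    return offsets;
  }

  /** Saves offsets so the next run can resume where this one stopped. */
  public void save(Properties offsets) throws IOException {
    Files.createDirectories(offsetFile.getParent());
    try (OutputStream out = Files.newOutputStream(offsetFile)) {
      offsets.store(out, "next offset to read, keyed by partition");
    }
  }
}
```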

File tree

34 files changed: +1592 additions, −2344 deletions

kafka-plugins-0.10/docs/Kafka-batchsource.md

Lines changed: 2 additions & 2 deletions

@@ -23,8 +23,8 @@ Properties
 
 **topic:** The Kafka topic to read from. (Macro-enabled)
 
-**tableName:** Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the
-pipeline name to avoid conflict on table names. By default it will be the topic name. (Macro-enabled)
+**offsetDir:** Optional directory path to track the latest offset we read from kafka. It is useful for incrementally
+processing data from Kafka across subsequent runs. (Macro-enabled)
 
 **partitions:** List of topic partitions to read from. If not specified, all partitions will be read. (Macro-enabled)
 
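To make the documented change concrete, a hypothetical source configuration using the new property might look like the sketch below. Property names mirror the doc above; the map-based form and all values are illustrative, not the plugin's actual configuration API:

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaSourceConfigExample {
  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("topic", "purchases");                   // Kafka topic to read from
    properties.put("offsetDir", "/data/offsets/purchases"); // replaces the old tableName property
    properties.put("partitions", "0,1,2");                  // optional; all partitions if unset
    properties.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```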
kafka-plugins-0.10/pom.xml

Lines changed: 12 additions & 0 deletions

@@ -14,6 +14,11 @@
   <version>${project.parent.version}-0.10.2.0</version>
 
   <dependencies>
+    <dependency>
+      <groupId>co.cask.hydrator</groupId>
+      <artifactId>kafka-plugins-common</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
@@ -128,6 +133,13 @@
       <version>${cdap.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>co.cask.hydrator</groupId>
+      <artifactId>kafka-plugins-common</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>

kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaReader.java renamed to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/Kafka10Reader.java

Lines changed: 35 additions & 37 deletions

@@ -26,18 +26,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 
 /**
  * A class which reads from the fetch results from kafka.
  */
-public class KafkaReader {
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaReader.class);
+final class Kafka10Reader implements KafkaReader {
+  private static final Logger LOG = LoggerFactory.getLogger(Kafka10Reader.class);
   private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 
   // index of context
@@ -50,61 +50,58 @@ public class KafkaReader {
 
 
   /**
-   * Construct using the json representation of the kafka request
+   * Construct a reader based on the given {@link KafkaRequest}.
    */
-  public KafkaReader(KafkaRequest request) {
+  Kafka10Reader(KafkaRequest request) {
     kafkaRequest = request;
-    currentOffset = request.getOffset();
-    lastOffset = request.getLastOffset();
+    currentOffset = request.getStartOffset();
+    lastOffset = request.getEndOffset();
 
     // read data from queue
     Properties properties = new Properties();
    properties.putAll(request.getConf());
     consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
-    fetch();
   }
 
-  public boolean hasNext() throws IOException {
+  @Override
+  public boolean hasNext() {
     if (currentOffset >= lastOffset) {
       return false;
     }
     if (messageIter != null && messageIter.hasNext()) {
       return true;
-    } else {
-      return fetch();
     }
+    return fetch();
   }
 
   /**
-   * Fetches the next Kafka message and stuffs the results into the key and value.
+   * Fetches the next Kafka message. The message key will be set into the given {@link KafkaKey} object, and the message
+   * payload will be returned.
    */
-  public KafkaMessage getNext(KafkaKey kafkaKey) throws IOException {
-    if (hasNext()) {
-      ConsumerRecord<byte[], byte[]> consumerRecord = messageIter.next();
-
-      byte[] keyBytes = consumerRecord.key();
-      byte[] value = consumerRecord.value();
-      if (value == null) {
-        LOG.warn("Received message with null message.payload with topic {} and partition {}",
-                 kafkaKey.getTopic(), kafkaKey.getPartition());
-      }
-
-      ByteBuffer payload = value == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(value);
-      ByteBuffer key = keyBytes == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(keyBytes);
-
-      kafkaKey.clear();
-      kafkaKey.set(kafkaRequest.getTopic(), kafkaRequest.getPartition(), currentOffset,
-                   consumerRecord.offset() + 1);
-      kafkaKey.setMessageSize(value == null ? -1 : value.length);
-      currentOffset = consumerRecord.offset() + 1; // increase offset
-      return new KafkaMessage(payload, key);
-    } else {
-      return null;
+  @Override
+  public KafkaMessage getNext(KafkaKey kafkaKey) {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No message is available");
     }
+
+    ConsumerRecord<byte[], byte[]> consumerRecord = messageIter.next();
+
+    byte[] keyBytes = consumerRecord.key();
+    byte[] value = consumerRecord.value();
+
+    ByteBuffer key = keyBytes == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(keyBytes);
+    ByteBuffer payload = value == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(value);
+
+    kafkaKey.set(currentOffset, consumerRecord.offset() + 1,
+                 consumerRecord.serializedKeySize() + consumerRecord.serializedValueSize(), consumerRecord.checksum());
+    currentOffset = consumerRecord.offset() + 1; // increase offset
+    return new KafkaMessage(payload, key);
   }
 
   /**
-   * Creates a fetch request.
+   * Fetch messages from Kafka.
+   *
+   * @return {@code true} if there is some messages available, {@code false} otherwise
    */
   private boolean fetch() {
     if (currentOffset >= lastOffset) {
@@ -124,9 +121,10 @@ private boolean fetch() {
   }
 
   /**
-   * Closes this context
+   * Closes this reader.
    */
-  public void close() throws IOException {
+  @Override
+  public void close() {
     if (consumer != null) {
       consumer.close();
     }
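
Stripped of the plugin-specific KafkaKey/KafkaRequest types, the pattern Kafka10Reader implements is: assign a partition, seek to a start offset, and poll until an exclusive end offset. A self-contained sketch of that pattern with the plain kafka-clients API follows; the broker address, topic, and offset range are hypothetical:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Collections;
import java.util.Properties;

public final class BoundedRangeReadExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker

    // Hypothetical range; in the plugin these come from the KafkaRequest.
    TopicPartition partition = new TopicPartition("purchases", 0);
    long startOffset = 0L;
    long endOffset = 100L; // exclusive, like Kafka10Reader's lastOffset

    try (KafkaConsumer<byte[], byte[]> consumer =
           new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
      consumer.assign(Collections.singletonList(partition));
      consumer.seek(partition, startOffset);

      long currentOffset = startOffset;
      while (currentOffset < endOffset) {
        // poll(long) is the 0.10-era signature; newer clients take a Duration.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
        if (records.isEmpty()) {
          break; // nothing fetched within the timeout
        }
        for (ConsumerRecord<byte[], byte[]> record : records) {
          if (record.offset() >= endOffset) {
            return; // reached the end of the requested range
          }
          currentOffset = record.offset() + 1; // advance like currentOffset in the reader
          // a real reader would hand record.key()/record.value() to the pipeline here
          System.out.printf("offset=%d valueSize=%d%n", record.offset(), record.serializedValueSize());
        }
      }
    }
  }
}
```

Throwing NoSuchElementException from getNext() instead of returning null matches the usual Java iterator contract: callers are expected to gate each getNext() with hasNext(), as the loop above gates poll results against the end offset.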
