Commit 293a26b
[CDAP-15787] Add failure collector for Kafka streaming source
1 parent 9d0afe2 commit 293a26b
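For context, the "failure collector" in the commit title refers to CDAP's FailureCollector API: validation problems are accumulated and reported together, with getOrThrowException() raising a single validation exception at the end, instead of failing on the first bad property. Below is a minimal sketch of that pattern, assuming the cdap-etl-api on the classpath; the class name, property names, and messages are illustrative and not taken from this commit.

// Hypothetical sketch of the FailureCollector validation pattern this commit adopts.
// Property names ("brokers", "topic") and messages are illustrative only.
import io.cdap.cdap.etl.api.FailureCollector;

public class KafkaConfigValidationSketch {

  public void validate(String brokers, String topic, FailureCollector collector) {
    if (brokers == null || brokers.isEmpty()) {
      // Record the failure instead of throwing, so the remaining checks still run.
      collector.addFailure("Kafka brokers must be specified.",
                           "Provide a comma-separated list of host:port pairs.")
        .withConfigProperty("brokers");
    }
    if (topic == null || topic.isEmpty()) {
      collector.addFailure("Topic must be specified.", "Provide the Kafka topic to read from.")
        .withConfigProperty("topic");
    }
    // Throws a validation exception if any failures were collected above.
    collector.getOrThrowException();
  }
}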

File tree

12 files changed
+992 −611 lines changed

kafka-plugins-0.10/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -19,6 +19,16 @@
       <artifactId>kafka-plugins-common</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.cdap.plugin</groupId>
+      <artifactId>format-common</artifactId>
+      <version>${cdap.plugin.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.plugin</groupId>
+      <artifactId>format-avro</artifactId>
+      <version>${cdap.plugin.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
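The two new dependencies supply CDAP's record-format implementations: format-common provides the RecordFormats lookup and format-avro the Avro format that the new FormatFunction below resolves by name. A minimal sketch of that resolution, assuming CDAP's built-in format name "avro" and an illustrative helper class:

// Hedged sketch: why format-common / format-avro are pulled in.
// RecordFormats resolves a FormatSpecification to a RecordFormat implementation,
// which turns raw Kafka message bytes into StructuredRecords.
import io.cdap.cdap.api.data.format.FormatSpecification;
import io.cdap.cdap.api.data.format.RecordFormat;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.format.RecordFormats;

import java.nio.ByteBuffer;
import java.util.HashMap;

public class FormatSketch {
  // Parses a single Avro-encoded message into a StructuredRecord.
  // The schema is passed in here for illustration; the plugin derives it from the stage configuration.
  public static StructuredRecord parse(byte[] message, Schema messageSchema) throws Exception {
    FormatSpecification spec = new FormatSpecification("avro", messageSchema, new HashMap<>());
    RecordFormat<ByteBuffer, StructuredRecord> format = RecordFormats.createInitializedFormat(spec);
    return format.read(ByteBuffer.wrap(message));
  }
}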

kafka-plugins-0.10/src/main/java/io/cdap/plugin/source/KafkaConfig.java

Lines changed: 169 additions & 56 deletions
Large diffs are not rendered by default.
kafka-plugins-0.10/src/main/java/io/cdap/plugin/source/KafkaStreaminSourceUtil.java

Lines changed: 290 additions & 0 deletions
/*
 * Copyright © 2019 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.plugin.source;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import io.cdap.cdap.api.data.format.FormatSpecification;
import io.cdap.cdap.api.data.format.RecordFormat;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.format.RecordFormats;
import io.cdap.plugin.common.KafkaHelpers;
import kafka.api.OffsetRequest;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Util methods for {@link KafkaStreamingSource}.
 *
 * This class contains methods for {@link KafkaStreamingSource} that require Spark classes, because during validation
 * Spark classes are not available. Refer to CDAP-15912 for more information.
 */
final class KafkaStreaminSourceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaStreaminSourceUtil.class);

  /**
   * Returns {@link JavaDStream} for {@link KafkaStreamingSource}.
   *
   * @param context streaming context
   * @param conf kafka conf
   * @param collector failure collector
   */
  static JavaDStream<StructuredRecord> getStructuredRecordJavaDStream(
    StreamingContext context, KafkaConfig conf, FailureCollector collector) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBrokers());
    // Spark saves the offsets in checkpoints, no need for Kafka to save them
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    kafkaParams.put("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
    kafkaParams.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
    KafkaHelpers.setupKerberosLogin(kafkaParams, conf.getPrincipal(), conf.getKeytabLocation());
    // Create a unique string for the group.id using the pipeline name and the topic.
    // group.id is a Kafka consumer property that uniquely identifies the group of
    // consumer processes to which this consumer belongs.
    kafkaParams.put("group.id", Joiner.on("-").join(context.getPipelineName().length(), conf.getTopic().length(),
                                                    context.getPipelineName(), conf.getTopic()));
    kafkaParams.putAll(conf.getKafkaProperties());

    Properties properties = new Properties();
    properties.putAll(kafkaParams);
    // Change the request timeout used to fetch the metadata to 15 seconds, or 1 second greater than the session
    // timeout ms, since this config has to be greater than the session timeout, which is 10 seconds by default.
    // The KafkaConsumer at runtime should still use the default timeout of 305 seconds, or whatever the user
    // provides in kafkaConf.
    int requestTimeout = 15 * 1000;
    if (conf.getKafkaProperties().containsKey(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)) {
      requestTimeout =
        Math.max(requestTimeout,
                 Integer.valueOf(conf.getKafkaProperties().get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)) + 1000);
    }
    properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
    try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(),
                                                                 new ByteArrayDeserializer())) {
      Map<TopicPartition, Long> offsets = conf.getInitialPartitionOffsets(
        getPartitions(consumer, conf, collector), collector);
      collector.getOrThrowException();

      // KafkaUtils doesn't understand -1 and -2 as smallest offset and latest offset,
      // so we have to replace them with the actual smallest and latest.
      List<TopicPartition> earliestOffsetRequest = new ArrayList<>();
      List<TopicPartition> latestOffsetRequest = new ArrayList<>();
      for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
        TopicPartition topicAndPartition = entry.getKey();
        Long offset = entry.getValue();
        if (offset == OffsetRequest.EarliestTime()) {
          earliestOffsetRequest.add(topicAndPartition);
        } else if (offset == OffsetRequest.LatestTime()) {
          latestOffsetRequest.add(topicAndPartition);
        }
      }

      Set<TopicPartition> allOffsetRequest =
        Sets.newHashSet(Iterables.concat(earliestOffsetRequest, latestOffsetRequest));
      Map<TopicPartition, Long> offsetsFound = new HashMap<>();
      offsetsFound.putAll(KafkaHelpers.getEarliestOffsets(consumer, earliestOffsetRequest));
      offsetsFound.putAll(KafkaHelpers.getLatestOffsets(consumer, latestOffsetRequest));
      for (TopicPartition topicAndPartition : allOffsetRequest) {
        offsets.put(topicAndPartition, offsetsFound.get(topicAndPartition));
      }

      Set<TopicPartition> missingOffsets = Sets.difference(allOffsetRequest, offsetsFound.keySet());
      if (!missingOffsets.isEmpty()) {
        throw new IllegalStateException(String.format(
          "Could not find offsets for %s. Please check all brokers were included in the broker list.",
          missingOffsets));
      }
      LOG.info("Using initial offsets {}", offsets);

      return KafkaUtils.createDirectStream(
        context.getSparkStreamingContext(), LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<byte[], byte[]>Subscribe(Collections.singleton(conf.getTopic()), kafkaParams, offsets)
      ).transform(new RecordTransform(conf));
    }
  }

  /**
   * Applies the format function to each rdd.
   */
  private static class RecordTransform
    implements Function2<JavaRDD<ConsumerRecord<byte[], byte[]>>, Time, JavaRDD<StructuredRecord>> {

    private final KafkaConfig conf;

    RecordTransform(KafkaConfig conf) {
      this.conf = conf;
    }

    @Override
    public JavaRDD<StructuredRecord> call(JavaRDD<ConsumerRecord<byte[], byte[]>> input, Time batchTime) {
      Function<ConsumerRecord<byte[], byte[]>, StructuredRecord> recordFunction = conf.getFormat() == null ?
        new BytesFunction(batchTime.milliseconds(), conf) :
        new FormatFunction(batchTime.milliseconds(), conf);
      return input.map(recordFunction);
    }
  }

  private static Set<Integer> getPartitions(Consumer<byte[], byte[]> consumer, KafkaConfig conf,
                                            FailureCollector collector) {
    Set<Integer> partitions = conf.getPartitions(collector);
    collector.getOrThrowException();

    if (!partitions.isEmpty()) {
      return partitions;
    }

    partitions = new HashSet<>();
    for (PartitionInfo partitionInfo : consumer.partitionsFor(conf.getTopic())) {
      partitions.add(partitionInfo.partition());
    }
    return partitions;
  }

  /**
   * Common logic for transforming kafka key, message, partition, and offset into a structured record.
   * Everything here should be serializable, as Spark Streaming will serialize all functions.
   */
  private abstract static class BaseFunction implements Function<ConsumerRecord<byte[], byte[]>, StructuredRecord> {
    private final long ts;
    protected final KafkaConfig conf;
    private transient String messageField;
    private transient String timeField;
    private transient String keyField;
    private transient String partitionField;
    private transient String offsetField;
    private transient Schema schema;

    BaseFunction(long ts, KafkaConfig conf) {
      this.ts = ts;
      this.conf = conf;
    }

    @Override
    public StructuredRecord call(ConsumerRecord<byte[], byte[]> in) throws Exception {
      // The first time this is called, initialize the schema and the time, key, and message fields.
      if (schema == null) {
        schema = conf.getSchema();
        timeField = conf.getTimeField();
        keyField = conf.getKeyField();
        partitionField = conf.getPartitionField();
        offsetField = conf.getOffsetField();
        for (Schema.Field field : schema.getFields()) {
          String name = field.getName();
          if (!name.equals(timeField) && !name.equals(keyField)) {
            messageField = name;
            break;
          }
        }
      }

      StructuredRecord.Builder builder = StructuredRecord.builder(schema);
      if (timeField != null) {
        builder.set(timeField, ts);
      }
      if (keyField != null) {
        builder.set(keyField, in.key());
      }
      if (partitionField != null) {
        builder.set(partitionField, in.partition());
      }
      if (offsetField != null) {
        builder.set(offsetField, in.offset());
      }
      addMessage(builder, messageField, in.value());
      return builder.build();
    }

    protected abstract void addMessage(StructuredRecord.Builder builder, String messageField,
                                       byte[] message) throws Exception;
  }

  /**
   * Transforms kafka key and message into a structured record when message format is not given.
   * Everything here should be serializable, as Spark Streaming will serialize all functions.
   */
  private static class BytesFunction extends BaseFunction {

    BytesFunction(long ts, KafkaConfig conf) {
      super(ts, conf);
    }

    @Override
    protected void addMessage(StructuredRecord.Builder builder, String messageField, byte[] message) {
      builder.set(messageField, message);
    }
  }

  /**
   * Transforms kafka key and message into a structured record when message format and schema are given.
   * Everything here should be serializable, as Spark Streaming will serialize all functions.
   */
  private static class FormatFunction extends BaseFunction {
    private transient RecordFormat<ByteBuffer, StructuredRecord> recordFormat;

    FormatFunction(long ts, KafkaConfig conf) {
      super(ts, conf);
    }

    @Override
    protected void addMessage(StructuredRecord.Builder builder, String messageField, byte[] message) throws Exception {
      // The first time this is called, initialize the record format.
      if (recordFormat == null) {
        Schema messageSchema = conf.getMessageSchema();
        FormatSpecification spec =
          new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<>());
        recordFormat = RecordFormats.createInitializedFormat(spec);
      }

      StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message));
      for (Schema.Field field : messageRecord.getSchema().getFields()) {
        String fieldName = field.getName();
        builder.set(fieldName, messageRecord.get(fieldName));
      }
    }
  }

  private KafkaStreaminSourceUtil() {
    // no-op
  }
}
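The class javadoc explains the split: validation runs without Spark on the classpath (CDAP-15912), so only the runtime path may touch Spark classes via this util. Below is a minimal sketch, under stated assumptions, of how a streaming source might delegate to it; the real caller is KafkaStreamingSource in this repo, but the method bodies, the validate helper on KafkaConfig, and the way the FailureCollector is obtained from the runtime StreamingContext are illustrative, not taken from this commit.

// Hypothetical delegation sketch; not the committed KafkaStreamingSource implementation.
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import org.apache.spark.streaming.api.java.JavaDStream;

public class KafkaStreamingSourceSketch extends StreamingSource<StructuredRecord> {
  private final KafkaConfig conf;

  public KafkaStreamingSourceSketch(KafkaConfig conf) {
    this.conf = conf;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Deploy-time validation: no Spark classes may be referenced here.
    FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
    conf.validate(collector);          // hypothetical validate(FailureCollector) on KafkaConfig
    collector.getOrThrowException();
  }

  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
    // Runtime path: Spark is available, so hand off to the Spark-dependent util.
    // Assumes the runtime StreamingContext exposes a FailureCollector, as the util's signature suggests.
    FailureCollector collector = context.getFailureCollector();
    return KafkaStreaminSourceUtil.getStructuredRecordJavaDStream(context, conf, collector);
  }
}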
