
package io.cdap.plugin.source;

- import com.google.common.base.Joiner;
- import com.google.common.collect.Iterables;
- import com.google.common.collect.Sets;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
- import io.cdap.cdap.api.data.format.FormatSpecification;
- import io.cdap.cdap.api.data.format.RecordFormat;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
+ import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
+ import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
- import io.cdap.cdap.format.RecordFormats;
- import io.cdap.plugin.common.KafkaHelpers;
- import kafka.api.OffsetRequest;
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.PartitionInfo;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.ByteArrayDeserializer;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.kafka010.ConsumerStrategies;
- import org.apache.spark.streaming.kafka010.KafkaUtils;
- import org.apache.spark.streaming.kafka010.LocationStrategies;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

- import java.nio.ByteBuffer;
- import java.util.ArrayList;
- import java.util.Collections;
import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
import java.util.Map;
- import java.util.Properties;
- import java.util.Set;

/**
-  * Kafka Streaming source
+  * Kafka Streaming source.
 */
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("Kafka")
@Description("Kafka streaming source.")
public class KafkaStreamingSource extends ReferenceStreamingSource<StructuredRecord> {
-   private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamingSource.class);
  private final KafkaConfig conf;

  public KafkaStreamingSource(KafkaConfig conf) {
@@ -78,8 +48,13 @@ public KafkaStreamingSource(KafkaConfig conf) {
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    super.configurePipeline(pipelineConfigurer);
-     conf.validate();
-     pipelineConfigurer.getStageConfigurer().setOutputSchema(conf.getSchema());
+     StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+     FailureCollector collector = stageConfigurer.getFailureCollector();
+
+     conf.validate(collector);
+     Schema schema = conf.getSchema(collector);
+     stageConfigurer.setOutputSchema(schema);
+
    if (conf.getMaxRatePerPartition() != null && conf.getMaxRatePerPartition() > 0) {
      Map<String, String> pipelineProperties = new HashMap<>();
      pipelineProperties.put("spark.streaming.kafka.maxRatePerPartition", conf.getMaxRatePerPartition().toString());
@@ -89,209 +64,11 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws Ille

  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
-     context.registerLineage(conf.referenceName);
-
-     Map<String, Object> kafkaParams = new HashMap<>();
-     kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBrokers());
-     // Spark saves the offsets in checkpoints, no need for Kafka to save them
-     kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-     kafkaParams.put("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
-     kafkaParams.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
-     KafkaHelpers.setupKerberosLogin(kafkaParams, conf.getPrincipal(), conf.getKeytabLocation());
-     // Create a unique string for the group.id using the pipeline name and the topic.
-     // group.id is a Kafka consumer property that uniquely identifies the group of
-     // consumer processes to which this consumer belongs.
-     kafkaParams.put("group.id", Joiner.on("-").join(context.getPipelineName().length(), conf.getTopic().length(),
-                                                     context.getPipelineName(), conf.getTopic()));
-     kafkaParams.putAll(conf.getKafkaProperties());
-
-     Properties properties = new Properties();
-     properties.putAll(kafkaParams);
-     // change the request timeout to fetch the metadata to be 15 seconds or 1 second greater than session time out ms,
-     // since this config has to be greater than the session time out, which is by default 10 seconds
-     // the KafkaConsumer at runtime should still use the default timeout 305 seconds or whataver the user provides in
-     // kafkaConf
-     int requestTimeout = 15 * 1000;
-     if (conf.getKafkaProperties().containsKey(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG)) {
-       requestTimeout =
-         Math.max(requestTimeout,
-                  Integer.valueOf(conf.getKafkaProperties().get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) + 1000));
-     }
-     properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
-     try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(),
-                                                                  new ByteArrayDeserializer())) {
-       Map<TopicPartition, Long> offsets = conf.getInitialPartitionOffsets(getPartitions(consumer));
-       // KafkaUtils doesn't understand -1 and -2 as smallest offset and latest offset.
-       // so we have to replace them with the actual smallest and latest
-       List<TopicPartition> earliestOffsetRequest = new ArrayList<>();
-       List<TopicPartition> latestOffsetRequest = new ArrayList<>();
-       for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
-         TopicPartition topicAndPartition = entry.getKey();
-         Long offset = entry.getValue();
-         if (offset == OffsetRequest.EarliestTime()) {
-           earliestOffsetRequest.add(topicAndPartition);
-         } else if (offset == OffsetRequest.LatestTime()) {
-           latestOffsetRequest.add(topicAndPartition);
-         }
-       }
-
-       Set<TopicPartition> allOffsetRequest =
-         Sets.newHashSet(Iterables.concat(earliestOffsetRequest, latestOffsetRequest));
-       Map<TopicPartition, Long> offsetsFound = new HashMap<>();
-       offsetsFound.putAll(KafkaHelpers.getEarliestOffsets(consumer, earliestOffsetRequest));
-       offsetsFound.putAll(KafkaHelpers.getLatestOffsets(consumer, latestOffsetRequest));
-       for (TopicPartition topicAndPartition : allOffsetRequest) {
-         offsets.put(topicAndPartition, offsetsFound.get(topicAndPartition));
-       }
-
-       Set<TopicPartition> missingOffsets = Sets.difference(allOffsetRequest, offsetsFound.keySet());
-       if (!missingOffsets.isEmpty()) {
-         throw new IllegalStateException(String.format(
-           "Could not find offsets for %s. Please check all brokers were included in the broker list.", missingOffsets));
-       }
-       LOG.info("Using initial offsets {}", offsets);
-
-       return KafkaUtils.createDirectStream(
-         context.getSparkStreamingContext(), LocationStrategies.PreferConsistent(),
-         ConsumerStrategies.<byte[], byte[]>Subscribe(Collections.singleton(conf.getTopic()), kafkaParams, offsets)
-       ).transform(new RecordTransform(conf));
-     }
-   }
-
-   private Set<Integer> getPartitions(Consumer<byte[], byte[]> consumer) {
-     Set<Integer> partitions = conf.getPartitions();
-     if (!partitions.isEmpty()) {
-       return partitions;
-     }
-
-     partitions = new HashSet<>();
-     for (PartitionInfo partitionInfo : consumer.partitionsFor(conf.getTopic())) {
-       partitions.add(partitionInfo.partition());
-     }
-     return partitions;
-   }
-
-   /**
-    * Applies the format function to each rdd.
-    */
-   private static class RecordTransform
-     implements Function2<JavaRDD<ConsumerRecord<byte[], byte[]>>, Time, JavaRDD<StructuredRecord>> {
-
-     private final KafkaConfig conf;
-
-     RecordTransform(KafkaConfig conf) {
-       this.conf = conf;
-     }
-
-     @Override
-     public JavaRDD<StructuredRecord> call(JavaRDD<ConsumerRecord<byte[], byte[]>> input, Time batchTime) {
-       Function<ConsumerRecord<byte[], byte[]>, StructuredRecord> recordFunction = conf.getFormat() == null ?
-         new BytesFunction(batchTime.milliseconds(), conf) : new FormatFunction(batchTime.milliseconds(), conf);
-       return input.map(recordFunction);
-     }
-   }
-
-   /**
-    * Common logic for transforming kafka key, message, partition, and offset into a structured record.
-    * Everything here should be serializable, as Spark Streaming will serialize all functions.
-    */
-   private abstract static class BaseFunction implements Function<ConsumerRecord<byte[], byte[]>, StructuredRecord> {
-     private final long ts;
-     protected final KafkaConfig conf;
-     private transient String messageField;
-     private transient String timeField;
-     private transient String keyField;
-     private transient String partitionField;
-     private transient String offsetField;
-     private transient Schema schema;
-
-     BaseFunction(long ts, KafkaConfig conf) {
-       this.ts = ts;
-       this.conf = conf;
-     }
-
-     @Override
-     public StructuredRecord call(ConsumerRecord<byte[], byte[]> in) throws Exception {
-       // first time this was called, initialize schema and time, key, and message fields.
-       if (schema == null) {
-         schema = conf.getSchema();
-         timeField = conf.getTimeField();
-         keyField = conf.getKeyField();
-         partitionField = conf.getPartitionField();
-         offsetField = conf.getOffsetField();
-         for (Schema.Field field : schema.getFields()) {
-           String name = field.getName();
-           if (!name.equals(timeField) && !name.equals(keyField)) {
-             messageField = name;
-             break;
-           }
-         }
-       }
-
-       StructuredRecord.Builder builder = StructuredRecord.builder(schema);
-       if (timeField != null) {
-         builder.set(timeField, ts);
-       }
-       if (keyField != null) {
-         builder.set(keyField, in.key());
-       }
-       if (partitionField != null) {
-         builder.set(partitionField, in.partition());
-       }
-       if (offsetField != null) {
-         builder.set(offsetField, in.offset());
-       }
-       addMessage(builder, messageField, in.value());
-       return builder.build();
-     }
-
-     protected abstract void addMessage(StructuredRecord.Builder builder, String messageField,
-                                        byte[] message) throws Exception;
-   }
-
-   /**
-    * Transforms kafka key and message into a structured record when message format is not given.
-    * Everything here should be serializable, as Spark Streaming will serialize all functions.
-    */
-   private static class BytesFunction extends BaseFunction {
-
-     BytesFunction(long ts, KafkaConfig conf) {
-       super(ts, conf);
-     }
-
-     @Override
-     protected void addMessage(StructuredRecord.Builder builder, String messageField, byte[] message) {
-       builder.set(messageField, message);
-     }
-   }
-
-   /**
-    * Transforms kafka key and message into a structured record when message format and schema are given.
-    * Everything here should be serializable, as Spark Streaming will serialize all functions.
-    */
-   private static class FormatFunction extends BaseFunction {
-     private transient RecordFormat<ByteBuffer, StructuredRecord> recordFormat;
+     FailureCollector collector = context.getFailureCollector();
+     conf.getMessageSchema(collector);
+     collector.getOrThrowException();

-     FormatFunction(long ts, KafkaConfig conf) {
-       super(ts, conf);
-     }
-
-     @Override
-     protected void addMessage(StructuredRecord.Builder builder, String messageField, byte[] message) throws Exception {
-       // first time this was called, initialize record format
-       if (recordFormat == null) {
-         Schema messageSchema = conf.getMessageSchema();
-         FormatSpecification spec =
-           new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<>());
-         recordFormat = RecordFormats.createInitializedFormat(spec);
-       }
-
-       StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message));
-       for (Schema.Field field : messageRecord.getSchema().getFields()) {
-         String fieldName = field.getName();
-         builder.set(fieldName, messageRecord.get(fieldName));
-       }
-     }
+     context.registerLineage(conf.referenceName);
+     return KafkaStreamingSourceUtil.getStructuredRecordJavaDStream(context, conf, collector);
  }
-
}
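Note on the validation flow this diff adopts: `configurePipeline` and `getStream` now report problems through CDAP's `FailureCollector` instead of failing on the first error. A minimal sketch of how a plugin config class can participate in that pattern is below; the class name, field names, and the specific checks are illustrative assumptions, not code taken from `KafkaConfig`.

```java
import com.google.common.base.Strings;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;

/**
 * Hypothetical sketch of FailureCollector-based validation; names and checks
 * are illustrative only and do not come from this plugin's KafkaConfig.
 */
public class ExampleKafkaConfig extends PluginConfig {
  private String brokers; // comma-separated host:port pairs (illustrative field)
  private String topic;   // topic to read from (illustrative field)

  public void validate(FailureCollector collector) {
    // Record each problem instead of throwing on the first one, so every
    // misconfiguration can be surfaced to the user at once.
    if (Strings.isNullOrEmpty(brokers)) {
      collector.addFailure("Kafka brokers must be specified.",
                           "Provide a comma-separated list of host:port pairs.")
        .withConfigProperty("brokers");
    }
    if (Strings.isNullOrEmpty(topic)) {
      collector.addFailure("Kafka topic must be specified.",
                           "Provide the topic to read from.")
        .withConfigProperty("topic");
    }
  }
}
```

Once all checks have run, the caller invokes `collector.getOrThrowException()`, as `getStream` does above, so a single validation exception carries every collected failure.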