Commit ded0c0b

Merge pull request #54 from data-integrations/feature/update-record-format
Update to use a RecordFormat that doesn’t involve StreamEvent
2 parents: bee05d0 + 24939bc

File tree

4 files changed (+12, -14 lines)

kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
3 additions, 3 deletions

@@ -26,7 +26,6 @@
 import co.cask.cdap.api.data.format.StructuredRecord;
 import co.cask.cdap.api.data.schema.Schema;
 import co.cask.cdap.api.dataset.lib.KeyValue;
-import co.cask.cdap.api.flow.flowlet.StreamEvent;
 import co.cask.cdap.common.io.ByteBuffers;
 import co.cask.cdap.etl.api.Emitter;
 import co.cask.cdap.etl.api.PipelineConfigurer;
@@ -50,6 +49,7 @@

 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +68,7 @@ public class KafkaBatchSource extends BatchSource<KafkaKey, KafkaMessage, Struct
   private final Kafka10BatchConfig config;
   private List<KafkaRequest> kafkaRequests;
   private Schema schema;
-  private RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+  private RecordFormat<ByteBuffer, StructuredRecord> recordFormat;
   private String messageField;
   private FileContext fileContext;
   private Path offsetsFile;
@@ -231,7 +231,7 @@ public void transform(KeyValue<KafkaKey, KafkaMessage> input, Emitter<Structured
     if (config.getFormat() == null) {
       builder.set(messageField, ByteBuffers.getByteArray(input.getValue().getPayload()));
     } else {
-      StructuredRecord messageRecord = recordFormat.read(new StreamEvent(input.getValue().getPayload()));
+      StructuredRecord messageRecord = recordFormat.read(input.getValue().getPayload());
       for (Schema.Field field : messageRecord.getSchema().getFields()) {
         String fieldName = field.getName();
         builder.set(fieldName, messageRecord.get(fieldName));
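The substance of the change is visible in the transform hunk above: the raw payload ByteBuffer now goes straight into RecordFormat.read, with no StreamEvent wrapper in between. A minimal sketch of the new read path, using only the calls this diff shows (FormatSpecification, RecordFormats.createInitializedFormat, recordFormat.read); the package for RecordFormats and the toRecord helper are assumptions for illustration, not part of the plugin:

import java.nio.ByteBuffer;
import java.util.HashMap;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.format.RecordFormats;  // package assumed; the diff only shows the simple name

public final class ReadPathSketch {
  private ReadPathSketch() { }

  // Hypothetical helper: parse one raw Kafka payload into a StructuredRecord
  // using the format name and message schema the plugin config supplies.
  public static StructuredRecord toRecord(String format, Schema messageSchema, ByteBuffer payload)
      throws Exception {
    FormatSpecification spec = new FormatSpecification(format, messageSchema, new HashMap<>());
    RecordFormat<ByteBuffer, StructuredRecord> recordFormat =
      RecordFormats.createInitializedFormat(spec);
    // Before: recordFormat.read(new StreamEvent(payload))
    // After: the format consumes the ByteBuffer directly.
    return recordFormat.read(payload);
  }
}

With this signature, KafkaBatchSource.transform only has to pass input.getValue().getPayload() through, exactly as the 0.10 and 0.8 hunks show.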

kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/KafkaStreamingSource.java
3 additions, 4 deletions

@@ -23,7 +23,6 @@
 import co.cask.cdap.api.data.format.RecordFormat;
 import co.cask.cdap.api.data.format.StructuredRecord;
 import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.api.flow.flowlet.StreamEvent;
 import co.cask.cdap.etl.api.PipelineConfigurer;
 import co.cask.cdap.etl.api.streaming.StreamingContext;
 import co.cask.cdap.etl.api.streaming.StreamingSource;
@@ -271,7 +270,7 @@ protected void addMessage(StructuredRecord.Builder builder, String messageField,
    * Everything here should be serializable, as Spark Streaming will serialize all functions.
    */
   private static class FormatFunction extends BaseFunction {
-    private transient RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+    private transient RecordFormat<ByteBuffer, StructuredRecord> recordFormat;

     FormatFunction(long ts, KafkaConfig conf) {
       super(ts, conf);
@@ -283,11 +282,11 @@ protected void addMessage(StructuredRecord.Builder builder, String messageField,
       if (recordFormat == null) {
         Schema messageSchema = conf.getMessageSchema();
         FormatSpecification spec =
-          new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<String, String>());
+          new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<>());
         recordFormat = RecordFormats.createInitializedFormat(spec);
       }

-      StructuredRecord messageRecord = recordFormat.read(new StreamEvent(ByteBuffer.wrap(message)));
+      StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message));
       for (Schema.Field field : messageRecord.getSchema().getFields()) {
         String fieldName = field.getName();
         builder.set(fieldName, messageRecord.get(fieldName));
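The streaming hunks also show why recordFormat is transient: as the class comment notes, Spark Streaming serializes these functions, so the format is not shipped over the wire but rebuilt lazily on first use on each executor. A sketch of that pattern under the same assumptions as above (RecordFormats package assumed; ParseSketch and its fields are illustrative stand-ins for FormatFunction, BaseFunction, and KafkaConfig):

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.format.RecordFormats;  // package assumed

// Illustrative stand-in: everything serializable except the format itself.
public class ParseSketch implements Serializable {
  private final String format;      // e.g. "csv"; plain serializable config
  private final String schemaJson;  // schema shipped as a JSON string
  private transient RecordFormat<ByteBuffer, StructuredRecord> recordFormat;

  public ParseSketch(String format, String schemaJson) {
    this.format = format;
    this.schemaJson = schemaJson;
  }

  public StructuredRecord parse(byte[] message) throws Exception {
    if (recordFormat == null) {
      // Rebuilt on the executor after deserialization; never serialized itself.
      Schema messageSchema = Schema.parseJson(schemaJson);
      FormatSpecification spec = new FormatSpecification(format, messageSchema, new HashMap<>());
      recordFormat = RecordFormats.createInitializedFormat(spec);
    }
    return recordFormat.read(ByteBuffer.wrap(message));
  }
}

The remaining two files apply the identical change to the kafka-plugins-0.8 sources.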

kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
3 additions, 3 deletions

@@ -25,7 +25,6 @@
 import co.cask.cdap.api.data.format.StructuredRecord;
 import co.cask.cdap.api.data.schema.Schema;
 import co.cask.cdap.api.dataset.lib.KeyValue;
-import co.cask.cdap.api.flow.flowlet.StreamEvent;
 import co.cask.cdap.common.io.ByteBuffers;
 import co.cask.cdap.etl.api.Emitter;
 import co.cask.cdap.etl.api.PipelineConfigurer;
@@ -45,6 +44,7 @@

 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -64,7 +64,7 @@ public class KafkaBatchSource extends BatchSource<KafkaKey, KafkaMessage, Struct
   private Path offsetsFile;
   private List<KafkaRequest> kafkaRequests;
   private Schema schema;
-  private RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+  private RecordFormat<ByteBuffer, StructuredRecord> recordFormat;
   private String messageField;

   public KafkaBatchSource(KafkaBatchConfig config) {
@@ -164,7 +164,7 @@ public void transform(KeyValue<KafkaKey, KafkaMessage> input, Emitter<Structured
     if (config.getFormat() == null) {
       builder.set(messageField, ByteBuffers.getByteArray(input.getValue().getPayload()));
     } else {
-      StructuredRecord messageRecord = recordFormat.read(new StreamEvent(input.getValue().getPayload()));
+      StructuredRecord messageRecord = recordFormat.read(input.getValue().getPayload());
       for (Schema.Field field : messageRecord.getSchema().getFields()) {
         String fieldName = field.getName();
         builder.set(fieldName, messageRecord.get(fieldName));

kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/source/KafkaStreamingSource.java
3 additions, 4 deletions

@@ -23,7 +23,6 @@
 import co.cask.cdap.api.data.format.RecordFormat;
 import co.cask.cdap.api.data.format.StructuredRecord;
 import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.api.flow.flowlet.StreamEvent;
 import co.cask.cdap.etl.api.PipelineConfigurer;
 import co.cask.cdap.etl.api.streaming.StreamingContext;
 import co.cask.cdap.etl.api.streaming.StreamingSource;
@@ -271,7 +270,7 @@ protected void addMessage(StructuredRecord.Builder builder, String messageField,
    * Everything here should be serializable, as Spark Streaming will serialize all functions.
    */
   private static class FormatFunction extends BaseFunction {
-    private transient RecordFormat<StreamEvent, StructuredRecord> recordFormat;
+    private transient RecordFormat<ByteBuffer, StructuredRecord> recordFormat;

     FormatFunction(long ts, KafkaConfig conf) {
       super(ts, conf);
@@ -283,11 +282,11 @@ protected void addMessage(StructuredRecord.Builder builder, String messageField,
       if (recordFormat == null) {
         Schema messageSchema = conf.getMessageSchema();
         FormatSpecification spec =
-          new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<String, String>());
+          new FormatSpecification(conf.getFormat(), messageSchema, new HashMap<>());
         recordFormat = RecordFormats.createInitializedFormat(spec);
       }

-      StructuredRecord messageRecord = recordFormat.read(new StreamEvent(ByteBuffer.wrap(message)));
+      StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message));
       for (Schema.Field field : messageRecord.getSchema().getFields()) {
         String fieldName = field.getName();
         builder.set(fieldName, messageRecord.get(fieldName));
