Commit 9e3d4eb

Merge pull request #57 from data-integrations/bugfix/CDAP-15105-do-not-create-external-resources-kafka
CDAP-15105 do not create directory for preview
2 parents: f2a1cdd + 52a1e81

4 files changed: 10 additions, 4 deletions

kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ Plugin Configuration
 | :------------ | :------: | :----- | :---------- |
 | **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. |
 | **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. |
-| **Offset Table Name** | **Y** | N/A | Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the pipeline name to avoid conflict on table names. By default it will be the topic name.
+| **Offset Directory** | **Y** | N/A | A directory path to store the latest Kafka offsets. A file named with the pipeline name will be created under the given directory.
 | **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. |
 | **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, earliest offset will be used. This offset will only be used for the first run of the pipeline. Any subsequent run will read from the latest offset from previous run. Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. |
 | **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. |
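The renamed setting replaces an offset tracking table with an offsets file kept under the configured directory and named after the pipeline. A minimal sketch of how such a path could be resolved, assuming only what the doc row above states; the directory and pipeline values here are hypothetical:

```java
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class OffsetPathSketch {
  public static void main(String[] args) {
    // Hypothetical values for illustration only.
    String offsetDir = "hdfs://namenode/user/cdap/kafka-offsets"; // the configured Offset Directory
    String pipelineName = "kafka-to-table";                       // the pipeline's name

    // Per the doc row above, a file named after the pipeline lives under the directory.
    Path offsetsFile = new Path(new Path(URI.create(offsetDir)), pipelineName);
    System.out.println(offsetsFile); // hdfs://namenode/user/cdap/kafka-offsets/kafka-to-table
  }
}
```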

kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java

Lines changed: 4 additions & 1 deletion
@@ -146,7 +146,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();

     // If the offset directory is provided, try to load the file
-    if (config.getOffsetDir() != null) {
+    if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
       Path offsetDir = new Path(URI.create(config.getOffsetDir()));
       fileContext = FileContext.getFileContext(offsetDir.toUri(), conf);
       try {
@@ -184,6 +184,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {

   @Override
   public void onRunFinish(boolean succeeded, BatchSourceContext context) {
+    if (context.isPreviewEnabled()) {
+      return;
+    }
     if (succeeded && kafkaRequests != null && fileContext != null && offsetsFile != null) {
       KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets(
         kafkaRequests.stream().collect(Collectors.toMap(KafkaRequest::getPartition, KafkaRequest::getEndOffset)));
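Both guards follow the same pattern: a preview run must not create or mutate external resources, so prepareRun skips touching the offset directory and onRunFinish returns before persisting end offsets. A condensed sketch of that pattern, assuming the co.cask-era CDAP ETL API used elsewhere in this repository; the class and field names are illustrative, not the plugin's actual code:

```java
import java.net.URI;

import co.cask.cdap.etl.api.batch.BatchSourceContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

// Illustrative only: a stripped-down class, not the real KafkaBatchSource.
public class PreviewGuardSketch {
  private final String offsetDir;   // value of the Offset Directory config, may be null
  private FileContext fileContext;
  private Path offsetsFile;

  public PreviewGuardSketch(String offsetDir) {
    this.offsetDir = offsetDir;
  }

  // Real runs may read or create the offsets file; preview runs never touch the file system.
  public void prepareRun(BatchSourceContext context) throws Exception {
    if (!context.isPreviewEnabled() && offsetDir != null) {
      Path dir = new Path(URI.create(offsetDir));
      fileContext = FileContext.getFileContext(dir.toUri(), new Configuration());
      // ... locate the offsets file under 'dir' and load previously saved offsets
    }
  }

  // A preview run persists nothing, so return before writing the end offsets.
  public void onRunFinish(boolean succeeded, BatchSourceContext context) {
    if (context.isPreviewEnabled()) {
      return;
    }
    if (succeeded && fileContext != null && offsetsFile != null) {
      // ... write this run's end offsets to 'offsetsFile'
    }
  }
}
```

The same change is applied to the 0.8 plugin below.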

kafka-plugins-0.8/docs/KAFKABATCHSOURCE.md

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ Plugin Configuration
 | :------------ | :------: | :----- | :---------- |
 | **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. |
 | **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. |
-| **Offset Table Name** | **Y** | N/A | Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the pipeline name to avoid conflict on table names. By default it will be the topic name.
+| **Offset Directory** | **Y** | N/A | A directory path to store the latest Kafka offsets. A file named with the pipeline name will be created under the given directory.
 | **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. |
 | **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, earliest offset will be used. This offset will only be used for the first run of the pipeline. Any subsequent run will read from the latest offset from previous run. Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. |
 | **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. |

kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java

Lines changed: 4 additions & 1 deletion
@@ -85,7 +85,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();

     // If the offset directory is provided, try to load the file
-    if (config.getOffsetDir() != null) {
+    if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
       Path offsetDir = new Path(URI.create(config.getOffsetDir()));
       fileContext = FileContext.getFileContext(offsetDir.toUri(), conf);
       try {
@@ -117,6 +117,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {

   @Override
   public void onRunFinish(boolean succeeded, BatchSourceContext context) {
+    if (context.isPreviewEnabled()) {
+      return;
+    }
     if (succeeded && kafkaRequests != null && fileContext != null && offsetsFile != null) {
       KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets(
         kafkaRequests.stream().collect(Collectors.toMap(KafkaRequest::getPartition, KafkaRequest::getEndOffset)));
