
Commit 4e8f101

Merge pull request #491 from rangadi/splits
KafkaIO should return one split for each partition.
Parents: 678fb7a + 668a3e2

File tree
  • contrib/kafka/src

2 files changed: +40 −47 lines changed

contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java

Lines changed: 33 additions & 34 deletions
@@ -44,7 +44,6 @@
 import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
@@ -714,24 +713,12 @@ public UnboundedKafkaSource(
       this.consumerConfig = consumerConfig;
     }
 
-    /**
-     * The partitions are evenly distributed among the splits. The number of splits returned is
-     * {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact
-     * count.
-     *
-     * <p> It is important to assign the partitions deterministically so that we can support
-     * resuming a split from last checkpoint. The Kafka partitions are sorted by {@code <topic,
-     * partition>} and then assigned to splits in round-robin order.
-     */
-    @Override
-    public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
+    private List<TopicPartition> fetchKafkaPartitions() {
 
       List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);
 
       // (a) fetch partitions for each topic
       // (b) sort by <topic, partition>
-      // (c) round-robin assign the partitions to splits
 
       if (partitions.isEmpty()) {
         try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
@@ -754,37 +741,39 @@ public int compare(TopicPartition tp1, TopicPartition tp2) {
           }
         });
 
-      checkArgument(desiredNumSplits > 0);
       checkState(
           partitions.size() > 0,
           "Could not find any partitions. Please check Kafka configuration and topic names");
 
-      int numSplits = Math.min(desiredNumSplits, partitions.size());
-      List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
+      return partitions;
+    }
 
-      for (int i = 0; i < numSplits; i++) {
-        assignments.add(new ArrayList<TopicPartition>());
-      }
-      for (int i = 0; i < partitions.size(); i++) {
-        assignments.get(i % numSplits).add(partitions.get(i));
-      }
+    /**
+     * Returns one split for each of the Kafka partitions.
+     *
+     * <p> It is important to sort the partitions deterministically so that we can support
+     * resuming a split from last checkpoint. The Kafka partitions are sorted by {@code <topic,
+     * partition>}.
+     */
+    @Override
+    public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
 
-      List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(numSplits);
+      List<TopicPartition> partitions = fetchKafkaPartitions();
 
-      for (int i = 0; i < numSplits; i++) {
-        List<TopicPartition> assignedToSplit = assignments.get(i);
+      List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(partitions.size());
+
+      // one split for each partition.
+      for (int i = 0; i < partitions.size(); i++) {
+        TopicPartition partition = partitions.get(i);
 
-        LOG.info(
-            "Partitions assigned to split {} (total {}): {}",
-            i,
-            assignedToSplit.size(),
-            Joiner.on(",").join(assignedToSplit));
+        LOG.info("Partition assigned to split {} : {}", i, partition);
 
         result.add(
-            new UnboundedKafkaSource<K, V>(
+            new UnboundedKafkaSource<>(
                 i,
                 this.topics,
-                assignedToSplit,
+                ImmutableList.of(partition),
                 this.keyCoder,
                 this.valueCoder,
                 this.timestampFn,
@@ -804,7 +793,17 @@ public UnboundedKafkaReader<K, V> createReader(
       LOG.warn("Looks like generateSplits() is not called. Generate single split.");
       try {
         return new UnboundedKafkaReader<K, V>(
-            generateInitialSplits(1, options).get(0), checkpointMark);
+            new UnboundedKafkaSource<>(
+                0,
+                this.topics,
+                fetchKafkaPartitions(),
+                this.keyCoder,
+                this.valueCoder,
+                this.timestampFn,
+                this.watermarkFn,
+                this.consumerFactoryFn,
+                this.consumerConfig),
+            checkpointMark);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
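
The deterministic ordering called out in the new Javadoc is the heart of this change: once partitions are sorted by <topic, partition> and each becomes its own split, split i maps to the same partition on every restart, which is what keeps checkpoints valid. Below is a minimal, self-contained sketch of that rule, using Guava's ComparisonChain (already imported by KafkaIO.java) and Kafka's TopicPartition. The class and values are illustrative, not the commit's exact code:

  import com.google.common.collect.ComparisonChain;
  import org.apache.kafka.common.TopicPartition;

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.Comparator;
  import java.util.List;

  public class OneSplitPerPartitionSketch {
    public static void main(String[] args) {
      // Partitions as they might arrive from the broker, in arbitrary order.
      List<TopicPartition> partitions = new ArrayList<>();
      partitions.add(new TopicPartition("topic_b", 1));
      partitions.add(new TopicPartition("topic_a", 0));
      partitions.add(new TopicPartition("topic_b", 0));

      // Deterministic sort: first by topic name, then by partition id.
      Collections.sort(partitions, new Comparator<TopicPartition>() {
        @Override
        public int compare(TopicPartition tp1, TopicPartition tp2) {
          return ComparisonChain.start()
              .compare(tp1.topic(), tp2.topic())
              .compare(tp1.partition(), tp2.partition())
              .result();
        }
      });

      // One split per partition; the desiredNumSplits hint plays no role.
      for (int i = 0; i < partitions.size(); i++) {
        System.out.println("split " + i + " -> " + partitions.get(i));
      }
      // Prints: split 0 -> topic_a-0, split 1 -> topic_b-0, split 2 -> topic_b-1
    }
  }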

contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java

Lines changed: 7 additions & 13 deletions
@@ -26,7 +26,6 @@
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Count;
@@ -172,8 +171,8 @@ public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
   }
 
   /**
-   * Creates a consumer with two topics, with 5 partitions each. numElements are (round-robin)
-   * assigned all the 10 partitions.
+   * Creates a consumer with two topics, with 10 partitions each. numElements are (round-robin)
+   * assigned all the 20 partitions.
    */
   private static KafkaIO.TypedRead<Integer, Long> mkKafkaReadTransform(
       int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
@@ -309,16 +308,13 @@ public void processElement(ProcessContext ctx) throws Exception {
   public void testUnboundedSourceSplits() throws Exception {
     Pipeline p = TestPipeline.create();
     int numElements = 1000;
-    int numSplits = 10;
+    int numSplits = 20;
 
     UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
         mkKafkaReadTransform(numElements, null).makeSource();
-    List<
-        ? extends
-            UnboundedSource<
-                com.google.cloud.dataflow.contrib.kafka.KafkaRecord<Integer, Long>, ?>>
-        splits = initial.generateInitialSplits(numSplits, p.getOptions());
-    assertEquals("Expected exact splitting", numSplits, splits.size());
+    List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
+        initial.generateInitialSplits(1, p.getOptions());
+    assertEquals("KafkaIO should ignore desiredNumSplits", numSplits, splits.size());
 
     long elementsPerSplit = numElements / numSplits;
     assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
@@ -368,9 +364,7 @@ public void testUnboundedSourceCheckpointMark() throws Exception {
         com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark>
         source =
             mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
-                .makeSource()
-                .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
-                .get(0);
+                .makeSource();
 
     UnboundedReader<com.google.cloud.dataflow.contrib.kafka.KafkaRecord<Integer, Long>> reader =
         source.createReader(null, null);
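
Both test changes follow from the new contract. testUnboundedSourceSplits passes desiredNumSplits = 1 but still expects 20 splits (two topics with 10 partitions each), since KafkaIO now ignores the hint; and testUnboundedSourceCheckpointMark no longer pre-splits, because createReader() on an unsplit source now builds a single source over all partitions itself. A standalone sketch of the distribution arithmetic behind the "Expected even splits" assertion, using the constants from the tests above (the class name is illustrative):

  public class EvenSplitArithmetic {
    public static void main(String[] args) {
      int numElements = 1000;
      int numSplits = 20;  // 2 topics x 10 partitions, one split per partition

      // Round-robin assignment, as described in mkKafkaReadTransform's Javadoc.
      int[] perSplit = new int[numSplits];
      for (int n = 0; n < numElements; n++) {
        perSplit[n % numSplits]++;
      }

      long elementsPerSplit = numElements / numSplits;  // 50
      for (int count : perSplit) {
        if (count != elementsPerSplit) {
          throw new AssertionError("Expected even splits");
        }
      }
      System.out.println("elements per split: " + elementsPerSplit);
    }
  }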
