
Commit cb5c5c0

[FLINK-38453] Add full splits to KafkaSourceEnumState
The KafkaSourceEnumerator's state contains only the TopicPartitions, not the offsets, so it does not hold the full split state, contrary to the design intent. This approach has several issues. It implicitly assumes that splits are fully assigned to readers before the first checkpoint; otherwise the enumerator invokes the offset initializer again on recovery from such a checkpoint, leading to inconsistencies (LATEST may be resolved during the first attempt for some partitions and during the second attempt for others). Through the addSplitBack callback, the same scenario can also occur later in BATCH mode, which actually leads to duplicate rows (for EARLIEST or SPECIFIC-OFFSETS) or data loss (for LATEST). Finally, it is not possible to safely use KafkaSource as part of a HybridSource because the offset initializer cannot even be recreated on recovery. All of these cases are solved by also retaining the offset in the enumerator state. To that end, this commit merges the async discovery phases so that splits are initialized from the partitions immediately. Any subsequent checkpoint then contains the proper start offset.
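In essence, the fix moves offset resolution to discovery time so that checkpointed state carries full splits. A minimal sketch of that idea, using the connector's public types; the helper name and wiring below are illustrative, not the commit's actual code:

// Illustrative sketch only: resolve starting offsets once, when partitions are
// discovered, and keep full KafkaPartitionSplits in enumerator state. A
// checkpoint taken afterwards stores concrete offsets, so recovery never
// re-runs the OffsetsInitializer (the source of the LATEST inconsistency).
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.common.TopicPartition;

final class EagerSplitInitialization {
    static List<KafkaPartitionSplit> initializeSplits(
            Collection<TopicPartition> discoveredPartitions,
            OffsetsInitializer startingOffsets,
            OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever) {
        // One-shot offset resolution per discovered partition.
        Map<TopicPartition, Long> startOffsets =
                startingOffsets.getPartitionOffsets(discoveredPartitions, offsetsRetriever);
        // Full split = partition + starting offset; this is what the enumerator
        // state retains and what subsequent checkpoints serialize.
        return startOffsets.entrySet().stream()
                .map(e -> new KafkaPartitionSplit(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }
}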
1 parent d39c079 · commit cb5c5c0

File tree

12 files changed: 618 additions & 308 deletions

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 9 additions & 6 deletions
@@ -23,11 +23,11 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:180)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:177)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:183)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:186)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:182)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:185)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:182)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154)
@@ -39,9 +39,12 @@ Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apa
 Method <org.apache.flink.connector.kafka.source.KafkaSource.getConfiguration()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
 Method <org.apache.flink.connector.kafka.source.KafkaSource.getKafkaSubscriber()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
 Method <org.apache.flink.connector.kafka.source.KafkaSource.getStoppingOffsetsInitializer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
-Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeTopicPartitions(java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
+Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV1(java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
+Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV2(java.util.Collection, boolean)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
+Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV3(org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
 Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.deepCopyProperties(java.util.Properties, java.util.Properties)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
 Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getPartitionChange(java.util.Set)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
+Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getPendingPartitionSplitAssignment()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
 Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSplitOwner(org.apache.kafka.common.TopicPartition, int)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
 Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.consumer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaPartitionSplitReader.java:0)
 Method <org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.setConsumerClientRack(java.util.Properties, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaPartitionSplitReader.java:0)
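The serializer entries above show that serializeTopicPartitions was replaced by per-version helpers (serializeV1/V2/V3), which implies a version-gated checkpoint format. As orientation only, here is the usual shape of version dispatch in a Flink SimpleVersionedSerializer; the deserializeVx helpers and the version constant are assumptions, not this commit's code:

// Hedged sketch: version-gated dispatch in a Flink SimpleVersionedSerializer.
// Only the dispatch shape is implied by the serializeV1/V2/V3 names above; the
// deserializeVx bodies and CURRENT_VERSION value are assumptions.
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

abstract class VersionedEnumStateSerializer<T> implements SimpleVersionedSerializer<T> {
    static final int CURRENT_VERSION = 3; // assumed: V3 is the new full-split format

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public T deserialize(int version, byte[] serialized) throws IOException {
        // Older checkpoints stay readable: each historical format keeps a
        // dedicated decoder, and unknown versions fail fast.
        switch (version) {
            case 1:
                return deserializeV1(serialized);
            case 2:
                return deserializeV2(serialized);
            case 3:
                return deserializeV3(serialized);
            default:
                throw new IOException("Unknown state version: " + version);
        }
    }

    abstract T deserializeV1(byte[] serialized) throws IOException;
    abstract T deserializeV2(byte[] serialized) throws IOException;
    abstract T deserializeV3(byte[] serialized) throws IOException;
}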

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java

Lines changed: 6 additions & 7 deletions
@@ -35,14 +35,13 @@
 import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
-import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
+import org.apache.flink.connector.kafka.source.enumerator.SplitAndAssignmentStatus;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -174,8 +173,8 @@ public DynamicKafkaSourceEnumerator(
                 dynamicKafkaSourceEnumState.getClusterEnumeratorStates().entrySet()) {
             this.latestClusterTopicsMap.put(
                     clusterEnumState.getKey(),
-                    clusterEnumState.getValue().assignedPartitions().stream()
-                            .map(TopicPartition::topic)
+                    clusterEnumState.getValue().assignedSplits().stream()
+                            .map(KafkaPartitionSplit::getTopic)
                             .collect(Collectors.toSet()));
 
             createEnumeratorWithAssignedTopicPartitions(
@@ -291,9 +290,9 @@ private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams
             final Set<String> activeTopics = activeClusterTopics.getValue();
 
             // filter out removed topics
-            Set<TopicPartitionAndAssignmentStatus> partitions =
-                    kafkaSourceEnumState.partitions().stream()
-                            .filter(tp -> activeTopics.contains(tp.topicPartition().topic()))
+            Set<SplitAndAssignmentStatus> partitions =
+                    kafkaSourceEnumState.splits().stream()
+                            .filter(tp -> activeTopics.contains(tp.split().getTopic()))
                             .collect(Collectors.toSet());
 
             newKafkaSourceEnumState =

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java

Lines changed: 2 additions & 5 deletions
@@ -26,11 +26,8 @@ public enum AssignmentStatus {
 
     /** Partitions that have been assigned to readers. */
     ASSIGNED(0),
-    /**
-     * The partitions that have been discovered during initialization but not assigned to readers
-     * yet.
-     */
-    UNASSIGNED_INITIAL(1);
+    /** The partitions that have been discovered but not assigned to readers yet. */
+    UNASSIGNED(1);
     private final int statusCode;
 
     AssignmentStatus(int statusCode) {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java

Lines changed: 27 additions & 30 deletions
@@ -19,76 +19,73 @@
 package org.apache.flink.connector.kafka.source.enumerator;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
-import org.apache.kafka.common.TopicPartition;
-
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /** The state of Kafka source enumerator. */
 @Internal
 public class KafkaSourceEnumState {
-    /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
-    private final Set<TopicPartitionAndAssignmentStatus> partitions;
+    /** Splits with status: ASSIGNED or UNASSIGNED_INITIAL. */
+    private final Set<SplitAndAssignmentStatus> splits;
     /**
      * this flag will be marked as true if initial partitions are discovered after enumerator
      * starts.
     */
     private final boolean initialDiscoveryFinished;
 
     public KafkaSourceEnumState(
-            Set<TopicPartitionAndAssignmentStatus> partitions, boolean initialDiscoveryFinished) {
-        this.partitions = partitions;
+            Set<SplitAndAssignmentStatus> splits, boolean initialDiscoveryFinished) {
+        this.splits = splits;
         this.initialDiscoveryFinished = initialDiscoveryFinished;
     }
 
     public KafkaSourceEnumState(
-            Set<TopicPartition> assignPartitions,
-            Set<TopicPartition> unassignedInitialPartitions,
+            Collection<KafkaPartitionSplit> assignedSplits,
+            Collection<KafkaPartitionSplit> unassignedSplits,
             boolean initialDiscoveryFinished) {
-        this.partitions = new HashSet<>();
-        partitions.addAll(
-                assignPartitions.stream()
+        this.splits = new HashSet<>();
+        splits.addAll(
+                assignedSplits.stream()
                         .map(
                                 topicPartition ->
-                                        new TopicPartitionAndAssignmentStatus(
+                                        new SplitAndAssignmentStatus(
                                                 topicPartition, AssignmentStatus.ASSIGNED))
                         .collect(Collectors.toSet()));
-        partitions.addAll(
-                unassignedInitialPartitions.stream()
+        splits.addAll(
+                unassignedSplits.stream()
                         .map(
                                 topicPartition ->
-                                        new TopicPartitionAndAssignmentStatus(
-                                                topicPartition,
-                                                AssignmentStatus.UNASSIGNED_INITIAL))
+                                        new SplitAndAssignmentStatus(
+                                                topicPartition, AssignmentStatus.UNASSIGNED))
                        .collect(Collectors.toSet()));
         this.initialDiscoveryFinished = initialDiscoveryFinished;
     }
 
-    public Set<TopicPartitionAndAssignmentStatus> partitions() {
-        return partitions;
+    public Set<SplitAndAssignmentStatus> splits() {
+        return splits;
     }
 
-    public Set<TopicPartition> assignedPartitions() {
-        return filterPartitionsByAssignmentStatus(AssignmentStatus.ASSIGNED);
+    public Collection<KafkaPartitionSplit> assignedSplits() {
+        return filterByAssignmentStatus(AssignmentStatus.ASSIGNED);
     }
 
-    public Set<TopicPartition> unassignedInitialPartitions() {
-        return filterPartitionsByAssignmentStatus(AssignmentStatus.UNASSIGNED_INITIAL);
+    public Collection<KafkaPartitionSplit> unassignedSplits() {
+        return filterByAssignmentStatus(AssignmentStatus.UNASSIGNED);
     }
 
     public boolean initialDiscoveryFinished() {
         return initialDiscoveryFinished;
     }
 
-    private Set<TopicPartition> filterPartitionsByAssignmentStatus(
+    private Collection<KafkaPartitionSplit> filterByAssignmentStatus(
             AssignmentStatus assignmentStatus) {
-        return partitions.stream()
-                .filter(
-                        partitionWithStatus ->
-                                partitionWithStatus.assignmentStatus().equals(assignmentStatus))
-                .map(TopicPartitionAndAssignmentStatus::topicPartition)
-                .collect(Collectors.toSet());
+        return splits.stream()
+                .filter(split -> split.assignmentStatus().equals(assignmentStatus))
+                .map(SplitAndAssignmentStatus::split)
+                .collect(Collectors.toList());
     }
 }
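With this refactor, the enumerator state round-trips full splits rather than bare partitions. A small usage sketch, assuming the two-argument KafkaPartitionSplit(TopicPartition, startingOffset) constructor; the topic names and offsets are made up:

// Sketch: the refactored state holds splits with concrete start offsets, so a
// restored enumerator can reassign them without consulting an OffsetsInitializer.
import java.util.Set;

import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.common.TopicPartition;

final class EnumStateRoundTrip {
    public static void main(String[] args) {
        KafkaPartitionSplit assigned =
                new KafkaPartitionSplit(new TopicPartition("orders", 0), 42L);
        KafkaPartitionSplit pending =
                new KafkaPartitionSplit(new TopicPartition("orders", 1), 17L);

        KafkaSourceEnumState state =
                new KafkaSourceEnumState(Set.of(assigned), Set.of(pending), true);

        // Both views expose full splits; the start offset survives checkpointing.
        state.assignedSplits()
                .forEach(s -> System.out.println(s.getTopicPartition() + " @ " + s.getStartingOffset()));
        state.unassignedSplits()
                .forEach(s -> System.out.println(s.getTopicPartition() + " @ " + s.getStartingOffset()));
    }
}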
