-
Notifications
You must be signed in to change notification settings - Fork 172
Kafka 4.x Queue Semantics support #189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
flink-connector-kafka/pom.xml
Outdated
| <groupId>org.apache.flink</groupId> | ||
| <artifactId>flink-streaming-java</artifactId> | ||
| <scope>provided</scope> | ||
| <scope>compile</scope> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes can be reverted.
| * <p>This test validates builder functionality, error handling, and property management | ||
| * for Kafka share group source construction. | ||
| */ | ||
| @DisplayName("KafkaShareGroupSourceBuilder Tests") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testcases improvements is required, I will check and update them accordingly.
| <confluent.version>7.9.2</confluent.version> | ||
| <flink.version>2.0.0</flink.version> | ||
| <kafka.version>3.9.1</kafka.version> | ||
| <kafka.version>4.1.0</kafka.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not yet released and expected to be available to download. Meantime the testing is done by adding this into class path.
| * Manages batches of records from Kafka share consumer for checkpoint persistence. | ||
| * Controls when new batches can be fetched to work within share consumer's auto-commit constraints. | ||
| */ | ||
| public class ShareGroupBatchManager<K, V> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ListCheckpointed will help to store the records which is polled but not yet processed in Flink persistent checkpoint state - this will make sure in case of any failure / crash we process the records that we read & ack.
| <flink.version>2.1.0</flink.version> | ||
| <kafka.version>4.0.0</kafka.version> | ||
| <flink.version>2.0.0</flink.version> | ||
| <kafka.version>4.1.0</kafka.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR 190 is also upping the Kafka client level to 4.1. If we do it here then we should amend the NOTOCE as per pr 190. fyi @tomncooper
|
As a high-level note, since share groups do not use transactions, there will be some possibility for reprocessing messages. Is that ok for your use cases? Generally, do you have any performance numbers to show that this consumer is faster? (Of course, since transactions are not available, I could imagine it being a little bit faster anyhow...) |
997b91e to
2cdbb46
Compare
7c5d908 to
ef83f90
Compare
Note that this update will make the connector incompatible with Kafka clusters running Kafka version 2.0 and older. Signed-off-by: Thomas Cooper <code@tomcooper.dev>
KafkaEnumerator's state contains the TopicPartitions only but not the offsets, so it doesn't contain the full split state contrary to the design intent. There are a couple of issues with that approach. It implicitly assumes that splits are fully assigned to readers before the first checkpoint. Else the enumerator will invoke the offset initializer again on recovery from such a checkpoint leading to inconsistencies (LATEST may be initialized during the first attempt for some partitions and initialized during second attempt for others). Through addSplitBack callback, you may also get these scenarios later for BATCH which actually leads to duplicate rows (in case of EARLIEST or SPECIFIC-OFFSETS) or data loss (in case of LATEST). Finally, it's not possible to safely use KafkaSource as part of a HybridSource because the offset initializer cannot even be recreated on recovery. All cases are solved by also retaining the offset in the enumerator state. To that end, this commit merges the async discovery phases to immediately initialize the splits from the partitions. Any subsequent checkpoint will contain the proper start offset.
c381123 to
e25cc95
Compare


Ref https://issues.apache.org/jira/browse/FLINK-38287
This implementation adds Kafka 4.x share group semantics to Flink's Kafka connector while maintaining full backward compatibility with existing code. The code changes are following KIP-932 and FLIP-27 main source architecture and implicit mode acknowledgement.
This directly addresses use cases where:
Use cases where scaling Flink ML/LLM workload is critical - Shifting Kafka coordination and assignment logic to the broker side would simplify today’s complex Flink source management, making consumption more efficient, scalable, and far less error-prone.
Operational Benefits
Note:
Semantic Guarantee
AT-MOST-ONCE: Records acknowledged immediately after polling. Job failure after acknowledgment but
before checkpoint = permanent data loss.
Ref https://issues.apache.org/jira/browse/KAFKA-19883 implemented for no data loss in commit : ef83f90
Good for: Logs, monitoring, non-critical analyticsNot for: Financial transactions, critical data,
audit trails