Skip to content

Conversation

@Shekharrajak
Copy link

@Shekharrajak Shekharrajak commented Sep 1, 2025

Ref https://issues.apache.org/jira/browse/FLINK-38287

This implementation adds Kafka 4.x share group semantics to Flink's Kafka connector while maintaining full backward compatibility with existing code. The code changes are following KIP-932 and FLIP-27 main source architecture and implicit mode acknowledgement.

This directly addresses use cases where:

  1. Multiple consumers need to process items efficiently in parallel from a single/multiple topic(s).
  2. Messages need explicit acknowledgment/release (to avoid reprocessing or allow retries).
    Use cases where scaling Flink ML/LLM workload is critical - Shifting Kafka coordination and assignment logic to the broker side would simplify today’s complex Flink source management, making consumption more efficient, scalable, and far less error-prone.
    Operational Benefits
  • Higher Throughput: ShareGroupHeartbeat helps in Queue-like workloads, maximum throughput scenarios. Share groups distribute messages at the record level, not partition level, so multiple readers can consume from the same topic with Kafka coordinating message distribution.
  • Better Availability and Flexible Scaling: consumers assignment logic is simpler in server side and rebalancing frequency is minimised.

Note:

Semantic Guarantee
AT-MOST-ONCE: Records acknowledged immediately after polling. Job failure after acknowledgment but
before checkpoint = permanent data loss.

Ref https://issues.apache.org/jira/browse/KAFKA-19883 implemented for no data loss in commit : ef83f90

Good for: Logs, monitoring, non-critical analyticsNot for: Financial transactions, critical data,
audit trails

@boring-cyborg
Copy link

boring-cyborg bot commented Sep 1, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<scope>provided</scope>
<scope>compile</scope>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes can be reverted.

* <p>This test validates builder functionality, error handling, and property management
* for Kafka share group source construction.
*/
@DisplayName("KafkaShareGroupSourceBuilder Tests")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testcases improvements is required, I will check and update them accordingly.

<confluent.version>7.9.2</confluent.version>
<flink.version>2.0.0</flink.version>
<kafka.version>3.9.1</kafka.version>
<kafka.version>4.1.0</kafka.version>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not yet released and expected to be available to download. Meantime the testing is done by adding this into class path.

* Manages batches of records from Kafka share consumer for checkpoint persistence.
* Controls when new batches can be fetched to work within share consumer's auto-commit constraints.
*/
public class ShareGroupBatchManager<K, V>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ListCheckpointed will help to store the records which is polled but not yet processed in Flink persistent checkpoint state - this will make sure in case of any failure / crash we process the records that we read & ack.

@Shekharrajak Shekharrajak changed the title [WIP] Kafka 4.x Queue Semantics support in Flink Connector Kafka [WIP] Kafka 4.x Queue Semantics support Sep 1, 2025
@Shekharrajak
Copy link
Author

Using Flink SQL, we can have some validation :

CREATE TABLE kafka_share_source (
      message STRING
  ) WITH (
      'connector' = 'kafka-sharegroup',
      'bootstrap.servers' = 'localhost:9092',
      'share-group-id' = 'flink-sql-test-group',
      'topic' = 'test-topic',
      'format' = 'raw',
      'source.parallelism' = '4'  -- 4 subtasks regardless of partition count
  );
 
select * from kafka_share_source;

dafd0c9c-ec40-4af2-a929-ca27007076d8

<flink.version>2.1.0</flink.version>
<kafka.version>4.0.0</kafka.version>
<flink.version>2.0.0</flink.version>
<kafka.version>4.1.0</kafka.version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR 190 is also upping the Kafka client level to 4.1. If we do it here then we should amend the NOTOCE as per pr 190. fyi @tomncooper

@jnh5y
Copy link

jnh5y commented Sep 16, 2025

As a high-level note, since share groups do not use transactions, there will be some possibility for reprocessing messages. Is that ok for your use cases?

Generally, do you have any performance numbers to show that this consumer is faster? (Of course, since transactions are not available, I could imagine it being a little bit faster anyhow...)

@Shekharrajak
Copy link
Author

Flink-SQL:

 //  {"order_id":1001,"customer_id":"C001","quantity":2,"price":25.50}
   CREATE TABLE orders_share_group (
      order_id BIGINT,
      customer_id STRING,
      quantity INT,
      price DECIMAL(10, 2)
  ) WITH (
      'connector' = 'kafka-sharegroup',
      'topic' = 'orders',
      'bootstrap.servers' = 'localhost:9092',
      'share-group-id' = 'flink-orders-sharegroup',
      'format' = 'json'

  );

  select * from orders_share_group;
image

@Shekharrajak Shekharrajak changed the title [WIP] Kafka 4.x Queue Semantics support Kafka 4.x Queue Semantics support Nov 7, 2025
Shekharrajak and others added 7 commits November 19, 2025 09:27
Note that this update will make the connector incompatible with Kafka
clusters running Kafka version 2.0 and older.

Signed-off-by: Thomas Cooper <code@tomcooper.dev>
KafkaEnumerator's state contains the TopicPartitions only but not the offsets, so it doesn't contain the full split state contrary to the design intent.

There are a couple of issues with that approach. It implicitly assumes that splits are fully assigned to readers before the first checkpoint. Else the enumerator will invoke the offset initializer again on recovery from such a checkpoint leading to inconsistencies (LATEST may be initialized during the first attempt for some partitions and initialized during second attempt for others).

Through addSplitBack callback, you may also get these scenarios later for BATCH which actually leads to duplicate rows (in case of EARLIEST or SPECIFIC-OFFSETS) or data loss (in case of LATEST). Finally, it's not possible to safely use KafkaSource as part of a HybridSource because the offset initializer cannot even be recreated on recovery.

All cases are solved by also retaining the offset in the enumerator state. To that end, this commit merges the async discovery phases to immediately initialize the splits from the partitions. Any subsequent checkpoint will contain the proper start offset.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants