2929import org .apache .flink .connector .kafka .source .reader .acknowledgment .RecordMetadata ;
3030import org .apache .flink .connector .kafka .source .reader .deserializer .KafkaRecordDeserializationSchema ;
3131import org .apache .flink .connector .kafka .source .reader .fetcher .KafkaShareGroupFetcherManager ;
32+ import org .apache .flink .connector .kafka .source .reader .transaction .FlinkTransactionManager ;
3233import org .apache .flink .connector .kafka .source .split .ShareGroupSubscriptionState ;
3334
3435import org .apache .kafka .clients .consumer .ConsumerRecord ;
@@ -162,6 +163,9 @@ public class KafkaShareGroupSourceReader<T>
162163 */
163164 private final AcknowledgmentBuffer acknowledgmentBuffer ;
164165
166+ /** Transaction manager for two-phase-commit (2PC) acknowledgments (Phase 1: prepare, Phase 2: commit). */
167+ private final FlinkTransactionManager transactionManager ;
168+
165169 /**
166170 * Reference to the Kafka 4.1 ShareConsumer for acknowledgment operations. Obtained from the
167171 * fetcher manager.
@@ -230,8 +234,14 @@ record -> {
230234 this .shareGroupId = consumerProps .getProperty ("group.id" , "unknown-share-group" );
231235 this .shareGroupMetrics = shareGroupMetrics ;
232236
237+ // Initialize transaction manager for 2PC
238+ this .transactionManager = new FlinkTransactionManager (
239+ this .shareGroupId ,
240+ null // ShareConsumer will be set after fetcher manager starts
241+ );
242+
233243 LOG .info (
234- "Created KafkaShareGroupSourceReader for share group '{}' on subtask {} with CheckpointListener pattern " ,
244+ "Created KafkaShareGroupSourceReader for share group '{}' on subtask {} with transactional 2PC " ,
235245 shareGroupId ,
236246 context .getIndexOfSubtask ());
237247 }
@@ -286,6 +296,19 @@ public void registerReleaseHookIfAbsent(
286296
287297 // Call parent start
288298 super .start ();
299+
300+ // Set share consumer reference in transaction manager after fetcher starts
301+ ShareConsumer <byte [], byte []> consumer = getShareConsumer ();
302+ if (consumer != null ) {
303+ transactionManager .setShareConsumer (consumer );
304+ LOG .info (
305+ "Share group '{}': Transaction manager initialized with ShareConsumer" ,
306+ shareGroupId );
307+ } else {
308+ LOG .warn (
309+ "Share group '{}': ShareConsumer not available yet - will retry on first checkpoint" ,
310+ shareGroupId );
311+ }
289312 }
290313
291314 // ===========================================================================================
@@ -332,6 +355,39 @@ public List<ShareGroupSubscriptionState> snapshotState(long checkpointId) {
332355 // Update current checkpoint ID for record association
333356 currentCheckpointId .set (checkpointId );
334357
358+ // Ensure share consumer is set in transaction manager
359+ ShareConsumer <byte [], byte []> consumer = getShareConsumer ();
360+ if (consumer != null && transactionManager != null ) {
361+ transactionManager .setShareConsumer (consumer );
362+ }
363+
364+ // Get records for this checkpoint (checkpoint subsuming)
365+ Set <RecordMetadata > recordsToAck = acknowledgmentBuffer .getRecordsUpTo (checkpointId );
366+
367+ // Phase 1 of 2PC: Prepare acknowledgments
368+ if (!recordsToAck .isEmpty ()) {
369+ try {
370+ transactionManager .prepareAcknowledgments (checkpointId , recordsToAck );
371+ LOG .info (
372+ "Share group '{}': CHECKPOINT {} PREPARED - {} records marked for acknowledgment" ,
373+ shareGroupId ,
374+ checkpointId ,
375+ recordsToAck .size ());
376+ } catch (Exception e ) {
377+ LOG .error (
378+ "Share group '{}': CHECKPOINT {} PREPARE FAILED - transaction will be aborted" ,
379+ shareGroupId ,
380+ checkpointId ,
381+ e );
382+ throw new RuntimeException ("Failed to prepare checkpoint " + checkpointId , e );
383+ }
384+ } else {
385+ LOG .debug (
386+ "Share group '{}': CHECKPOINT {} SNAPSHOT - No records to prepare" ,
387+ shareGroupId ,
388+ checkpointId );
389+ }
390+
335391 // Get the current subscription state from parent
336392 List <ShareGroupSubscriptionState > states = super .snapshotState (checkpointId );
337393
@@ -352,20 +408,12 @@ public List<ShareGroupSubscriptionState> snapshotState(long checkpointId) {
352408 /**
353409 * Callback when a checkpoint completes successfully.
354410 *
355- * <p>This method tracks checkpoint completion for monitoring purposes. Note that actual record
356- * acknowledgments happen immediately in the SplitReader after polling to satisfy ShareConsumer
357- * requirements (records must be acknowledged before next poll).
358- *
359- * <p>This callback is used for:
360- *
361- * <ol>
362- * <li>Logging checkpoint statistics
363- * <li>Cleaning up acknowledged record metadata from buffer
364- * <li>Updating metrics
365- * </ol>
411+ * <p>Phase 2 of 2PC: commits the transaction for this checkpoint.
412+ * The broker applies the prepared acknowledgments atomically when the checkpoint completes.
413+ * This ensures no data loss - records remain locked until the checkpoint succeeds.
366414 *
367415 * @param checkpointId the ID of the checkpoint that completed
368- * @throws Exception if cleanup fails
416+ * @throws Exception if commit fails
369417 */
370418 @ Override
371419 public void notifyCheckpointComplete (long checkpointId ) throws Exception {
@@ -384,14 +432,15 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
384432 }
385433
386434 LOG .info (
387- "Share group '{}': CHECKPOINT {} COMPLETE - Processed {} records (already acknowledged in SplitReader) " ,
435+ "Share group '{}': CHECKPOINT {} COMPLETE - Committing transaction for {} records" ,
388436 shareGroupId ,
389437 checkpointId ,
390438 processedRecords .size ());
391439
392440 try {
393- // Records are already acknowledged in SplitReader immediately after polling
394- // Here we just update metrics and clean up the buffer
441+ // Phase 2 of 2PC: Commit transaction
442+ // Broker applies prepared acknowledgments atomically
443+ transactionManager .commitTransaction (checkpointId );
395444
396445 // Update metrics
397446 final long duration = System .currentTimeMillis () - startTime ;
@@ -407,15 +456,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
407456 int removedCount = acknowledgmentBuffer .removeUpTo (checkpointId );
408457
409458 LOG .info (
410- "Share group '{}': CHECKPOINT {} SUCCESS - Cleaned up {} record metadata entries in {}ms" ,
459+ "Share group '{}': CHECKPOINT {} SUCCESS - Committed {} records, cleaned up {} metadata entries in {}ms" ,
411460 shareGroupId ,
412461 checkpointId ,
462+ processedRecords .size (),
413463 removedCount ,
414464 duration );
415465
416466 } catch (Exception e ) {
417467 LOG .error (
418- "Share group '{}': CHECKPOINT {} FAILED - Error during cleanup " ,
468+ "Share group '{}': CHECKPOINT {} COMMIT FAILED " ,
419469 shareGroupId ,
420470 checkpointId ,
421471 e );
@@ -432,28 +482,50 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
432482 /**
433483 * Callback when a checkpoint is aborted.
434484 *
435- * <p>For share groups, when a checkpoint is aborted, we should release the records back to the
436- * share group coordinator so they can be redelivered. However, following the Checkpoint
437- * Subsuming Contract, we don't actually discard anything - the next successful checkpoint will
438- * cover a longer time span.
439- *
440- * <p>We use RELEASE acknowledgment type to indicate we didn't process these records and they
441- * should be made available to other consumers.
485+ * <p>Aborts the transaction and releases the records back to the share group for redelivery.
486+ * Following the checkpoint subsuming pattern, the next successful checkpoint will handle these records.
442487 *
443488 * @param checkpointId the ID of the checkpoint that was aborted
444- * @throws Exception if release operation fails
489+ * @throws Exception if abort operation fails
445490 */
446491 @ Override
447492 public void notifyCheckpointAborted (long checkpointId ) throws Exception {
448- LOG .info (
449- "Share group '{}': CHECKPOINT {} ABORTED - Records will be subsumed by next successful checkpoint" ,
450- shareGroupId ,
451- checkpointId );
493+ // Get records for this checkpoint
494+ Set <RecordMetadata > recordsToRelease = acknowledgmentBuffer .getRecordsUpTo (checkpointId );
495+
496+ if (!recordsToRelease .isEmpty ()) {
497+ LOG .info (
498+ "Share group '{}': CHECKPOINT {} ABORTED - Releasing {} records for redelivery" ,
499+ shareGroupId ,
500+ checkpointId ,
501+ recordsToRelease .size ());
502+
503+ try {
504+ // Abort transaction - releases record locks for redelivery
505+ transactionManager .abortTransaction (checkpointId , recordsToRelease );
452506
453- // Following the Checkpoint Subsuming Contract: we don't discard anything
454- // The next successful checkpoint will handle these records
455- // We could optionally release records for earlier redelivery, but it's not required
507+ LOG .info (
508+ "Share group '{}': CHECKPOINT {} ABORTED - Released {} records" ,
509+ shareGroupId ,
510+ checkpointId ,
511+ recordsToRelease .size ());
512+
513+ } catch (Exception e ) {
514+ LOG .warn (
515+ "Share group '{}': Failed to abort checkpoint {} - records will timeout and be redelivered" ,
516+ shareGroupId ,
517+ checkpointId ,
518+ e );
519+ // Non-fatal - records will timeout and be redelivered automatically
520+ }
521+ } else {
522+ LOG .debug (
523+ "Share group '{}': CHECKPOINT {} ABORTED - No records to release" ,
524+ shareGroupId ,
525+ checkpointId );
526+ }
456527
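528+ // Following the Checkpoint Subsuming Contract: the next successful checkpoint will handle these records (NOTE(review): they were released above but are not removed from acknowledgmentBuffer in this method - confirm they are not re-prepared)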
457529 super .notifyCheckpointAborted (checkpointId );
458530 }
459531
0 commit comments