Skip to content

Commit c381123

Browse files
committed
revert back to existing implementation: ack and commitSync
1 parent b34840e commit c381123

File tree

2 files changed

+72
-89
lines changed

2 files changed

+72
-89
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaShareGroupSourceReader.java

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -352,46 +352,41 @@ protected ShareGroupSubscriptionState toSplitType(
352352

353353
@Override
354354
public List<ShareGroupSubscriptionState> snapshotState(long checkpointId) {
355-
// Update current checkpoint ID for record association
356355
currentCheckpointId.set(checkpointId);
357356

358-
// Ensure share consumer is set in transaction manager
359357
ShareConsumer<byte[], byte[]> consumer = getShareConsumer();
360358
if (consumer != null && transactionManager != null) {
361359
transactionManager.setShareConsumer(consumer);
362360
}
363361

364-
// Get records for this checkpoint (checkpoint subsuming)
365362
Set<RecordMetadata> recordsToAck = acknowledgmentBuffer.getRecordsUpTo(checkpointId);
366363

367-
// Phase 1 of 2PC: Prepare acknowledgments
364+
// Phase 1: Mark records ready (DO NOT send to broker yet)
368365
if (!recordsToAck.isEmpty()) {
369366
try {
370-
transactionManager.prepareAcknowledgments(checkpointId, recordsToAck);
367+
transactionManager.markReadyForAcknowledgment(checkpointId, recordsToAck);
371368
LOG.info(
372-
"Share group '{}': CHECKPOINT {} PREPARED - {} records marked for acknowledgment",
369+
"Share group '{}': CHECKPOINT {} READY - {} records marked (not sent to broker)",
373370
shareGroupId,
374371
checkpointId,
375372
recordsToAck.size());
376373
} catch (Exception e) {
377374
LOG.error(
378-
"Share group '{}': CHECKPOINT {} PREPARE FAILED - transaction will be aborted",
375+
"Share group '{}': CHECKPOINT {} MARK FAILED",
379376
shareGroupId,
380377
checkpointId,
381378
e);
382-
throw new RuntimeException("Failed to prepare checkpoint " + checkpointId, e);
379+
throw new RuntimeException("Failed to mark checkpoint " + checkpointId, e);
383380
}
384381
} else {
385382
LOG.debug(
386-
"Share group '{}': CHECKPOINT {} SNAPSHOT - No records to prepare",
383+
"Share group '{}': CHECKPOINT {} SNAPSHOT - No records to mark",
387384
shareGroupId,
388385
checkpointId);
389386
}
390387

391-
// Get the current subscription state from parent
392388
List<ShareGroupSubscriptionState> states = super.snapshotState(checkpointId);
393389

394-
// Log checkpoint snapshot statistics
395390
AcknowledgmentBuffer.BufferStatistics stats = acknowledgmentBuffer.getStatistics();
396391
LOG.info(
397392
"Share group '{}': CHECKPOINT {} SNAPSHOT - {} records buffered across {} checkpoints (memory: {} bytes)",
@@ -401,16 +396,14 @@ public List<ShareGroupSubscriptionState> snapshotState(long checkpointId) {
401396
stats.getCheckpointCount(),
402397
stats.getMemoryUsageBytes());
403398

404-
// Return minimal subscription state - no offset tracking needed
405399
return states;
406400
}
407401

408402
/**
409403
* Callback when a checkpoint completes successfully.
410404
*
411-
* Phase 2 of 2PC: Commit transaction.
412-
* The broker applies acknowledgments atomically when checkpoint completes.
413-
* This ensures no data loss - records remain locked until checkpoint succeeds.
405+
* Phase 2 of 2PC: NOW send acknowledgments to broker.
406+
* Records stay locked until this method succeeds - ensuring no data loss.
414407
*
415408
* @param checkpointId the ID of the checkpoint that completed
416409
* @throws Exception if commit fails
@@ -419,7 +412,6 @@ public List<ShareGroupSubscriptionState> snapshotState(long checkpointId) {
419412
public void notifyCheckpointComplete(long checkpointId) throws Exception {
420413
final long startTime = System.currentTimeMillis();
421414

422-
// Get all records up to this checkpoint for statistics
423415
Set<RecordMetadata> processedRecords = acknowledgmentBuffer.getRecordsUpTo(checkpointId);
424416

425417
if (processedRecords.isEmpty()) {
@@ -432,17 +424,15 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
432424
}
433425

434426
LOG.info(
435-
"Share group '{}': CHECKPOINT {} COMPLETE - Committing transaction for {} records",
427+
"Share group '{}': CHECKPOINT {} COMPLETE - NOW sending {} acknowledgments to broker",
436428
shareGroupId,
437429
checkpointId,
438430
processedRecords.size());
439431

440432
try {
441-
// Phase 2 of 2PC: Commit transaction
442-
// Broker applies prepared acknowledgments atomically
433+
// Phase 2: Send acknowledgments to broker (ONLY when checkpoint completes)
443434
transactionManager.commitTransaction(checkpointId);
444435

445-
// Update metrics
446436
final long duration = System.currentTimeMillis() - startTime;
447437
if (shareGroupMetrics != null) {
448438
shareGroupMetrics.recordSuccessfulCommit();
@@ -452,11 +442,10 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
452442
}
453443
}
454444

455-
// Clean up buffer - remove processed record metadata
456445
int removedCount = acknowledgmentBuffer.removeUpTo(checkpointId);
457446

458447
LOG.info(
459-
"Share group '{}': CHECKPOINT {} SUCCESS - Committed {} records, cleaned up {} metadata entries in {}ms",
448+
"Share group '{}': CHECKPOINT {} SUCCESS - Committed {} records to broker, cleaned up {} metadata entries in {}ms",
460449
shareGroupId,
461450
checkpointId,
462451
processedRecords.size(),
@@ -465,7 +454,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
465454

466455
} catch (Exception e) {
467456
LOG.error(
468-
"Share group '{}': CHECKPOINT {} COMMIT FAILED",
457+
"Share group '{}': CHECKPOINT {} COMMIT FAILED - Records remain locked at broker",
469458
shareGroupId,
470459
checkpointId,
471460
e);
@@ -475,7 +464,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
475464
throw e;
476465
}
477466

478-
// Call parent implementation
479467
super.notifyCheckpointComplete(checkpointId);
480468
}
481469

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/transaction/FlinkTransactionManager.java

Lines changed: 60 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,19 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333

3434
/**
35-
* Manages transactional acknowledgments for Flink share group source.
35+
* Coordinates acknowledgments with Flink checkpoint lifecycle for at-least-once semantics.
3636
*
37-
* Implements two-phase commit (2PC) to ensure no data loss:
38-
* - Phase 1 (Prepare): Send acks to broker on snapshotState
39-
* - Phase 2 (Commit): Broker applies acks on notifyCheckpointComplete
37+
* Two-phase commit coordinated with Flink checkpoints:
38+
* - Phase 1 (snapshotState): Buffer acks locally, records stay locked at broker
39+
* - Phase 2 (notifyCheckpointComplete): Send acks via commitSync(), uses Kafka's built-in 2PC
4040
*
41-
* Recovery logic:
42-
* - On restore, query broker for transaction state
43-
* - If PREPARED → commit (checkpoint was written)
44-
* - If ACTIVE → abort (checkpoint incomplete)
41+
* At-least-once guarantee:
42+
* - Records stay IN_FLIGHT (locked) at broker until checkpoint completes
43+
* - If checkpoint fails: locks timeout → records automatically redelivered
44+
* - If checkpoint succeeds: commitSync() atomically acknowledges records
45+
*
46+
* Note: Kafka's built-in commitSync() handles PREPARED→COMMITTED atomically (milliseconds).
47+
* This manager coordinates the TIMING of commitSync() with Flink's checkpoint lifecycle.
4548
*/
4649
@Internal
4750
public class FlinkTransactionManager {
@@ -50,11 +53,13 @@ public class FlinkTransactionManager {
5053
private final String shareGroupId;
5154
private ShareConsumer<?, ?> shareConsumer;
5255
private final Map<Long, TransactionState> checkpointTransactions;
56+
private final Map<Long, Set<RecordMetadata>> readyForAcknowledgment;
5357

5458
public FlinkTransactionManager(String shareGroupId, ShareConsumer<?, ?> shareConsumer) {
5559
this.shareGroupId = shareGroupId;
5660
this.shareConsumer = shareConsumer;
5761
this.checkpointTransactions = new ConcurrentHashMap<>();
62+
this.readyForAcknowledgment = new ConcurrentHashMap<>();
5863
}
5964

6065
/**
@@ -65,29 +70,57 @@ public void setShareConsumer(ShareConsumer<?, ?> shareConsumer) {
6570
}
6671

6772
/**
68-
* Prepare acknowledgments (Phase 1 of 2PC).
69-
* Called during snapshotState before checkpoint barrier.
73+
* Mark acknowledgments ready (Phase 1).
74+
* Stores records locally - does NOT send to broker yet.
75+
* Records remain locked (IN_FLIGHT) at broker until commitTransaction().
7076
*/
71-
public void prepareAcknowledgments(long checkpointId, Set<RecordMetadata> records) throws Exception {
77+
public void markReadyForAcknowledgment(long checkpointId, Set<RecordMetadata> records) {
7278
if (records.isEmpty()) {
73-
LOG.debug("Share group '{}': No records to prepare for checkpoint {}",
79+
LOG.debug("Share group '{}': No records to mark for checkpoint {}",
80+
shareGroupId, checkpointId);
81+
return;
82+
}
83+
84+
LOG.info("Share group '{}': Marking {} records ready for checkpoint {} (NOT sending to broker yet)",
85+
shareGroupId, records.size(), checkpointId);
86+
87+
readyForAcknowledgment.put(checkpointId, records);
88+
checkpointTransactions.put(checkpointId, TransactionState.READY);
89+
}
90+
91+
/**
92+
* Commit transaction (Phase 2).
93+
* Sends acks to broker using Kafka's built-in atomic commitSync().
94+
* Kafka internally: acknowledge() marks PREPARED, commitSync() applies atomically.
95+
*/
96+
public void commitTransaction(long checkpointId) throws Exception {
97+
Set<RecordMetadata> records = readyForAcknowledgment.remove(checkpointId);
98+
99+
if (records == null || records.isEmpty()) {
100+
LOG.debug("Share group '{}': No records to commit for checkpoint {}",
74101
shareGroupId, checkpointId);
75-
return
76-
;
102+
checkpointTransactions.remove(checkpointId);
103+
return;
104+
}
105+
106+
TransactionState state = checkpointTransactions.get(checkpointId);
107+
if (state != TransactionState.READY) {
108+
LOG.warn("Share group '{}': Cannot commit checkpoint {} in state {}",
109+
shareGroupId, checkpointId, state);
110+
return;
77111
}
78112

79-
LOG.info("Share group '{}': Preparing {} records for checkpoint {}",
113+
LOG.info("Share group '{}': Committing {} records for checkpoint {}",
80114
shareGroupId, records.size(), checkpointId);
81115

82116
try {
83-
// Group by partition for efficient acknowledgment
117+
// Send acknowledgments using Kafka's built-in atomic commit
84118
Map<TopicPartition, java.util.List<RecordMetadata>> byPartition = new ConcurrentHashMap<>();
85119
for (RecordMetadata meta : records) {
86120
TopicPartition tp = new TopicPartition(meta.getTopic(), meta.getPartition());
87121
byPartition.computeIfAbsent(tp, k -> new java.util.ArrayList<>()).add(meta);
88122
}
89123

90-
// Acknowledge records (marks them as prepared in broker)
91124
for (Map.Entry<TopicPartition, java.util.List<RecordMetadata>> entry : byPartition.entrySet()) {
92125
for (RecordMetadata meta : entry.getValue()) {
93126
shareConsumer.acknowledge(
@@ -97,59 +130,30 @@ public void prepareAcknowledgments(long checkpointId, Set<RecordMetadata> record
97130
}
98131
}
99132

100-
// Sync to ensure broker received acknowledgments
133+
// commitSync() atomically applies all acknowledgments at broker
101134
shareConsumer.commitSync(Duration.ofSeconds(30));
102135

103-
// Track transaction state
104-
checkpointTransactions.put(checkpointId, TransactionState.PREPARED);
136+
checkpointTransactions.put(checkpointId, TransactionState.COMMITTED);
137+
cleanupOldTransactions(checkpointId);
105138

106-
LOG.info("Share group '{}': Prepared checkpoint {} successfully",
139+
LOG.info("Share group '{}': Successfully committed checkpoint {}",
107140
shareGroupId, checkpointId);
108141

109142
} catch (Exception e) {
110-
LOG.error("Share group '{}': Failed to prepare checkpoint {}",
143+
LOG.error("Share group '{}': Failed to commit checkpoint {}",
111144
shareGroupId, checkpointId, e);
112145
checkpointTransactions.put(checkpointId, TransactionState.FAILED);
113146
throw e;
114147
}
115148
}
116149

117150
/**
118-
* Commit transaction (Phase 2 of 2PC).
119-
* Called on notifyCheckpointComplete - broker applies acknowledgments atomically.
120-
*/
121-
public void commitTransaction(long checkpointId) {
122-
TransactionState state = checkpointTransactions.get(checkpointId);
123-
if (state == null) {
124-
LOG.debug("Share group '{}': No transaction for checkpoint {}",
125-
shareGroupId, checkpointId);
126-
return;
127-
}
128-
129-
if (state != TransactionState.PREPARED) {
130-
LOG.warn("Share group '{}': Cannot commit checkpoint {} in state {}",
131-
shareGroupId, checkpointId, state);
132-
return;
133-
}
134-
135-
LOG.info("Share group '{}': Committing checkpoint {}", shareGroupId, checkpointId);
136-
137-
// Broker automatically applies prepared acknowledgments on checkpoint complete
138-
// No additional action needed - this is handled by Kafka coordinator
139-
140-
checkpointTransactions.put(checkpointId, TransactionState.COMMITTED);
141-
cleanupOldTransactions(checkpointId);
142-
}
143-
144-
/**
145-
* Abort transaction.
146-
* Called on notifyCheckpointAborted - releases record locks.
151+
* Abort transaction - releases record locks for redelivery.
147152
*/
148153
public void abortTransaction(long checkpointId, Set<RecordMetadata> records) {
149154
LOG.info("Share group '{}': Aborting checkpoint {}", shareGroupId, checkpointId);
150155

151156
try {
152-
// Release records back for redelivery
153157
for (RecordMetadata meta : records) {
154158
shareConsumer.acknowledge(
155159
meta.getConsumerRecord(),
@@ -158,32 +162,23 @@ public void abortTransaction(long checkpointId, Set<RecordMetadata> records) {
158162
}
159163

160164
shareConsumer.commitSync(Duration.ofSeconds(10));
161-
162165
checkpointTransactions.put(checkpointId, TransactionState.ABORTED);
163166

164167
} catch (Exception e) {
165168
LOG.error("Share group '{}': Failed to abort checkpoint {}",
166169
shareGroupId, checkpointId, e);
167-
// Records will timeout and be redelivered automatically
168170
}
169171

170172
cleanupOldTransactions(checkpointId);
171173
}
172174

173175
/**
174-
* Handle recovery after task restart.
175-
* Queries broker for transaction state and makes recovery decision.
176+
* Recovery is handled automatically by Kafka's lock timeout mechanism.
177+
* If task fails, locks expire and records are redelivered - no explicit action needed.
176178
*/
177179
public void recoverFromCheckpoint(long restoredCheckpointId) {
178-
LOG.info("Share group '{}': Recovering from checkpoint {}",
180+
LOG.info("Share group '{}': Recovering from checkpoint {} - relying on Kafka lock timeout for redelivery",
179181
shareGroupId, restoredCheckpointId);
180-
181-
// Query broker for transaction state
182-
// In actual implementation, this would use admin client to query broker
183-
// For now, conservative approach: assume need to restart
184-
185-
LOG.info("Share group '{}': Recovery complete - ready for new checkpoints",
186-
shareGroupId);
187182
}
188183

189184
private void cleanupOldTransactions(long completedCheckpointId) {
@@ -193,7 +188,7 @@ private void cleanupOldTransactions(long completedCheckpointId) {
193188
}
194189

195190
private enum TransactionState {
196-
PREPARED,
191+
READY,
197192
COMMITTED,
198193
ABORTED,
199194
FAILED

0 commit comments

Comments
 (0)