Skip to content

Commit 4400dcd

Browse files
feat: introduce ack trackers
1 parent 953d9ea commit 4400dcd

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

pulsar/consumer_partition.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ type partitionConsumer struct {
178178
chunkedMsgCtxMap *chunkedMsgCtxMap
179179
unAckChunksTracker *unAckChunksTracker
180180
ackGroupingTracker ackGroupingTracker
181+
ackTrackers *ackTrackers
181182

182183
lastMessageInBroker *trackingMessageID
183184

@@ -375,6 +376,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
375376
pc.decryptor = decryptor
376377

377378
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
379+
pc.ackTrackers = newAckTrackers()
378380

379381
err := pc.grabConn("")
380382
if err != nil {
@@ -443,6 +445,9 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
443445
}
444446

445447
trackingID := toTrackingMessageID(msgID)
448+
if trackingID != nil && trackingID.tracker == nil {
449+
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
450+
}
446451

447452
if trackingID != nil && trackingID.ack() {
448453
// All messages in the same batch have been acknowledged, we only need to acknowledge the
@@ -453,6 +458,7 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
453458
entryID: trackingID.entryID,
454459
},
455460
}
461+
pc.ackTrackers.remove(trackingID)
456462
pc.metrics.AcksCounter.Inc()
457463
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
458464
} else if !pc.options.enableBatchIndexAck {
@@ -712,6 +718,9 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
712718
if trackingID == nil {
713719
return errors.New("failed to convert trackingMessageID")
714720
}
721+
if trackingID.tracker == nil {
722+
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
723+
}
715724

716725
var msgIDToAck *trackingMessageID
717726
if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
@@ -725,6 +734,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
725734
return nil
726735
}
727736

737+
pc.ackTrackers.remove(msgIDToAck)
728738
pc.metrics.AcksCounter.Inc()
729739
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
730740

@@ -1162,6 +1172,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
11621172
ackTracker)
11631173
// set the consumer so we know how to ack the message id
11641174
trackingMsgID.consumer = pc
1175+
pc.ackTrackers.add(trackingMsgID, ackTracker)
11651176

11661177
if pc.messageShouldBeDiscarded(trackingMsgID) {
11671178
pc.AckID(trackingMsgID)
@@ -2366,3 +2377,32 @@ func (u *unAckChunksTracker) nack(cmid *chunkMessageID) {
23662377
}
23672378
u.remove(cmid)
23682379
}
2380+
2381+
type ackTrackers struct {
2382+
mu sync.RWMutex
2383+
trackers map[[2]int64]*ackTracker
2384+
}
2385+
2386+
func newAckTrackers() *ackTrackers {
2387+
return &ackTrackers{
2388+
trackers: make(map[[2]int64]*ackTracker),
2389+
}
2390+
}
2391+
2392+
func (a *ackTrackers) tracker(id MessageID) *ackTracker {
2393+
a.mu.RLock()
2394+
defer a.mu.RUnlock()
2395+
return a.trackers[[2]int64{id.LedgerID(), id.EntryID()}]
2396+
}
2397+
2398+
func (a *ackTrackers) add(id MessageID, tracker *ackTracker) {
2399+
a.mu.Lock()
2400+
defer a.mu.Unlock()
2401+
a.trackers[[2]int64{id.LedgerID(), id.EntryID()}] = tracker
2402+
}
2403+
2404+
func (a *ackTrackers) remove(id MessageID) {
2405+
a.mu.Lock()
2406+
defer a.mu.Unlock()
2407+
delete(a.trackers, [2]int64{id.LedgerID(), id.EntryID()})
2408+
}

pulsar/consumer_partition_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
3636
options: &partitionConsumerOpts{},
3737
metrics: newTestMetrics(),
3838
decryptor: crypto.NewNoopDecryptor(),
39+
ackTrackers: newAckTrackers(),
3940
}
4041
pc.availablePermits = &availablePermits{pc: &pc}
4142
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
@@ -75,6 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
7576
options: &partitionConsumerOpts{},
7677
metrics: newTestMetrics(),
7778
decryptor: crypto.NewNoopDecryptor(),
79+
ackTrackers: newAckTrackers(),
7880
}
7981
pc.availablePermits = &availablePermits{pc: &pc}
8082
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
@@ -111,6 +113,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
111113
options: &partitionConsumerOpts{},
112114
metrics: newTestMetrics(),
113115
decryptor: crypto.NewNoopDecryptor(),
116+
ackTrackers: newAckTrackers(),
114117
}
115118
pc.availablePermits = &availablePermits{pc: &pc}
116119
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
@@ -150,6 +153,67 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
150153
}
151154
}
152155

156+
func TestBatchMessageIDWithAckTrackers(t *testing.T) {
157+
eventsCh := make(chan interface{}, 1)
158+
pc := partitionConsumer{
159+
queueCh: make(chan []*message, 1),
160+
eventsCh: eventsCh,
161+
compressionProviders: sync.Map{},
162+
options: &partitionConsumerOpts{},
163+
metrics: newTestMetrics(),
164+
decryptor: crypto.NewNoopDecryptor(),
165+
ackTrackers: newAckTrackers(),
166+
}
167+
pc.availablePermits = &availablePermits{pc: &pc}
168+
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
169+
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
170+
171+
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
172+
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
173+
t.Fatal(err)
174+
}
175+
176+
// ensure the tracker was set on the message id
177+
messages := <-pc.queueCh
178+
for _, m := range messages {
179+
assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
180+
}
181+
182+
noAckTrackerMessages := make([]MessageID, 10)
183+
for i, m := range messages {
184+
tmp := m.ID().Serialize()
185+
mid, err := DeserializeMessageID(tmp)
186+
if err != nil {
187+
t.Fatal(err)
188+
}
189+
noAckTrackerMessages[i] = mid
190+
}
191+
192+
// ack all message ids except the last one
193+
for i := 0; i < 9; i++ {
194+
_, ok := noAckTrackerMessages[i].(*trackingMessageID)
195+
assert.False(t, ok)
196+
err := pc.AckID(noAckTrackerMessages[i])
197+
assert.Nil(t, err)
198+
}
199+
200+
select {
201+
case <-eventsCh:
202+
t.Error("The message id should not be acked!")
203+
default:
204+
}
205+
206+
// ack last message
207+
err := pc.AckID(noAckTrackerMessages[9])
208+
assert.Nil(t, err)
209+
210+
select {
211+
case <-eventsCh:
212+
default:
213+
t.Error("Expected an ack request to be triggered!")
214+
}
215+
}
216+
153217
// Raw single message in old format
154218
// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
155219
// payload = "hello"

0 commit comments

Comments
 (0)