Skip to content

Commit e11c361

Browse files
committed
add batch flusher
1 parent deaca20 commit e11c361

File tree

4 files changed

+150
-12
lines changed

4 files changed

+150
-12
lines changed

internal/topic/topicreaderinternal/batcher.go

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
78
"sync/atomic"
89

910
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
@@ -26,6 +27,7 @@ type batcher struct {
2627
closed bool
2728
closeChan empty.Chan
2829
messages batcherMessagesMap
30+
sessionsForFlush []*topicreadercommon.PartitionSession
2931
}
3032

3133
func newBatcher() *batcher {
@@ -81,6 +83,20 @@ func (b *batcher) PushRawMessage(session *topicreadercommon.PartitionSession, m
8183
return b.addNeedLock(session, newBatcherItemRawMessage(m))
8284
}
8385

86+
func (b *batcher) FlushPartitionSession(session *topicreadercommon.PartitionSession) {
87+
b.m.WithLock(func() {
88+
if b.closed {
89+
return
90+
}
91+
92+
if _, ok := b.messages[session]; !ok {
93+
return
94+
}
95+
96+
b.sessionsForFlush = append(b.sessionsForFlush, session)
97+
})
98+
}
99+
84100
func (b *batcher) addNeedLock(session *topicreadercommon.PartitionSession, item batcherMessageOrderItem) error {
85101
var currentItems batcherMessageOrderItems
86102
var ok bool
@@ -250,31 +266,59 @@ func (b *batcher) findNeedLock(filter batcherGetOptions) batcherResultCandidate
250266
return batcherResultCandidate{}
251267
}
252268

253-
rawMessageOpts := batcherGetOptions{rawMessagesOnly: true}
254-
255269
var batchResult batcherResultCandidate
256270
needBatchResult := true
257271

258-
for k, items := range b.messages {
259-
head, rest, ok := rawMessageOpts.cutBatchItemsHead(items)
260-
if ok {
261-
return newBatcherResultCandidate(k, head, rest, true)
272+
for _, session := range b.sessionsForFlush {
273+
items := b.messages[session]
274+
candidate := b.findNeedLockProcessItem(filter, session, items, true)
275+
if !candidate.Result.IsEmpty() {
276+
return candidate
262277
}
278+
}
263279

264-
if needBatchResult {
265-
head, rest, ok = b.applyForceFlagToOptions(filter).cutBatchItemsHead(items)
266-
if !ok {
267-
continue
268-
}
280+
for session, items := range b.messages {
281+
candidate := b.findNeedLockProcessItem(filter, session, items, needBatchResult)
282+
283+
if candidate.Result.IsRawMessage() {
284+
return candidate
285+
}
269286

287+
if candidate.Result.IsBatch() {
288+
batchResult = candidate
270289
needBatchResult = false
271-
batchResult = newBatcherResultCandidate(k, head, rest, true)
272290
}
273291
}
274292

275293
return batchResult
276294
}
277295

296+
func (b *batcher) findNeedLockProcessItem(
297+
filter batcherGetOptions,
298+
k *topicreadercommon.PartitionSession,
299+
items batcherMessageOrderItems,
300+
needBatchResult bool,
301+
) (
302+
result batcherResultCandidate,
303+
) {
304+
rawMessageOpts := batcherGetOptions{rawMessagesOnly: true}
305+
head, rest, ok := rawMessageOpts.cutBatchItemsHead(items)
306+
if ok {
307+
return newBatcherResultCandidate(k, head, rest, true)
308+
}
309+
310+
if needBatchResult {
311+
head, rest, ok = b.applyForceFlagToOptions(filter).cutBatchItemsHead(items)
312+
if !ok {
313+
return batcherResultCandidate{}
314+
}
315+
316+
return newBatcherResultCandidate(k, head, rest, true)
317+
}
318+
319+
return batcherResultCandidate{}
320+
}
321+
278322
func (b *batcher) applyForceFlagToOptions(options batcherGetOptions) batcherGetOptions {
279323
if !b.forceIgnoreMinRestrictionsOnNextMessagesBatch {
280324
return options
@@ -289,6 +333,9 @@ func (b *batcher) applyForceFlagToOptions(options batcherGetOptions) batcherGetO
289333
func (b *batcher) applyNeedLock(res *batcherResultCandidate) {
290334
if res.Rest.IsEmpty() && res.WaiterIndex >= 0 {
291335
delete(b.messages, res.Key)
336+
if len(b.sessionsForFlush) > 0 && b.sessionsForFlush[0] == res.Key {
337+
b.sessionsForFlush = xslices.Delete(b.sessionsForFlush, 0, 1)
338+
}
292339
} else {
293340
b.messages[res.Key] = res.Rest
294341
}

internal/topic/topicreaderinternal/batcher_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,62 @@ func TestBatcher_PopMinIgnored(t *testing.T) {
316316
})
317317
}
318318

319+
func TestBatcher_PopFlushed(t *testing.T) {
320+
ctx := context.Background()
321+
//rnd := rand.New(rand.NewSource(0))
322+
323+
s1 := topicreadercommon.NewPartitionSession(ctx,
324+
"test",
325+
1,
326+
-1,
327+
"test",
328+
partitionSessionID(1),
329+
1,
330+
0,
331+
)
332+
s2 := topicreadercommon.NewPartitionSession(ctx,
333+
"test",
334+
2,
335+
-1,
336+
"test",
337+
partitionSessionID(1),
338+
1,
339+
0,
340+
)
341+
342+
sessions := []*topicreadercommon.PartitionSession{s1, s2}
343+
for flushIndex := range sessions {
344+
b := newBatcher()
345+
err := b.PushBatches(
346+
xtest.Must(
347+
topicreadercommon.NewBatch(
348+
s1,
349+
[]*topicreadercommon.PublicMessage{
350+
topicreadercommon.NewPublicMessageBuilder().PartitionSession(s1).Build(),
351+
},
352+
),
353+
),
354+
xtest.Must(
355+
topicreadercommon.NewBatch(
356+
s2,
357+
[]*topicreadercommon.PublicMessage{
358+
topicreadercommon.NewPublicMessageBuilder().PartitionSession(s2).Build(),
359+
},
360+
),
361+
),
362+
)
363+
require.NoError(t, err)
364+
365+
s := sessions[flushIndex]
366+
b.FlushPartitionSession(s)
367+
368+
res, err := b.Pop(ctx, batcherGetOptions{})
369+
require.NoError(t, err)
370+
371+
require.Same(t, s, topicreadercommon.BatchGetPartitionSession(res.Batch))
372+
}
373+
}
374+
319375
func TestBatcherConcurency(t *testing.T) {
320376
xtest.TestManyTimesWithName(t, "OneBatch", func(tb testing.TB) {
321377
b := newBatcher()

internal/xslices/delete.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
//go:build go1.22
2+
3+
package xslices
4+
5+
import "slices"
6+
7+
// Delete removes the elements s[i:j] from s, returning the modified slice.
8+
// Delete panics if j > len(s) or s[i:j] is not a valid slice of s.
9+
// Delete is O(len(s)-i), so if many items must be deleted, it is better to
10+
// make a single call deleting them all together than to delete one at a time.
11+
// Delete zeroes the elements s[len(s)-(j-i):len(s)].
12+
func Delete[S ~[]E, E any](s S, i, j int) S {
13+
return slices.Delete(s, i, j)
14+
}

internal/xslices/delete_old.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
//go:build !go1.22
2+
3+
package xslices
4+
5+
// Delete removes the elements s[i:j] from s, returning the modified slice.
6+
// Delete panics if j > len(s) or s[i:j] is not a valid slice of s.
7+
// Delete is O(len(s)-i), so if many items must be deleted, it is better to
8+
// make a single call deleting them all together than to delete one at a time.
9+
// Delete zeroes the elements s[len(s)-(j-i):len(s)].
10+
func Delete[S ~[]E, E any](s S, i, j int) S {
11+
_ = s[i:j:len(s)] // bounds check
12+
13+
if i == j {
14+
return s
15+
}
16+
17+
oldlen := len(s)
18+
s = append(s[:i], s[j:]...)
19+
clear(s[len(s):oldlen]) // zero/nil out the obsolete elements, for GC
20+
return s
21+
}

0 commit comments

Comments
 (0)