Skip to content

Commit 1c58211

Browse files
authored
Merge pull request #1363 Fixed listener stop partition
2 parents ee03640 + 98c2414 commit 1c58211

File tree

8 files changed

+120
-58
lines changed

8 files changed

+120
-58
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed experimental topic listener handle stop partition event
2+
13
## v3.76.1
24
* Fixed `query.WithCommit()` flag behaviour for `tx.Execute` in query service
35
* OAuth 2.0 token exchange: allowed multiple resource parameters in according to https://www.rfc-editor.org/rfc/rfc8693

examples/topic/topicreader/topicreader_handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (h *TopicEventsHandler) OnReadMessages(
5555

5656
func (h *TopicEventsHandler) OnStartPartitionSessionRequest(
5757
ctx context.Context,
58-
event *topiclistener.StartPartitionSessionRequest,
58+
event *topiclistener.EventStartPartitionSession,
5959
) error {
6060
lockID, offset, err := lockPartition(ctx, event.PartitionSession.TopicPath, event.PartitionSession.PartitionID)
6161

@@ -75,7 +75,7 @@ func (h *TopicEventsHandler) OnStartPartitionSessionRequest(
7575

7676
func (h *TopicEventsHandler) OnStopPartitionSessionRequest(
7777
ctx context.Context,
78-
event *topiclistener.StopPartitionSessionRequest,
78+
event *topiclistener.EventStopPartitionSession,
7979
) error {
8080
h.m.Lock()
8181
lockID := h.locks[event.PartitionSession.PartitionSessionID]

internal/topic/topiclistenerinternal/event_handler.go

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package topiclistenerinternal
33
import (
44
"context"
55

6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
78
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
89
)
@@ -16,7 +17,7 @@ type EventHandler interface {
1617
// You can set topiclistener.StartPartitionSessionConfirm for change default settings.
1718
//
1819
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
19-
OnStartPartitionSessionRequest(ctx context.Context, event *PublicStartPartitionSessionEvent) error
20+
OnStartPartitionSessionRequest(ctx context.Context, event *PublicEventStartPartitionSession) error
2021

2122
// OnReadMessages called with batch of messages. Max count of messages limited by internal buffer size
2223
//
@@ -31,7 +32,7 @@ type EventHandler interface {
3132
// It is guaranteed about the method will be called least once.
3233
//
3334
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
34-
OnStopPartitionSessionRequest(ctx context.Context, event *PublicStopPartitionSessionEvent) error
35+
OnStopPartitionSessionRequest(ctx context.Context, event *PublicEventStopPartitionSession) error
3536
}
3637

3738
// PublicReadMessages
@@ -42,25 +43,37 @@ type PublicReadMessages struct {
4243
Batch *topicreader.Batch
4344
}
4445

45-
// PublicStartPartitionSessionEvent
46+
// PublicEventStartPartitionSession
4647
//
4748
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
48-
type PublicStartPartitionSessionEvent struct {
49+
type PublicEventStartPartitionSession struct {
4950
PartitionSession topicreadercommon.PublicPartitionSession
5051
CommittedOffset int64
5152
PartitionOffsets PublicOffsetsRange
52-
respChan chan PublicStartPartitionSessionConfirm
53+
confirm confirmStorage[PublicStartPartitionSessionConfirm]
54+
}
55+
56+
func NewPublicStartPartitionSessionEvent(
57+
session topicreadercommon.PublicPartitionSession,
58+
committedOffset int64,
59+
partitionOffsets PublicOffsetsRange,
60+
) *PublicEventStartPartitionSession {
61+
return &PublicEventStartPartitionSession{
62+
PartitionSession: session,
63+
CommittedOffset: committedOffset,
64+
PartitionOffsets: partitionOffsets,
65+
}
5366
}
5467

5568
// Confirm
5669
//
5770
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
58-
func (e *PublicStartPartitionSessionEvent) Confirm() {
71+
func (e *PublicEventStartPartitionSession) Confirm() {
5972
e.ConfirmWithParams(PublicStartPartitionSessionConfirm{})
6073
}
6174

62-
func (e *PublicStartPartitionSessionEvent) ConfirmWithParams(p PublicStartPartitionSessionConfirm) {
63-
e.respChan <- p
75+
func (e *PublicEventStartPartitionSession) ConfirmWithParams(p PublicStartPartitionSessionConfirm) {
76+
e.confirm.Set(p)
6477
}
6578

6679
// PublicStartPartitionSessionConfirm
@@ -97,10 +110,10 @@ type PublicOffsetsRange struct {
97110
End int64
98111
}
99112

100-
// PublicStopPartitionSessionEvent
113+
// PublicEventStopPartitionSession
101114
//
102115
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
103-
type PublicStopPartitionSessionEvent struct {
116+
type PublicEventStopPartitionSession struct {
104117
PartitionSession topicreadercommon.PublicPartitionSession
105118

106119
// Graceful mean about server is waiting for client finish work with the partition and confirm stop the work
@@ -109,17 +122,24 @@ type PublicStopPartitionSessionEvent struct {
109122
Graceful bool
110123
CommittedOffset int64
111124

112-
resp chan PublicStopPartitionSessionConfirm
125+
confirm confirmStorage[empty.Struct]
113126
}
114127

115-
// Confirm
116-
//
117-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
118-
func (e *PublicStopPartitionSessionEvent) Confirm() {
119-
e.resp <- PublicStopPartitionSessionConfirm{}
128+
func NewPublicStopPartitionSessionEvent(
129+
partitionSession topicreadercommon.PublicPartitionSession,
130+
graceful bool,
131+
committedOffset int64,
132+
) *PublicEventStopPartitionSession {
133+
return &PublicEventStopPartitionSession{
134+
PartitionSession: partitionSession,
135+
Graceful: graceful,
136+
CommittedOffset: committedOffset,
137+
}
120138
}
121139

122-
// PublicStopPartitionSessionConfirm
140+
// Confirm
123141
//
124142
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
125-
type PublicStopPartitionSessionConfirm struct{}
143+
func (e *PublicEventStopPartitionSession) Confirm() {
144+
e.confirm.Set(empty.Struct{})
145+
}

internal/topic/topiclistenerinternal/event_handler_mock_test.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"reflect"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -268,15 +269,15 @@ func (l *streamListener) onStartPartitionRequest(
268269
PartitionSessionID: m.PartitionSession.PartitionSessionID,
269270
}
270271

271-
event := &PublicStartPartitionSessionEvent{
272-
PartitionSession: session.ToPublic(),
273-
CommittedOffset: m.CommittedOffset.ToInt64(),
274-
PartitionOffsets: PublicOffsetsRange{
272+
event := NewPublicStartPartitionSessionEvent(
273+
session.ToPublic(),
274+
m.CommittedOffset.ToInt64(),
275+
PublicOffsetsRange{
275276
Start: m.PartitionOffsets.Start.ToInt64(),
276277
End: m.PartitionOffsets.End.ToInt64(),
277278
},
278-
respChan: make(chan PublicStartPartitionSessionConfirm, 1),
279-
}
279+
)
280+
280281
err := l.handler.OnStartPartitionSessionRequest(ctx, event)
281282
if err != nil {
282283
return err
@@ -286,8 +287,8 @@ func (l *streamListener) onStartPartitionRequest(
286287
select {
287288
case <-ctx.Done():
288289
return ctx.Err()
289-
case userResp = <-event.respChan:
290-
// pass
290+
case <-event.confirm.Done():
291+
userResp, _ = event.confirm.Get()
291292
}
292293

293294
if userResp.readOffset != nil {
@@ -329,12 +330,11 @@ func (l *streamListener) onStopPartitionRequest(
329330
handlerCtx = session.Context()
330331
}
331332

332-
event := &PublicStopPartitionSessionEvent{
333-
PartitionSession: session.ToPublic(),
334-
Graceful: m.Graceful,
335-
CommittedOffset: m.CommittedOffset.ToInt64(),
336-
resp: make(chan PublicStopPartitionSessionConfirm, 1),
337-
}
333+
event := NewPublicStopPartitionSessionEvent(
334+
session.ToPublic(),
335+
m.Graceful,
336+
m.CommittedOffset.ToInt64(),
337+
)
338338

339339
if err = l.handler.OnStopPartitionSessionRequest(handlerCtx, event); err != nil {
340340
return err
@@ -344,15 +344,15 @@ func (l *streamListener) onStopPartitionRequest(
344344
// remove partition on the confirmation or on the listener closed
345345
select {
346346
case <-l.background.Done():
347-
case <-event.resp:
347+
case <-event.confirm.Done():
348348
}
349349
_, _ = l.sessions.Remove(m.PartitionSessionID)
350350
}()
351351

352352
select {
353353
case <-ctx.Done():
354354
return ctx.Err()
355-
case <-event.resp:
355+
case <-event.confirm.Done():
356356
// pass
357357
}
358358

@@ -396,3 +396,42 @@ func (l *streamListener) sendMessage(m rawtopicreader.ClientMessage) {
396396
default:
397397
}
398398
}
399+
400+
type confirmStorage[T any] struct {
401+
doneChan empty.Chan
402+
confirmed atomic.Bool
403+
val T
404+
confirmAction sync.Once
405+
initAction sync.Once
406+
}
407+
408+
func (c *confirmStorage[T]) init() {
409+
c.initAction.Do(func() {
410+
c.doneChan = make(empty.Chan)
411+
})
412+
}
413+
414+
func (c *confirmStorage[T]) Set(val T) {
415+
c.init()
416+
c.confirmAction.Do(func() {
417+
c.val = val
418+
c.confirmed.Store(true)
419+
close(c.doneChan)
420+
})
421+
}
422+
423+
func (c *confirmStorage[T]) Done() empty.ChanReadonly {
424+
c.init()
425+
426+
return c.doneChan
427+
}
428+
429+
func (c *confirmStorage[T]) Get() (val T, ok bool) {
430+
c.init()
431+
432+
if c.confirmed.Load() {
433+
return c.val, true
434+
}
435+
436+
return val, false
437+
}

internal/topic/topiclistenerinternal/stream_listener_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
1819
)
1920

2021
func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
2122
const batchBytes = 100
2223

2324
const seqNo int64 = 4
2425

25-
t.Run("onReadResponse", func(t *testing.T) {
26+
xtest.TestManyTimesWithName(t, "onReadResponse", func(t testing.TB) {
2627
e := fixenv.New(t)
2728
ctx := sf.Context(e)
2829

@@ -69,7 +70,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
6970
},
7071
})
7172
})
72-
t.Run("onStartPartitionSession", func(t *testing.T) {
73+
xtest.TestManyTimesWithName(t, "onStartPartitionSession", func(t testing.TB) {
7374
e := fixenv.New(t)
7475

7576
respReadOffset := int64(16)
@@ -78,7 +79,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
7879
EventHandlerMock(e).EXPECT().OnStartPartitionSessionRequest(
7980
gomock.Any(),
8081
gomock.Any(),
81-
).DoAndReturn(func(ctx context.Context, event *PublicStartPartitionSessionEvent) error {
82+
).DoAndReturn(func(ctx context.Context, event *PublicEventStartPartitionSession) error {
8283
require.Equal(t, topicreadercommon.PublicPartitionSession{
8384
PartitionSessionID: 1, // ClientPartitionSessionID
8485
TopicPath: "asd",
@@ -130,7 +131,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
130131
require.NoError(t, err)
131132
require.NotNil(t, session)
132133
})
133-
t.Run("onStopPartitionRequest", func(t *testing.T) {
134+
xtest.TestManyTimesWithName(t, "onStopPartitionRequest", func(t testing.TB) {
134135
e := fixenv.New(t)
135136
ctx := sf.Context(e)
136137

@@ -139,7 +140,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
139140
EventHandlerMock(e).EXPECT().OnStopPartitionSessionRequest(
140141
PartitionSession(e).Context(),
141142
gomock.Any(),
142-
).DoAndReturn(func(ctx context.Context, event *PublicStopPartitionSessionEvent) error {
143+
).DoAndReturn(func(ctx context.Context, event *PublicEventStopPartitionSession) error {
143144
require.Equal(t, PartitionSession(e).ClientPartitionSessionID, event.PartitionSession.PartitionSessionID)
144145
require.True(t, event.Graceful)
145146
require.Equal(t, int64(5), event.CommittedOffset)
@@ -173,7 +174,7 @@ func TestStreamListener_CloseSessionsOnCloseListener(t *testing.T) {
173174
EventHandlerMock(e).EXPECT().OnStopPartitionSessionRequest(
174175
PartitionSession(e).Context(),
175176
gomock.Any(),
176-
).Do(func(ctx context.Context, event *PublicStopPartitionSessionEvent) error {
177+
).Do(func(ctx context.Context, event *PublicEventStopPartitionSession) error {
177178
require.Equal(t, PartitionSession(e).ClientPartitionSessionID, event.PartitionSession.PartitionSessionID)
178179
require.False(t, event.Graceful)
179180
require.Equal(t, PartitionSession(e).CommittedOffset().ToInt64(), event.CommittedOffset)

tests/integration/topic_listener_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ type TestTopicListener_Handler struct {
5151

5252
listener *topiclistener.TopicListener
5353
readMessages *topiclistener.ReadMessages
54-
onPartitionStart *topiclistener.StartPartitionSessionRequest
55-
onPartitionStop *topiclistener.StopPartitionSessionRequest
54+
onPartitionStart *topiclistener.EventStartPartitionSession
55+
onPartitionStop *topiclistener.EventStopPartitionSession
5656
done empty.Chan
5757
}
5858

@@ -63,7 +63,7 @@ func (h *TestTopicListener_Handler) OnReaderCreated(event *topiclistener.ReaderR
6363

6464
func (h *TestTopicListener_Handler) OnStartPartitionSessionRequest(
6565
ctx context.Context,
66-
event *topiclistener.StartPartitionSessionRequest,
66+
event *topiclistener.EventStartPartitionSession,
6767
) error {
6868
h.onPartitionStart = event
6969
event.Confirm()
@@ -72,7 +72,7 @@ func (h *TestTopicListener_Handler) OnStartPartitionSessionRequest(
7272

7373
func (h *TestTopicListener_Handler) OnStopPartitionSessionRequest(
7474
ctx context.Context,
75-
event *topiclistener.StopPartitionSessionRequest,
75+
event *topiclistener.EventStopPartitionSession,
7676
) error {
7777
h.onPartitionStop = event
7878
event.Confirm()

0 commit comments

Comments
 (0)