Skip to content

Commit d54354d

Browse files
committed
fixed listener stop partition handling
1 parent 8ee0829 commit d54354d

File tree

5 files changed

+34
-24
lines changed

5 files changed

+34
-24
lines changed

examples/topic/topicreader/topicreader_handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (h *TopicEventsHandler) OnReadMessages(
5555

5656
func (h *TopicEventsHandler) OnStartPartitionSessionRequest(
5757
ctx context.Context,
58-
event *topiclistener.StartPartitionSessionRequest,
58+
event *topiclistener.StartPartitionSessionEvent,
5959
) error {
6060
lockID, offset, err := lockPartition(ctx, event.PartitionSession.TopicPath, event.PartitionSession.PartitionID)
6161

@@ -75,7 +75,7 @@ func (h *TopicEventsHandler) OnStartPartitionSessionRequest(
7575

7676
func (h *TopicEventsHandler) OnStopPartitionSessionRequest(
7777
ctx context.Context,
78-
event *topiclistener.StopPartitionSessionRequest,
78+
event *topiclistener.StopPartitionSessionEvent,
7979
) error {
8080
h.m.Lock()
8181
lockID := h.locks[event.PartitionSession.PartitionSessionID]

internal/topic/topiclistenerinternal/event_handler.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ func NewPublicStartPartitionSessionEvent(
6262
PartitionSession: session,
6363
CommittedOffset: committedOffset,
6464
PartitionOffsets: partitionOffsets,
65-
confirm: newConfirmStorage[PublicStartPartitionSessionConfirm](),
6665
}
6766
}
6867

@@ -126,12 +125,15 @@ type PublicStopPartitionSessionEvent struct {
126125
confirm confirmStorage[empty.Struct]
127126
}
128127

129-
func NewPublicStopPartitionSessionEvent(partitionSession topicreadercommon.PublicPartitionSession, graceful bool, committedOffset int64) *PublicStopPartitionSessionEvent {
128+
func NewPublicStopPartitionSessionEvent(
129+
partitionSession topicreadercommon.PublicPartitionSession,
130+
graceful bool,
131+
committedOffset int64,
132+
) *PublicStopPartitionSessionEvent {
130133
return &PublicStopPartitionSessionEvent{
131134
PartitionSession: partitionSession,
132135
Graceful: graceful,
133136
CommittedOffset: committedOffset,
134-
confirm: newConfirmStorage[empty.Struct](),
135137
}
136138
}
137139

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func (l *streamListener) onStartPartitionRequest(
287287
select {
288288
case <-ctx.Done():
289289
return ctx.Err()
290-
case <-event.confirm.DoneChan:
290+
case <-event.confirm.Done():
291291
userResp, _ = event.confirm.Get()
292292
}
293293

@@ -344,15 +344,15 @@ func (l *streamListener) onStopPartitionRequest(
344344
// remove partition on the confirmation or on the listener closed
345345
select {
346346
case <-l.background.Done():
347-
case <-event.confirm.DoneChan:
347+
case <-event.confirm.Done():
348348
}
349349
_, _ = l.sessions.Remove(m.PartitionSessionID)
350350
}()
351351

352352
select {
353353
case <-ctx.Done():
354354
return ctx.Err()
355-
case <-event.confirm.DoneChan:
355+
case <-event.confirm.Done():
356356
// pass
357357
}
358358

@@ -398,27 +398,37 @@ func (l *streamListener) sendMessage(m rawtopicreader.ClientMessage) {
398398
}
399399

400400
type confirmStorage[T any] struct {
401-
DoneChan empty.Chan
401+
doneChan empty.Chan
402402
confirmed atomic.Bool
403403
val T
404404
confirmAction sync.Once
405+
initAction sync.Once
405406
}
406407

407-
func newConfirmStorage[T any]() confirmStorage[T] {
408-
return confirmStorage[T]{
409-
DoneChan: make(empty.Chan),
410-
}
408+
func (c *confirmStorage[T]) init() {
409+
c.initAction.Do(func() {
410+
c.doneChan = make(empty.Chan)
411+
})
411412
}
412413

413414
func (c *confirmStorage[T]) Set(val T) {
415+
c.init()
414416
c.confirmAction.Do(func() {
415417
c.val = val
416418
c.confirmed.Store(true)
417-
close(c.DoneChan)
419+
close(c.doneChan)
418420
})
419421
}
420422

423+
func (c *confirmStorage[T]) Done() empty.ChanReadonly {
424+
c.init()
425+
426+
return c.doneChan
427+
}
428+
421429
func (c *confirmStorage[T]) Get() (val T, ok bool) {
430+
c.init()
431+
422432
if c.confirmed.Load() {
423433
return c.val, true
424434
}

tests/integration/topic_listener_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ type TestTopicListener_Handler struct {
5151

5252
listener *topiclistener.TopicListener
5353
readMessages *topiclistener.ReadMessages
54-
onPartitionStart *topiclistener.StartPartitionSessionRequest
55-
onPartitionStop *topiclistener.StopPartitionSessionRequest
54+
onPartitionStart *topiclistener.StartPartitionSessionEvent
55+
onPartitionStop *topiclistener.StopPartitionSessionEvent
5656
done empty.Chan
5757
}
5858

@@ -63,7 +63,7 @@ func (h *TestTopicListener_Handler) OnReaderCreated(event *topiclistener.ReaderR
6363

6464
func (h *TestTopicListener_Handler) OnStartPartitionSessionRequest(
6565
ctx context.Context,
66-
event *topiclistener.StartPartitionSessionRequest,
66+
event *topiclistener.StartPartitionSessionEvent,
6767
) error {
6868
h.onPartitionStart = event
6969
event.Confirm()
@@ -72,7 +72,7 @@ func (h *TestTopicListener_Handler) OnStartPartitionSessionRequest(
7272

7373
func (h *TestTopicListener_Handler) OnStopPartitionSessionRequest(
7474
ctx context.Context,
75-
event *topiclistener.StopPartitionSessionRequest,
75+
event *topiclistener.StopPartitionSessionEvent,
7676
) error {
7777
h.onPartitionStop = event
7878
event.Confirm()

topic/topiclistener/event_handler.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (b BaseHandler) OnReaderCreated(event *ReaderReady) error {
4747

4848
func (b BaseHandler) OnStartPartitionSessionRequest(
4949
ctx context.Context,
50-
event *StartPartitionSessionRequest,
50+
event *StartPartitionSessionEvent,
5151
) error {
5252
event.Confirm()
5353

@@ -61,7 +61,7 @@ func (b BaseHandler) OnStartPartitionSessionRequest(
6161
// not
6262
func (b BaseHandler) OnStopPartitionSessionRequest(
6363
ctx context.Context,
64-
event *StopPartitionSessionRequest,
64+
event *StopPartitionSessionEvent,
6565
) error {
6666
event.Confirm()
6767

@@ -76,10 +76,8 @@ func (b BaseHandler) OnReadMessages(
7676
}
7777

7878
type (
79-
StartPartitionSessionRequest = topiclistenerinternal.PublicStartPartitionSessionEvent
80-
StartPartitionSessionConfirm = topiclistenerinternal.PublicStartPartitionSessionConfirm
81-
StopPartitionSessionRequest = topiclistenerinternal.PublicStopPartitionSessionEvent
82-
StopPartitionSessionConfirm = topiclistenerinternal.PublicStopPartitionSessionConfirm
79+
StartPartitionSessionEvent = topiclistenerinternal.PublicStartPartitionSessionEvent
80+
StopPartitionSessionEvent = topiclistenerinternal.PublicStopPartitionSessionEvent
8381
)
8482

8583
type PartitionSession = topicreadercommon.PublicPartitionSession

0 commit comments

Comments
 (0)