Skip to content

Commit 028cc9f

Browse files
rekbyasmyasnikov
andauthored
Fixed race condition between close writer queue and receive acks in background stream. (#1054)
Co-authored-by: Aleksey Myasnikov <79263256394@ya.ru>
1 parent f838b39 commit 028cc9f

File tree

4 files changed

+26
-1
lines changed

4 files changed

+26
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed sometime panic on topic writer closing
12
* Added experimental query parameters builder `ydb.ParamsBuilder()`
23
* Changed types of `table/table.{QueryParameters,ParameterOption}` to aliases on `internal/params.{Parameters,NamedValue}`
34
* Fixed bug with optional decimal serialization

internal/topic/topicwriterinternal/queue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
var (
1616
errCloseClosedMessageQueue = xerrors.Wrap(errors.New("ydb: close closed message queue"))
17+
errAckOnClosedMessageQueue = xerrors.Wrap(errors.New("ydb: ack on closed message queue"))
1718
errGetMessageFromClosedQueue = xerrors.Wrap(errors.New("ydb: get message from closed message queue"))
1819
errAddUnorderedMessages = xerrors.Wrap(errors.New("ydb: add unordered messages"))
1920
errAckUnexpectedMessage = xerrors.Wrap(errors.New("ydb: ack unexpected message"))
@@ -151,6 +152,9 @@ func (q *messageQueue) AcksReceived(acks []rawtopicwriter.WriteAck) error {
151152
q.OnAckReceived(ackReceivedCounter)
152153
}
153154
}()
155+
if q.closed {
156+
return xerrors.WithStackTrace(errAckOnClosedMessageQueue)
157+
}
154158

155159
for i := range acks {
156160
if err := q.ackReceivedNeedLock(acks[i].SeqNo); err != nil {

internal/topic/topicwriterinternal/queue_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,26 @@ func TestQueuePanicOnOverflow(t *testing.T) {
407407
})
408408
}
409409

410+
func TestRegressionIssue1038_ReceiveAckAfterCloseQueue(t *testing.T) {
411+
counter := 0
412+
413+
q := newMessageQueue()
414+
q.OnAckReceived = func(count int) {
415+
counter -= count
416+
}
417+
require.NoError(t, q.AddMessages(newTestMessagesWithContent(1)))
418+
counter++
419+
420+
require.NoError(t, q.Close(errors.New("test err")))
421+
require.ErrorIs(t, q.AcksReceived([]rawtopicwriter.WriteAck{
422+
{
423+
SeqNo: 1,
424+
MessageWriteStatus: rawtopicwriter.MessageWriteStatus{},
425+
},
426+
}), errAckOnClosedMessageQueue)
427+
require.Zero(t, counter)
428+
}
429+
410430
func TestQueue_Ack(t *testing.T) {
411431
t.Run("First", func(t *testing.T) {
412432
q := newMessageQueue()

internal/topic/topicwriterinternal/writer_single_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (w *SingleStreamWriter) receiveMessagesLoop(ctx context.Context) {
189189

190190
switch m := mess.(type) {
191191
case *rawtopicwriter.WriteResult:
192-
if err = w.cfg.queue.AcksReceived(m.Acks); err != nil {
192+
if err = w.cfg.queue.AcksReceived(m.Acks); err != nil && !errors.Is(err, errCloseClosedMessageQueue) {
193193
reason := xerrors.WithStackTrace(err)
194194
closeCtx, closeCtxCancel := xcontext.WithCancel(ctx)
195195
closeCtxCancel()

0 commit comments

Comments
 (0)