Skip to content

Commit 1583621

Browse files
committed
splitmerge work with mock test
1 parent e11c361 commit 1583621

File tree

9 files changed

+354
-28
lines changed

9 files changed

+354
-28
lines changed

internal/grpcwrapper/rawtopic/rawtopicreader/messages.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ var (
2020
errUnexpectedProtoNilStartPartitionSessionRequest = xerrors.Wrap(errors.New("ydb: unexpected proto nil start partition session request")) //nolint:lll
2121
errUnexpectedNilPartitionSession = xerrors.Wrap(errors.New("ydb: unexpected proto nil partition session in start partition session request")) //nolint:lll
2222
errUnexpectedGrpcNilStopPartitionSessionRequest = xerrors.Wrap(errors.New("ydb: unexpected grpc nil stop partition session request")) //nolint:lll
23+
errUnexpectedGrpcNilEndPartitionSession = xerrors.Wrap(errors.New("ydb: unexpected grpc nil end partition session")) //nolint:lll
2324
)
2425

2526
type PartitionSessionID int64
@@ -92,12 +93,14 @@ type InitRequest struct {
9293

9394
TopicsReadSettings []TopicReadSettings
9495

95-
Consumer string
96+
Consumer string
97+
AutoPartitioningSupport bool
9698
}
9799

98100
func (r *InitRequest) toProto() *Ydb_Topic.StreamReadMessage_InitRequest {
99101
p := &Ydb_Topic.StreamReadMessage_InitRequest{
100-
Consumer: r.Consumer,
102+
Consumer: r.Consumer,
103+
AutoPartitioningSupport: r.AutoPartitioningSupport,
101104
}
102105

103106
p.TopicsReadSettings = make([]*Ydb_Topic.StreamReadMessage_InitRequest_TopicReadSettings, len(r.TopicsReadSettings))
@@ -477,3 +480,25 @@ func (r *StopPartitionSessionResponse) toProto() *Ydb_Topic.StreamReadMessage_St
477480
PartitionSessionId: r.PartitionSessionID.ToInt64(),
478481
}
479482
}
483+
484+
type EndPartitionSession struct {
485+
serverMessageImpl
486+
487+
rawtopiccommon.ServerMessageMetadata
488+
489+
PartitionSessionID PartitionSessionID
490+
AdjacentPartitionIDs []int64
491+
ChildPartitionIDs []int64
492+
}
493+
494+
func (r *EndPartitionSession) fromProto(proto *Ydb_Topic.StreamReadMessage_EndPartitionSession) error {
495+
if proto == nil {
496+
return xerrors.WithStackTrace(errUnexpectedGrpcNilEndPartitionSession)
497+
}
498+
499+
r.PartitionSessionID.FromInt64(proto.GetPartitionSessionId())
500+
r.AdjacentPartitionIDs = proto.GetAdjacentPartitionIds()
501+
r.ChildPartitionIDs = proto.GetChildPartitionIds()
502+
503+
return nil
504+
}

internal/grpcwrapper/rawtopic/rawtopicreader/rawtopicreader.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ func (s StreamReader) Recv() (ServerMessage, error) {
8282
}
8383

8484
return req, nil
85+
86+
case *Ydb_Topic.StreamReadMessage_FromServer_EndPartitionSession:
87+
req := &EndPartitionSession{}
88+
req.ServerMessageMetadata = meta
89+
if err = req.fromProto(m.EndPartitionSession); err != nil {
90+
return nil, err
91+
}
92+
return req, nil
93+
8594
case *Ydb_Topic.StreamReadMessage_FromServer_CommitOffsetResponse:
8695
resp := &CommitOffsetResponse{}
8796
resp.ServerMessageMetadata = meta

internal/topic/topicreadercommon/init_message.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawt
44

55
func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest {
66
res := &rawtopicreader.InitRequest{
7-
Consumer: consumer,
7+
Consumer: consumer,
8+
AutoPartitioningSupport: true,
89
}
910

1011
res.TopicsReadSettings = make([]rawtopicreader.TopicReadSettings, len(selectors))

internal/topic/topicreadercommon/read_partition_session.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type PartitionSession struct {
2323

2424
lastReceivedOffsetEndVal atomic.Int64
2525
committedOffsetVal atomic.Int64
26+
noMoreMessages atomic.Bool
2627
}
2728

2829
func NewPartitionSession(
@@ -102,6 +103,14 @@ func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)
102103
s.lastReceivedOffsetEndVal.Store(v.ToInt64())
103104
}
104105

106+
func (s *PartitionSession) NoMoreMessages() bool {
107+
return s.noMoreMessages.Load()
108+
}
109+
110+
func (s *PartitionSession) SetNoMoreMessages() {
111+
s.noMoreMessages.Store(true)
112+
}
113+
105114
func (s *PartitionSession) ToPublic() PublicPartitionSession {
106115
return PublicPartitionSession{
107116
PartitionSessionID: s.ClientPartitionSessionID,

internal/topic/topicreaderinternal/batcher_test.go

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package topicreaderinternal
33
import (
44
"context"
55
"errors"
6+
"math/rand"
67
"sync/atomic"
78
"testing"
89
"time"
@@ -318,30 +319,29 @@ func TestBatcher_PopMinIgnored(t *testing.T) {
318319

319320
func TestBatcher_PopFlushed(t *testing.T) {
320321
ctx := context.Background()
321-
//rnd := rand.New(rand.NewSource(0))
322-
323-
s1 := topicreadercommon.NewPartitionSession(ctx,
324-
"test",
325-
1,
326-
-1,
327-
"test",
328-
partitionSessionID(1),
329-
1,
330-
0,
331-
)
332-
s2 := topicreadercommon.NewPartitionSession(ctx,
333-
"test",
334-
2,
335-
-1,
336-
"test",
337-
partitionSessionID(1),
338-
1,
339-
0,
340-
)
341-
342-
sessions := []*topicreadercommon.PartitionSession{s1, s2}
343-
for flushIndex := range sessions {
322+
rnd := rand.New(rand.NewSource(0))
323+
324+
xtest.TestManyTimes(t, func(t testing.TB) {
344325
b := newBatcher()
326+
s1 := topicreadercommon.NewPartitionSession(ctx,
327+
"test",
328+
1,
329+
-1,
330+
"test",
331+
partitionSessionID(1),
332+
1,
333+
0,
334+
)
335+
s2 := topicreadercommon.NewPartitionSession(ctx,
336+
"test",
337+
2,
338+
-1,
339+
"test",
340+
partitionSessionID(1),
341+
1,
342+
0,
343+
)
344+
345345
err := b.PushBatches(
346346
xtest.Must(
347347
topicreadercommon.NewBatch(
@@ -362,14 +362,15 @@ func TestBatcher_PopFlushed(t *testing.T) {
362362
)
363363
require.NoError(t, err)
364364

365-
s := sessions[flushIndex]
365+
sessions := []*topicreadercommon.PartitionSession{s1, s2}
366+
s := sessions[rnd.Intn(len(sessions))]
366367
b.FlushPartitionSession(s)
367368

368369
res, err := b.Pop(ctx, batcherGetOptions{})
369370
require.NoError(t, err)
370371

371372
require.Same(t, s, topicreadercommon.BatchGetPartitionSession(res.Batch))
372-
}
373+
}, xtest.StopAfter(time.Millisecond))
373374
}
374375

375376
func TestBatcherConcurency(t *testing.T) {

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,10 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
702702

703703
return
704704
}
705+
case *rawtopicreader.EndPartitionSession:
706+
if err = r.onEndPartitionSession(m); err != nil {
707+
_ = r.CloseWithError(ctx, err)
708+
}
705709
case *rawtopicreader.CommitOffsetResponse:
706710
if err = r.onCommitResponse(m); err != nil {
707711
_ = r.CloseWithError(ctx, err)
@@ -971,9 +975,31 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequest(m *rawtopicreader.
971975
return err
972976
}
973977

978+
session.SetNoMoreMessages()
974979
if !m.Graceful {
975980
session.Close()
976981
}
977982

978983
return r.batcher.PushRawMessage(session, m)
979984
}
985+
986+
func (r *topicStreamReaderImpl) onEndPartitionSession(m *rawtopicreader.EndPartitionSession) error {
987+
if session, err := r.sessionController.Get(m.PartitionSessionID); err == nil {
988+
trace.TopicOnReaderEndPartitionSession(
989+
r.cfg.Trace,
990+
r.readConnectionID,
991+
session.Context(),
992+
session.Topic,
993+
session.PartitionID,
994+
m.PartitionSessionID.ToInt64(),
995+
m.AdjacentPartitionIDs,
996+
m.ChildPartitionIDs,
997+
)
998+
session.SetNoMoreMessages()
999+
r.batcher.FlushPartitionSession(session)
1000+
1001+
return nil
1002+
} else {
1003+
return xerrors.Retryable(xerrors.Wrap(fmt.Errorf("ydb: unknown partition for end partition session: %w", err)))
1004+
}
1005+
}

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,177 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
475475
})
476476
}
477477

478+
func TestStreamReaderImpl_TestEndSessionForSplitMergePartitions(t *testing.T) {
479+
t.Run("Split", func(t *testing.T) {
480+
xtest.TestManyTimes(t, func(t testing.TB) {
481+
482+
e := newTopicReaderTestEnv(t)
483+
484+
// doesn't check sends
485+
e.stream.EXPECT().Send(gomock.Any()).Return(nil).MinTimes(0)
486+
487+
e.Start()
488+
489+
activePartitionID := e.partitionSession.PartitionID
490+
491+
e.SendFromServer(&rawtopicreader.ReadResponse{
492+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
493+
PartitionData: []rawtopicreader.PartitionData{
494+
{
495+
PartitionSessionID: e.partitionSessionID,
496+
Batches: []rawtopicreader.Batch{{
497+
Codec: rawtopiccommon.CodecRaw,
498+
MessageData: []rawtopicreader.MessageData{{Offset: 1, Data: []byte("OK")}},
499+
}},
500+
},
501+
},
502+
})
503+
504+
e.SendFromServer(&rawtopicreader.EndPartitionSession{
505+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
506+
PartitionSessionID: e.partitionSessionID,
507+
ChildPartitionIDs: []int64{activePartitionID + 1, activePartitionID + 2},
508+
})
509+
510+
e.SendFromServer(&rawtopicreader.StartPartitionSessionRequest{
511+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
512+
PartitionSession: rawtopicreader.PartitionSession{
513+
PartitionSessionID: e.partitionSessionID + 1,
514+
Path: e.partitionSession.Topic,
515+
PartitionID: activePartitionID + 1,
516+
},
517+
})
518+
519+
e.SendFromServer(&rawtopicreader.StartPartitionSessionRequest{
520+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
521+
PartitionSession: rawtopicreader.PartitionSession{
522+
PartitionSessionID: e.partitionSessionID + 2,
523+
Path: e.partitionSession.Topic,
524+
PartitionID: activePartitionID + 2,
525+
},
526+
})
527+
528+
e.SendFromServer(&rawtopicreader.ReadResponse{
529+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
530+
PartitionData: []rawtopicreader.PartitionData{
531+
{
532+
PartitionSessionID: e.partitionSessionID + 1,
533+
Batches: []rawtopicreader.Batch{{
534+
Codec: rawtopiccommon.CodecRaw,
535+
MessageData: []rawtopicreader.MessageData{{Offset: 1, Data: []byte("BAD-1")}},
536+
}},
537+
},
538+
},
539+
})
540+
541+
allMessagesSent := make(empty.Chan)
542+
e.SendFromServerAndSetNextCallback(&rawtopicreader.ReadResponse{
543+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
544+
PartitionData: []rawtopicreader.PartitionData{
545+
{
546+
PartitionSessionID: e.partitionSessionID + 2,
547+
Batches: []rawtopicreader.Batch{{
548+
Codec: rawtopiccommon.CodecRaw,
549+
MessageData: []rawtopicreader.MessageData{{Offset: 1, Data: []byte("BAD-2")}},
550+
}},
551+
},
552+
},
553+
}, func() {
554+
close(allMessagesSent)
555+
})
556+
557+
<-allMessagesSent
558+
batch, err := e.reader.ReadMessageBatch(e.ctx, newReadMessageBatchOptions())
559+
require.NoError(t, err)
560+
require.Same(t, e.partitionSession, topicreadercommon.BatchGetPartitionSession(batch))
561+
})
562+
})
563+
t.Run("Merge", func(t *testing.T) {
564+
xtest.TestManyTimes(t, func(t testing.TB) {
565+
566+
e := newTopicReaderTestEnv(t)
567+
568+
// doesn't check sends
569+
e.stream.EXPECT().Send(gomock.Any()).Return(nil).MinTimes(0)
570+
571+
e.Start()
572+
573+
activePartitionID := e.partitionSession.PartitionID
574+
575+
e.SendFromServer(&rawtopicreader.ReadResponse{
576+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
577+
PartitionData: []rawtopicreader.PartitionData{
578+
{
579+
PartitionSessionID: e.partitionSessionID,
580+
Batches: []rawtopicreader.Batch{{
581+
Codec: rawtopiccommon.CodecRaw,
582+
MessageData: []rawtopicreader.MessageData{{Offset: 1, Data: []byte("OK")}},
583+
}},
584+
},
585+
},
586+
})
587+
588+
e.SendFromServer(&rawtopicreader.EndPartitionSession{
589+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
590+
PartitionSessionID: e.partitionSessionID,
591+
ChildPartitionIDs: []int64{activePartitionID + 1, activePartitionID + 2},
592+
})
593+
594+
e.SendFromServer(&rawtopicreader.StartPartitionSessionRequest{
595+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
596+
PartitionSession: rawtopicreader.PartitionSession{
597+
PartitionSessionID: e.partitionSessionID + 1,
598+
Path: e.partitionSession.Topic,
599+
PartitionID: activePartitionID + 1,
600+
},
601+
})
602+
603+
e.SendFromServer(&rawtopicreader.StartPartitionSessionRequest{
604+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
605+
PartitionSession: rawtopicreader.PartitionSession{
606+
PartitionSessionID: e.partitionSessionID + 2,
607+
Path: e.partitionSession.Topic,
608+
PartitionID: activePartitionID + 2,
609+
},
610+
})
611+
612+
e.SendFromServer(&rawtopicreader.ReadResponse{
613+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
614+
PartitionData: []rawtopicreader.PartitionData{
615+
{
616+
PartitionSessionID: e.partitionSessionID + 1,
617+
Batches: []rawtopicreader.Batch{{
618+
Codec: rawtopiccommon.CodecRaw,
619+
MessageData: []rawtopicreader.MessageData{{Offset: 1, Data: []byte("BAD-1")}},
620+
}},
621+
},
622+
},
623+
})
624+
625+
allMessagesSent := make(empty.Chan)
626+
e.SendFromServerAndSetNextCallback(&rawtopicreader.ReadResponse{
627+
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
628+
PartitionData: []rawtopicreader.PartitionData{
629+
{
630+
PartitionSessionID: e.partitionSessionID + 2,
631+
Batches: []rawtopicreader.Batch{{
632+
Codec: rawtopiccommon.CodecRaw,
633+
MessageData: []rawtopicreader.MessageData{{Offset: 1, Data: []byte("BAD-2")}},
634+
}},
635+
},
636+
},
637+
}, func() {
638+
close(allMessagesSent)
639+
})
640+
641+
<-allMessagesSent
642+
batch, err := e.reader.ReadMessageBatch(e.ctx, newReadMessageBatchOptions())
643+
require.NoError(t, err)
644+
require.Same(t, e.partitionSession, topicreadercommon.BatchGetPartitionSession(batch))
645+
})
646+
})
647+
}
648+
478649
func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) {
479650
t.Run("BufferSize", func(t *testing.T) {
480651
waitChangeRestBufferSizeBytes := func(r *topicStreamReaderImpl, old int64) {

0 commit comments

Comments
 (0)