Skip to content

Commit e31a5d0

Browse files
committed
add split-merge support
1 parent 995c43e commit e31a5d0

File tree

10 files changed

+91
-219
lines changed

10 files changed

+91
-219
lines changed

internal/grpcwrapper/rawtopic/client.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ func (c *Client) DropTopic(
8888
return res, err
8989
}
9090

91-
func (c *Client) StreamRead(ctxStreamLifeTime context.Context, readerID int64, tracer *trace.Topic) (rawtopicreader.StreamReader, error) {
91+
func (c *Client) StreamRead(
92+
ctxStreamLifeTime context.Context,
93+
readerID int64,
94+
tracer *trace.Topic,
95+
) (rawtopicreader.StreamReader, error) {
9296
protoResp, err := c.service.StreamRead(ctxStreamLifeTime)
9397
if err != nil {
9498
return rawtopicreader.StreamReader{}, xerrors.WithStackTrace(

internal/grpcwrapper/rawtopic/controlplane_types.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ func (s *AutoPartitioningSettings) FromProto(proto *Ydb_Topic.AutoPartitioningSe
112112
type AutoPartitioningStrategy int32
113113

114114
const (
115-
AutoPartitioningStrategyUnspecified = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_UNSPECIFIED)
116-
AutoPartitioningStrategyDisabled = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_DISABLED)
117-
AutoPartitioningStrategyScaleUpAndDown = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN)
118-
AutoPartitioningStrategyPaused = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_PAUSED)
115+
AutoPartitioningStrategyUnspecified = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_UNSPECIFIED) //nolint:lll
116+
AutoPartitioningStrategyDisabled = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_DISABLED) //nolint:lll
117+
AutoPartitioningStrategyScaleUpAndDown = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN) //nolint:lll
118+
AutoPartitioningStrategyPaused = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_PAUSED) //nolint:lll
119119
)
120120

121121
func (s AutoPartitioningStrategy) ToProto() Ydb_Topic.AutoPartitioningStrategy {
@@ -141,9 +141,9 @@ func (s *AutoPartitioningWriteSpeedStrategy) FromProto(speed *Ydb_Topic.AutoPart
141141
return xerrors.WithStackTrace(errUnexpectedNilAutoPartitionWriteSpeed)
142142
}
143143

144-
s.StabilizationWindow.MustFromProto(speed.StabilizationWindow)
145-
s.UpUtilizationPercent = speed.UpUtilizationPercent
146-
s.DownUtilizationPercent = speed.DownUtilizationPercent
144+
s.StabilizationWindow.MustFromProto(speed.GetStabilizationWindow())
145+
s.UpUtilizationPercent = speed.GetUpUtilizationPercent()
146+
s.DownUtilizationPercent = speed.GetDownUtilizationPercent()
147147

148148
return nil
149149
}

internal/grpcwrapper/rawtopic/rawtopicreader/rawtopicreader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rawtopicreader
33
import (
44
"errors"
55
"fmt"
6-
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
76
"io"
87
"reflect"
98

@@ -12,6 +11,7 @@ import (
1211
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1312
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1413
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1515
)
1616

1717
var ErrUnexpectedMessageType = errors.New("unexpected message type")
@@ -112,6 +112,7 @@ func (s StreamReader) Recv() (_ ServerMessage, resErr error) {
112112
if err = req.fromProto(m.EndPartitionSession); err != nil {
113113
return nil, err
114114
}
115+
115116
return req, nil
116117

117118
case *Ydb_Topic.StreamReadMessage_FromServer_CommitOffsetResponse:
@@ -145,6 +146,7 @@ func (s StreamReader) Recv() (_ ServerMessage, resErr error) {
145146
}
146147
}
147148

149+
//nolint:funlen
148150
func (s StreamReader) Send(msg ClientMessage) (resErr error) {
149151
defer func() {
150152
resErr = xerrors.Transport(resErr)

internal/topic/topicclientinternal/client.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,11 @@ func (c *Client) StartReader(
300300
readSelectors topicoptions.ReadSelectors,
301301
opts ...topicoptions.ReaderOption,
302302
) (*topicreader.Reader, error) {
303-
var connector topicreaderinternal.TopicSteamReaderConnect = func(ctx context.Context, readerID int64, tracer *trace.Topic) (
303+
var connector topicreaderinternal.TopicSteamReaderConnect = func(
304+
ctx context.Context,
305+
readerID int64,
306+
tracer *trace.Topic,
307+
) (
304308
topicreadercommon.RawTopicReaderStream, error,
305309
) {
306310
return c.rawClient.StreamRead(ctx, readerID, tracer)

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ func newStreamListener(
5555
handler: eventListener,
5656
background: *background.NewWorker(xcontext.ValueOnly(connectionCtx), "topic reader stream listener"),
5757
sessionIDCounter: sessionIDCounter,
58-
tracer: &trace.Topic{}, // TODO: add read tracer
58+
59+
//nolint:godox
60+
tracer: &trace.Topic{}, // TODO: add read tracer
5961
}
6062

6163
res.initVars(sessionIDCounter)

internal/topic/topicreaderinternal/batcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
87
"sync/atomic"
98

109
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1110
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
1211
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
1312
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
1515
)
1616

internal/topic/topicreaderinternal/reader_test.go

Lines changed: 0 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -190,80 +190,6 @@ func TestReader_WaitInit(t *testing.T) {
190190
require.NoError(t, err)
191191
}
192192

193-
func TestTopicSplitPartitions(t *testing.T) {
194-
t.Run("SplitPartition", func(t *testing.T) {
195-
mc := gomock.NewController(t)
196-
defer mc.Finish()
197-
198-
readerID := topicreadercommon.NextReaderID()
199-
baseReader := NewMockbatchedStreamReader(mc)
200-
reader := &Reader{
201-
reader: baseReader,
202-
readerID: readerID,
203-
}
204-
205-
// Create initial partition session
206-
initialSession := newTestPartitionSessionReaderID(readerID, 1)
207-
baseReader.EXPECT().WaitInit(gomock.Any()).Return(nil)
208-
209-
// Expect partition split notification
210-
splitSession1 := newTestPartitionSessionReaderID(readerID, 2)
211-
splitSession2 := newTestPartitionSessionReaderID(readerID, 3)
212-
baseReader.EXPECT().OnSplitPartition(gomock.Any(), initialSession, splitSession1, splitSession2).Return(nil)
213-
214-
// Test partition split handling
215-
err := reader.OnSplitPartition(context.Background(), initialSession, splitSession1, splitSession2)
216-
require.NoError(t, err)
217-
})
218-
219-
t.Run("SplitPartitionError", func(t *testing.T) {
220-
mc := gomock.NewController(t)
221-
defer mc.Finish()
222-
223-
readerID := topicreadercommon.NextReaderID()
224-
baseReader := NewMockbatchedStreamReader(mc)
225-
reader := &Reader{
226-
reader: baseReader,
227-
readerID: readerID,
228-
}
229-
230-
// Create initial partition session
231-
initialSession := newTestPartitionSessionReaderID(readerID, 1)
232-
baseReader.EXPECT().WaitInit(gomock.Any()).Return(nil)
233-
234-
// Expect partition split notification with error
235-
splitSession1 := newTestPartitionSessionReaderID(readerID, 2)
236-
splitSession2 := newTestPartitionSessionReaderID(readerID, 3)
237-
testErr := errors.New("test error")
238-
baseReader.EXPECT().OnSplitPartition(gomock.Any(), initialSession, splitSession1, splitSession2).Return(testErr)
239-
240-
// Test partition split handling with error
241-
err := reader.OnSplitPartition(context.Background(), initialSession, splitSession1, splitSession2)
242-
require.ErrorIs(t, err, testErr)
243-
})
244-
245-
t.Run("SplitPartitionFromOtherReader", func(t *testing.T) {
246-
mc := gomock.NewController(t)
247-
defer mc.Finish()
248-
249-
readerID := topicreadercommon.NextReaderID()
250-
baseReader := NewMockbatchedStreamReader(mc)
251-
reader := &Reader{
252-
reader: baseReader,
253-
readerID: readerID,
254-
}
255-
256-
// Create initial partition session from different reader
257-
initialSession := newTestPartitionSessionReaderID(readerID+1, 1)
258-
splitSession1 := newTestPartitionSessionReaderID(readerID+1, 2)
259-
splitSession2 := newTestPartitionSessionReaderID(readerID+1, 3)
260-
261-
// Test partition split handling from other reader
262-
err := reader.OnSplitPartition(context.Background(), initialSession, splitSession1, splitSession2)
263-
require.ErrorIs(t, err, errCommitSessionFromOtherReader)
264-
})
265-
}
266-
267193
func newTestPartitionSessionReaderID(
268194
readerID int64,
269195
partitionSessionID rawtopicreader.PartitionSessionID,

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,8 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequest(m *rawtopicreader.
984984
}
985985

986986
func (r *topicStreamReaderImpl) onEndPartitionSession(m *rawtopicreader.EndPartitionSession) error {
987+
// need err value in else block
988+
//nolint:revive
987989
if session, err := r.sessionController.Get(m.PartitionSessionID); err == nil {
988990
trace.TopicOnReaderEndPartitionSession(
989991
r.cfg.Trace,
@@ -1000,6 +1002,8 @@ func (r *topicStreamReaderImpl) onEndPartitionSession(m *rawtopicreader.EndParti
10001002

10011003
return nil
10021004
} else {
1003-
return xerrors.Retryable(xerrors.Wrap(fmt.Errorf("ydb: unknown partition for end partition session: %w", err)))
1005+
return xerrors.Retryable(xerrors.Wrap(fmt.Errorf(
1006+
"ydb: unknown partition for end partition session: %w", err,
1007+
)))
10041008
}
10051009
}

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 0 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,6 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
478478
func TestStreamReaderImpl_TestEndSessionForSplitMergePartitions(t *testing.T) {
479479
t.Run("Split", func(t *testing.T) {
480480
xtest.TestManyTimes(t, func(t testing.TB) {
481-
482481
e := newTopicReaderTestEnv(t)
483482

484483
// doesn't check sends
@@ -562,7 +561,6 @@ func TestStreamReaderImpl_TestEndSessionForSplitMergePartitions(t *testing.T) {
562561
})
563562
t.Run("Merge", func(t *testing.T) {
564563
xtest.TestManyTimes(t, func(t testing.TB) {
565-
566564
e := newTopicReaderTestEnv(t)
567565

568566
// doesn't check sends
@@ -1159,132 +1157,6 @@ func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) {
11591157
})
11601158
}
11611159

1162-
func TestTopicSplitPartitions(t *testing.T) {
1163-
t.Run("SplitPartition", func(t *testing.T) {
1164-
e := newTopicReaderTestEnv(t)
1165-
e.Start()
1166-
1167-
// Create initial partition session
1168-
initialSession := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID)
1169-
require.NoError(t, e.reader.sessionController.Add(initialSession))
1170-
1171-
// Create split sessions
1172-
splitSession1 := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID+1)
1173-
splitSession2 := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID+2)
1174-
1175-
// Send split partition request
1176-
splitRequestReceived := make(empty.Chan)
1177-
e.stream.EXPECT().Send(&rawtopicreader.SplitPartitionSessionRequest{
1178-
PartitionSessionID: e.partitionSessionID,
1179-
}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
1180-
close(splitRequestReceived)
1181-
return nil
1182-
})
1183-
1184-
// Send split response
1185-
e.SendFromServer(&rawtopicreader.SplitPartitionSessionResponse{
1186-
PartitionSessionID: e.partitionSessionID,
1187-
SplitSession1: e.partitionSessionID + 1,
1188-
SplitSession2: e.partitionSessionID + 2,
1189-
})
1190-
1191-
// Wait for split request to be sent
1192-
xtest.WaitChannelClosed(t, splitRequestReceived)
1193-
1194-
// Verify split sessions are added to storage
1195-
session1, err := e.reader.sessionController.Get(e.partitionSessionID + 1)
1196-
require.NoError(t, err)
1197-
require.Equal(t, splitSession1, session1)
1198-
1199-
session2, err := e.reader.sessionController.Get(e.partitionSessionID + 2)
1200-
require.NoError(t, err)
1201-
require.Equal(t, splitSession2, session2)
1202-
1203-
// Verify initial session is removed
1204-
_, err = e.reader.sessionController.Get(e.partitionSessionID)
1205-
require.Error(t, err)
1206-
})
1207-
1208-
t.Run("SplitPartitionError", func(t *testing.T) {
1209-
e := newTopicReaderTestEnv(t)
1210-
e.Start()
1211-
1212-
// Create initial partition session
1213-
initialSession := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID)
1214-
require.NoError(t, e.reader.sessionController.Add(initialSession))
1215-
1216-
// Send split partition request with error
1217-
splitRequestReceived := make(empty.Chan)
1218-
testErr := errors.New("test error")
1219-
e.stream.EXPECT().Send(&rawtopicreader.SplitPartitionSessionRequest{
1220-
PartitionSessionID: e.partitionSessionID,
1221-
}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
1222-
close(splitRequestReceived)
1223-
return testErr
1224-
})
1225-
1226-
// Send split response
1227-
e.SendFromServer(&rawtopicreader.SplitPartitionSessionResponse{
1228-
PartitionSessionID: e.partitionSessionID,
1229-
SplitSession1: e.partitionSessionID + 1,
1230-
SplitSession2: e.partitionSessionID + 2,
1231-
})
1232-
1233-
// Wait for split request to be sent
1234-
xtest.WaitChannelClosed(t, splitRequestReceived)
1235-
1236-
// Verify initial session is still present
1237-
session, err := e.reader.sessionController.Get(e.partitionSessionID)
1238-
require.NoError(t, err)
1239-
require.Equal(t, initialSession, session)
1240-
1241-
// Verify split sessions are not added
1242-
_, err = e.reader.sessionController.Get(e.partitionSessionID + 1)
1243-
require.Error(t, err)
1244-
_, err = e.reader.sessionController.Get(e.partitionSessionID + 2)
1245-
require.Error(t, err)
1246-
})
1247-
1248-
t.Run("SplitPartitionFromOtherReader", func(t *testing.T) {
1249-
e := newTopicReaderTestEnv(t)
1250-
e.Start()
1251-
1252-
// Create initial partition session from different reader
1253-
initialSession := newTestPartitionSessionReaderID(e.reader.readerID+1, e.partitionSessionID)
1254-
require.NoError(t, e.reader.sessionController.Add(initialSession))
1255-
1256-
// Send split partition request
1257-
splitRequestReceived := make(empty.Chan)
1258-
e.stream.EXPECT().Send(&rawtopicreader.SplitPartitionSessionRequest{
1259-
PartitionSessionID: e.partitionSessionID,
1260-
}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
1261-
close(splitRequestReceived)
1262-
return nil
1263-
})
1264-
1265-
// Send split response
1266-
e.SendFromServer(&rawtopicreader.SplitPartitionSessionResponse{
1267-
PartitionSessionID: e.partitionSessionID,
1268-
SplitSession1: e.partitionSessionID + 1,
1269-
SplitSession2: e.partitionSessionID + 2,
1270-
})
1271-
1272-
// Wait for split request to be sent
1273-
xtest.WaitChannelClosed(t, splitRequestReceived)
1274-
1275-
// Verify initial session is still present
1276-
session, err := e.reader.sessionController.Get(e.partitionSessionID)
1277-
require.NoError(t, err)
1278-
require.Equal(t, initialSession, session)
1279-
1280-
// Verify split sessions are not added
1281-
_, err = e.reader.sessionController.Get(e.partitionSessionID + 1)
1282-
require.Error(t, err)
1283-
_, err = e.reader.sessionController.Get(e.partitionSessionID + 2)
1284-
require.Error(t, err)
1285-
})
1286-
}
1287-
12881160
type streamEnv struct {
12891161
TopicClient *MockTopicClient
12901162
ctx context.Context //nolint:containedctx

0 commit comments

Comments
 (0)