Skip to content

Commit 995c43e

Browse files
committed
compile
1 parent 5efc4b5 commit 995c43e

File tree

4 files changed

+205
-2
lines changed

4 files changed

+205
-2
lines changed

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type streamListener struct {
3737
syncCommitter *topicreadercommon.Committer
3838

3939
closing atomic.Bool
40+
tracer *trace.Topic
4041

4142
m xsync.Mutex
4243
messagesToSend []rawtopicreader.ClientMessage
@@ -54,6 +55,7 @@ func newStreamListener(
5455
handler: eventListener,
5556
background: *background.NewWorker(xcontext.ValueOnly(connectionCtx), "topic reader stream listener"),
5657
sessionIDCounter: sessionIDCounter,
58+
tracer: &trace.Topic{}, // TODO: add read tracer
5759
}
5860

5961
res.initVars(sessionIDCounter)
@@ -161,7 +163,7 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err
161163
}
162164
}()
163165

164-
stream, err := client.StreamRead(streamCtx)
166+
stream, err := client.StreamRead(streamCtx, -1, l.tracer)
165167
if err != nil {
166168
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
167169
"ydb: topic listener failed connect to a stream: %w",

internal/topic/topiclistenerinternal/topic_client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import (
44
"context"
55

66
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
78
)
89

910
type TopicClient interface {
10-
StreamRead(connectionCtx context.Context) (rawtopicreader.StreamReader, error)
11+
StreamRead(ctxStreamLifeTime context.Context, readerID int64, tracer *trace.Topic) (rawtopicreader.StreamReader, error)
1112
}

internal/topic/topicreaderinternal/reader_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,80 @@ func TestReader_WaitInit(t *testing.T) {
190190
require.NoError(t, err)
191191
}
192192

193+
func TestTopicSplitPartitions(t *testing.T) {
194+
t.Run("SplitPartition", func(t *testing.T) {
195+
mc := gomock.NewController(t)
196+
defer mc.Finish()
197+
198+
readerID := topicreadercommon.NextReaderID()
199+
baseReader := NewMockbatchedStreamReader(mc)
200+
reader := &Reader{
201+
reader: baseReader,
202+
readerID: readerID,
203+
}
204+
205+
// Create initial partition session
206+
initialSession := newTestPartitionSessionReaderID(readerID, 1)
207+
baseReader.EXPECT().WaitInit(gomock.Any()).Return(nil)
208+
209+
// Expect partition split notification
210+
splitSession1 := newTestPartitionSessionReaderID(readerID, 2)
211+
splitSession2 := newTestPartitionSessionReaderID(readerID, 3)
212+
baseReader.EXPECT().OnSplitPartition(gomock.Any(), initialSession, splitSession1, splitSession2).Return(nil)
213+
214+
// Test partition split handling
215+
err := reader.OnSplitPartition(context.Background(), initialSession, splitSession1, splitSession2)
216+
require.NoError(t, err)
217+
})
218+
219+
t.Run("SplitPartitionError", func(t *testing.T) {
220+
mc := gomock.NewController(t)
221+
defer mc.Finish()
222+
223+
readerID := topicreadercommon.NextReaderID()
224+
baseReader := NewMockbatchedStreamReader(mc)
225+
reader := &Reader{
226+
reader: baseReader,
227+
readerID: readerID,
228+
}
229+
230+
// Create initial partition session
231+
initialSession := newTestPartitionSessionReaderID(readerID, 1)
232+
baseReader.EXPECT().WaitInit(gomock.Any()).Return(nil)
233+
234+
// Expect partition split notification with error
235+
splitSession1 := newTestPartitionSessionReaderID(readerID, 2)
236+
splitSession2 := newTestPartitionSessionReaderID(readerID, 3)
237+
testErr := errors.New("test error")
238+
baseReader.EXPECT().OnSplitPartition(gomock.Any(), initialSession, splitSession1, splitSession2).Return(testErr)
239+
240+
// Test partition split handling with error
241+
err := reader.OnSplitPartition(context.Background(), initialSession, splitSession1, splitSession2)
242+
require.ErrorIs(t, err, testErr)
243+
})
244+
245+
t.Run("SplitPartitionFromOtherReader", func(t *testing.T) {
246+
mc := gomock.NewController(t)
247+
defer mc.Finish()
248+
249+
readerID := topicreadercommon.NextReaderID()
250+
baseReader := NewMockbatchedStreamReader(mc)
251+
reader := &Reader{
252+
reader: baseReader,
253+
readerID: readerID,
254+
}
255+
256+
// Create initial partition session from different reader
257+
initialSession := newTestPartitionSessionReaderID(readerID+1, 1)
258+
splitSession1 := newTestPartitionSessionReaderID(readerID+1, 2)
259+
splitSession2 := newTestPartitionSessionReaderID(readerID+1, 3)
260+
261+
// Test partition split handling from other reader
262+
err := reader.OnSplitPartition(context.Background(), initialSession, splitSession1, splitSession2)
263+
require.ErrorIs(t, err, errCommitSessionFromOtherReader)
264+
})
265+
}
266+
193267
func newTestPartitionSessionReaderID(
194268
readerID int64,
195269
partitionSessionID rawtopicreader.PartitionSessionID,

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,6 +1159,132 @@ func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) {
11591159
})
11601160
}
11611161

1162+
func TestTopicSplitPartitions(t *testing.T) {
1163+
t.Run("SplitPartition", func(t *testing.T) {
1164+
e := newTopicReaderTestEnv(t)
1165+
e.Start()
1166+
1167+
// Create initial partition session
1168+
initialSession := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID)
1169+
require.NoError(t, e.reader.sessionController.Add(initialSession))
1170+
1171+
// Create split sessions
1172+
splitSession1 := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID+1)
1173+
splitSession2 := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID+2)
1174+
1175+
// Send split partition request
1176+
splitRequestReceived := make(empty.Chan)
1177+
e.stream.EXPECT().Send(&rawtopicreader.SplitPartitionSessionRequest{
1178+
PartitionSessionID: e.partitionSessionID,
1179+
}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
1180+
close(splitRequestReceived)
1181+
return nil
1182+
})
1183+
1184+
// Send split response
1185+
e.SendFromServer(&rawtopicreader.SplitPartitionSessionResponse{
1186+
PartitionSessionID: e.partitionSessionID,
1187+
SplitSession1: e.partitionSessionID + 1,
1188+
SplitSession2: e.partitionSessionID + 2,
1189+
})
1190+
1191+
// Wait for split request to be sent
1192+
xtest.WaitChannelClosed(t, splitRequestReceived)
1193+
1194+
// Verify split sessions are added to storage
1195+
session1, err := e.reader.sessionController.Get(e.partitionSessionID + 1)
1196+
require.NoError(t, err)
1197+
require.Equal(t, splitSession1, session1)
1198+
1199+
session2, err := e.reader.sessionController.Get(e.partitionSessionID + 2)
1200+
require.NoError(t, err)
1201+
require.Equal(t, splitSession2, session2)
1202+
1203+
// Verify initial session is removed
1204+
_, err = e.reader.sessionController.Get(e.partitionSessionID)
1205+
require.Error(t, err)
1206+
})
1207+
1208+
t.Run("SplitPartitionError", func(t *testing.T) {
1209+
e := newTopicReaderTestEnv(t)
1210+
e.Start()
1211+
1212+
// Create initial partition session
1213+
initialSession := newTestPartitionSessionReaderID(e.reader.readerID, e.partitionSessionID)
1214+
require.NoError(t, e.reader.sessionController.Add(initialSession))
1215+
1216+
// Send split partition request with error
1217+
splitRequestReceived := make(empty.Chan)
1218+
testErr := errors.New("test error")
1219+
e.stream.EXPECT().Send(&rawtopicreader.SplitPartitionSessionRequest{
1220+
PartitionSessionID: e.partitionSessionID,
1221+
}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
1222+
close(splitRequestReceived)
1223+
return testErr
1224+
})
1225+
1226+
// Send split response
1227+
e.SendFromServer(&rawtopicreader.SplitPartitionSessionResponse{
1228+
PartitionSessionID: e.partitionSessionID,
1229+
SplitSession1: e.partitionSessionID + 1,
1230+
SplitSession2: e.partitionSessionID + 2,
1231+
})
1232+
1233+
// Wait for split request to be sent
1234+
xtest.WaitChannelClosed(t, splitRequestReceived)
1235+
1236+
// Verify initial session is still present
1237+
session, err := e.reader.sessionController.Get(e.partitionSessionID)
1238+
require.NoError(t, err)
1239+
require.Equal(t, initialSession, session)
1240+
1241+
// Verify split sessions are not added
1242+
_, err = e.reader.sessionController.Get(e.partitionSessionID + 1)
1243+
require.Error(t, err)
1244+
_, err = e.reader.sessionController.Get(e.partitionSessionID + 2)
1245+
require.Error(t, err)
1246+
})
1247+
1248+
t.Run("SplitPartitionFromOtherReader", func(t *testing.T) {
1249+
e := newTopicReaderTestEnv(t)
1250+
e.Start()
1251+
1252+
// Create initial partition session from different reader
1253+
initialSession := newTestPartitionSessionReaderID(e.reader.readerID+1, e.partitionSessionID)
1254+
require.NoError(t, e.reader.sessionController.Add(initialSession))
1255+
1256+
// Send split partition request
1257+
splitRequestReceived := make(empty.Chan)
1258+
e.stream.EXPECT().Send(&rawtopicreader.SplitPartitionSessionRequest{
1259+
PartitionSessionID: e.partitionSessionID,
1260+
}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
1261+
close(splitRequestReceived)
1262+
return nil
1263+
})
1264+
1265+
// Send split response
1266+
e.SendFromServer(&rawtopicreader.SplitPartitionSessionResponse{
1267+
PartitionSessionID: e.partitionSessionID,
1268+
SplitSession1: e.partitionSessionID + 1,
1269+
SplitSession2: e.partitionSessionID + 2,
1270+
})
1271+
1272+
// Wait for split request to be sent
1273+
xtest.WaitChannelClosed(t, splitRequestReceived)
1274+
1275+
// Verify initial session is still present
1276+
session, err := e.reader.sessionController.Get(e.partitionSessionID)
1277+
require.NoError(t, err)
1278+
require.Equal(t, initialSession, session)
1279+
1280+
// Verify split sessions are not added
1281+
_, err = e.reader.sessionController.Get(e.partitionSessionID + 1)
1282+
require.Error(t, err)
1283+
_, err = e.reader.sessionController.Get(e.partitionSessionID + 2)
1284+
require.Error(t, err)
1285+
})
1286+
}
1287+
11621288
type streamEnv struct {
11631289
TopicClient *MockTopicClient
11641290
ctx context.Context //nolint:containedctx

0 commit comments

Comments
 (0)