Skip to content

Commit e5c2843

Browse files
committed
typed mocks in topic
1 parent 6558c30 commit e5c2843

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

internal/topic/topicreaderinternal/reader_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,26 @@ func TestReader_Close(t *testing.T) {
2727
baseReader.EXPECT().ReadMessageBatch(gomock.Any(), ReadMessageBatchOptions{}).
2828
DoAndReturn(func(ctx context.Context, options ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) {
2929
<-readerContext.Done()
30+
3031
return nil, testErr
3132
})
3233
baseReader.EXPECT().ReadMessageBatch(
3334
gomock.Any(),
3435
ReadMessageBatchOptions{batcherGetOptions: batcherGetOptions{MaxCount: 1, MinCount: 1}},
3536
).DoAndReturn(func(ctx context.Context, options ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) {
3637
<-readerContext.Done()
38+
3739
return nil, testErr
3840
})
3941
baseReader.EXPECT().Commit(gomock.Any(), gomock.Any()).DoAndReturn(
4042
func(ctx context.Context, commitRange topicreadercommon.CommitRange) error {
4143
<-readerContext.Done()
44+
4245
return testErr
4346
})
4447
baseReader.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ error) error {
4548
readerCancel()
49+
4650
return nil
4751
})
4852

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
113113

114114
// request new data portion
115115
readRequestReceived := make(empty.Chan)
116-
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize * 2}).Do(func(_ rawtopicreader.ClientMessage) error {
116+
e.stream.EXPECT().Send(
117+
&rawtopicreader.ReadRequest{BytesSize: dataSize * 2},
118+
).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
117119
close(readRequestReceived)
120+
118121
return nil
119122
})
120123

@@ -136,6 +139,7 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
136139
},
137140
).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
138141
close(commitReceived)
142+
139143
return nil
140144
})
141145

@@ -198,8 +202,11 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
198202
const dataSize = 4
199203
// request new data portion
200204
readRequestReceived := make(empty.Chan)
201-
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize * 2}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
205+
e.stream.EXPECT().Send(
206+
&rawtopicreader.ReadRequest{BytesSize: dataSize * 2},
207+
).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
202208
close(readRequestReceived)
209+
203210
return nil
204211
})
205212

@@ -271,13 +278,15 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
271278
},
272279
}}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
273280
close(commitReceived)
281+
274282
return nil
275283
})
276284

277285
stopPartitionResponseSent := make(empty.Chan)
278286
e.stream.EXPECT().Send(&rawtopicreader.StopPartitionSessionResponse{PartitionSessionID: e.partitionSessionID}).
279287
DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
280288
close(stopPartitionResponseSent)
289+
281290
return nil
282291
})
283292

@@ -409,8 +418,9 @@ func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
409418
stopPartitionResponseSent := make(empty.Chan)
410419
e.stream.EXPECT().Send(&rawtopicreader.StopPartitionSessionResponse{
411420
PartitionSessionID: e.partitionSessionID,
412-
}).Return(nil).Do(func(_ rawtopicreader.ClientMessage) error {
421+
}).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
413422
close(stopPartitionResponseSent)
423+
414424
return nil
415425
})
416426

@@ -538,9 +548,12 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) {
538548
e := newTopicReaderTestEnv(t)
539549

540550
dataRequested := make(empty.Chan)
541-
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: int(e.initialBufferSizeBytes)}).
542-
Do(func(_ rawtopicreader.ClientMessage) error {
551+
e.stream.EXPECT().Send(
552+
&rawtopicreader.ReadRequest{BytesSize: int(e.initialBufferSizeBytes)},
553+
).
554+
DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
543555
close(dataRequested)
556+
544557
return nil
545558
})
546559

@@ -598,8 +611,11 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) {
598611

599612
sendDataRequestCompleted := make(empty.Chan)
600613
dataSize := 6
601-
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize}).Do(func(_ rawtopicreader.ClientMessage) error {
614+
e.stream.EXPECT().Send(
615+
&rawtopicreader.ReadRequest{BytesSize: dataSize},
616+
).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
602617
close(sendDataRequestCompleted)
618+
603619
return nil
604620
})
605621
e.SendFromServer(&rawtopicreader.ReadResponse{
@@ -826,8 +842,11 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) {
826842
func TestTopicStreamReadImpl_BatchReaderWantMoreMessagesThenBufferCanHold(t *testing.T) {
827843
sendMessageWithFullBuffer := func(e *streamEnv) empty.Chan {
828844
nextDataRequested := make(empty.Chan)
829-
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: int(e.initialBufferSizeBytes)}).Do(func(_ rawtopicreader.ClientMessage) error {
845+
e.stream.EXPECT().Send(
846+
&rawtopicreader.ReadRequest{BytesSize: int(e.initialBufferSizeBytes)},
847+
).DoAndReturn(func(_ rawtopicreader.ClientMessage) error {
830848
close(nextDataRequested)
849+
831850
return nil
832851
})
833852

@@ -1040,6 +1059,7 @@ func newTopicReaderTestEnv(t testing.TB) streamEnv {
10401059
streamClosed := make(empty.Chan)
10411060
stream.EXPECT().CloseSend().Return(nil).DoAndReturn(func() error {
10421061
close(streamClosed)
1062+
10431063
return nil
10441064
})
10451065

@@ -1072,6 +1092,7 @@ func (e *streamEnv) readerReceiveWaitClose(callback func()) {
10721092
callback()
10731093
}
10741094
<-e.ctx.Done()
1095+
10751096
return nil, errors.New("test reader closed")
10761097
})
10771098
}

internal/topic/topicreaderinternal/stream_reconnector_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
174174
).DoAndReturn(func(ctx context.Context, offset topicreadercommon.CommitRange) error {
175175
require.Equal(t, "v", ctx.Value(k{}))
176176
require.Equal(t, expectedCommitRange, offset)
177+
177178
return nil
178179
})
179180

@@ -194,6 +195,7 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
194195
).DoAndReturn(func(ctx context.Context, offset topicreadercommon.CommitRange) error {
195196
require.Equal(t, "v", ctx.Value(k{}))
196197
require.Equal(t, expectedCommitRange, offset)
198+
197199
return testErr
198200
})
199201

@@ -319,6 +321,7 @@ func TestTopicReaderReconnectorStart(t *testing.T) {
319321
stream := NewMockbatchedStreamReader(mc)
320322
stream.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, err error) error {
321323
require.Error(t, err)
324+
322325
return nil
323326
})
324327

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func TestWriterImpl_Write(t *testing.T) {
139139
Codec: rawtopiccommon.CodecRaw,
140140
}).DoAndReturn(func(_ rawtopicwriter.ClientMessage) error {
141141
close(writeMessageReceived)
142+
142143
return nil
143144
})
144145

@@ -191,6 +192,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
191192
e.stream.EXPECT().Send(gomock.Any()).DoAndReturn(func(message rawtopicwriter.ClientMessage) error {
192193
writeReq := message.(*rawtopicwriter.WriteRequest)
193194
messReceived <- writeReq.Codec
195+
194196
return nil
195197
})
196198

@@ -221,6 +223,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
221223
e.stream.EXPECT().Send(gomock.Any()).DoAndReturn(func(message rawtopicwriter.ClientMessage) error {
222224
writeReq := message.(*rawtopicwriter.WriteRequest)
223225
messReceived <- writeReq.Codec
226+
224227
return nil
225228
})
226229

@@ -247,6 +250,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
247250
e.stream.EXPECT().Send(gomock.Any()).DoAndReturn(func(message rawtopicwriter.ClientMessage) error {
248251
writeReq := message.(*rawtopicwriter.WriteRequest)
249252
messReceived <- writeReq.Codec
253+
250254
return nil
251255
}).Times(codecMeasureIntervalBatches * 2)
252256

@@ -471,16 +475,19 @@ func TestWriterImpl_Reconnect(t *testing.T) {
471475
strm.EXPECT().CloseSend().DoAndReturn(func() error {
472476
t.Logf("closed stream: %v", name)
473477
close(streamClosed)
478+
474479
return nil
475480
})
476481

477482
strm.EXPECT().Send(&initReq).DoAndReturn(func(_ rawtopicwriter.ClientMessage) error {
478483
t.Logf("sent init request stream: %v", name)
484+
479485
return nil
480486
})
481487

482488
strm.EXPECT().Recv().DoAndReturn(func() (rawtopicwriter.ServerMessage, error) {
483489
t.Logf("receive init response stream: %v", name)
490+
484491
return &rawtopicwriter.InitResult{
485492
ServerMessageMetadata: rawtopiccommon.ServerMessageMetadata{Status: rawydb.StatusSuccess},
486493
SessionID: name,
@@ -490,6 +497,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
490497
strm.EXPECT().Recv().DoAndReturn(func() (rawtopicwriter.ServerMessage, error) {
491498
xtest.WaitChannelClosed(t, streamClosed)
492499
t.Logf("channel closed: %v", name)
500+
493501
return nil, errors.New("test stream closed")
494502
}).MaxTimes(1)
495503

@@ -502,8 +510,9 @@ func TestWriterImpl_Reconnect(t *testing.T) {
502510
{SeqNo: 1},
503511
},
504512
Codec: rawtopiccommon.CodecRaw,
505-
}).Do(func(_ rawtopicwriter.ClientMessage) error {
513+
}).DoAndReturn(func(_ rawtopicwriter.ClientMessage) error {
506514
t.Logf("strm2 sent message and return retriable error")
515+
507516
return xerrors.Retryable(errors.New("retriable on strm2"))
508517
})
509518

@@ -515,6 +524,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
515524
Codec: rawtopiccommon.CodecRaw,
516525
}).DoAndReturn(func(_ rawtopicwriter.ClientMessage) error {
517526
t.Logf("strm3 sent message and return unretriable error")
527+
518528
return errors.New("strm3")
519529
})
520530

@@ -584,6 +594,7 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
584594
Codec: rawtopiccommon.CodecRaw,
585595
}).DoAndReturn(func(_ rawtopicwriter.ClientMessage) error {
586596
close(writeCompleted)
597+
587598
return nil
588599
})
589600

@@ -1082,19 +1093,22 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv {
10821093
PartitionID: res.partitionID,
10831094
SupportedCodecs: supportedCodecs,
10841095
})
1096+
10851097
return nil
10861098
})
10871099
} else {
10881100
res.stream.EXPECT().Send(&req).DoAndReturn(func(receivedRequest rawtopicwriter.ClientMessage) error {
10891101
mess := receivedRequest.(*rawtopicwriter.InitRequest)
10901102
options.customInitRequestHandler(res, mess)
1103+
10911104
return nil
10921105
})
10931106
}
10941107

10951108
streamClosed := make(empty.Chan)
10961109
res.stream.EXPECT().CloseSend().DoAndReturn(func() error {
10971110
close(streamClosed)
1111+
10981112
return nil
10991113
})
11001114

0 commit comments

Comments
 (0)