Skip to content

Commit 8b7537c

Browse files
committed
fix linters
1 parent 47b8872 commit 8b7537c

File tree

12 files changed

+83
-83
lines changed

12 files changed

+83
-83
lines changed

coordination/coordination.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,6 @@ func (d *SemaphoreDescription) String() string {
194194
}
195195

196196
func (s *SemaphoreSession) String() string {
197-
return fmt.Sprintf("{PartitionSessionID: %d Count: %d OrderID: %d Data: %q TimeoutMillis: %v}",
197+
return fmt.Sprintf("{SessionID: %d Count: %d OrderID: %d Data: %q TimeoutMillis: %v}",
198198
s.SessionID, s.Count, s.OrderID, s.Data, s.Timeout)
199199
}

examples/topic/topicreader/topicreader_handler.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
1111
)
1212

13-
func startReader(ctx context.Context, db *ydb.Driver) (*topiclistener.TopicListener, error) {
13+
func StartReader(ctx context.Context, db *ydb.Driver) (*topiclistener.TopicListener, error) {
1414
handler := &TopicEventsHandler{
1515
locks: make(map[int64]int64),
1616
}
@@ -23,8 +23,6 @@ func startReader(ctx context.Context, db *ydb.Driver) (*topiclistener.TopicListe
2323
return nil, err
2424
}
2525

26-
err = reader.Close(ctx)
27-
2826
return reader, nil
2927
}
3028

@@ -36,25 +34,27 @@ type TopicEventsHandler struct {
3634
locks map[int64]int64 // [partitionSessionID]lockID
3735
}
3836

39-
func (h *TopicEventsHandler) OnReaderCreated(req topiclistener.ReaderReady) error {
37+
func (h *TopicEventsHandler) OnReaderCreated(req *topiclistener.ReaderReady) error {
4038
h.listener = req.Listener
39+
4140
return nil
4241
}
4342

4443
func (h *TopicEventsHandler) OnReadMessages(
4544
ctx context.Context,
46-
event topiclistener.ReadMessages,
45+
event *topiclistener.ReadMessages,
4746
) error {
4847
for _, mess := range event.Batch.Messages {
49-
log.Println("Receive message: %v/%v/%v", mess.Topic(), mess.PartitionID(), mess.SeqNo)
48+
log.Printf("Receive message: %v/%v/%v", mess.Topic(), mess.PartitionID(), mess.SeqNo)
5049
}
5150
_ = h.listener.Commit(ctx, event.Batch)
51+
5252
return nil
5353
}
5454

5555
func (h *TopicEventsHandler) OnStartPartitionSessionRequest(
5656
ctx context.Context,
57-
event topiclistener.StartPartitionSessionRequest,
57+
event *topiclistener.StartPartitionSessionRequest,
5858
) error {
5959
lockID, offset, err := lockPartition(ctx, event.PartitionSession.TopicPath, event.PartitionSession.PartitionID)
6060

@@ -68,12 +68,13 @@ func (h *TopicEventsHandler) OnStartPartitionSessionRequest(
6868
WithReadOffet(offset).
6969
WithCommitOffset(offset),
7070
)
71+
7172
return err
7273
}
7374

7475
func (h *TopicEventsHandler) OnStopPartitionSessionRequest(
7576
ctx context.Context,
76-
event topiclistener.StopPartitionSessionRequest,
77+
event *topiclistener.StopPartitionSessionRequest,
7778
) error {
7879
h.m.Lock()
7980
lockID := h.locks[event.PartitionSession.PartitionSessionID]
@@ -82,15 +83,14 @@ func (h *TopicEventsHandler) OnStopPartitionSessionRequest(
8283

8384
err := unlockPartition(ctx, lockID)
8485
event.Confirm()
86+
8587
return err
8688
}
8789

8890
func lockPartition(ctx context.Context, topic string, partitionID int64) (lockID, offset int64, err error) {
89-
// TODO implement me
90-
panic("implement me")
91+
panic("not implemented in the example")
9192
}
9293

9394
func unlockPartition(ctx context.Context, lockID int64) error {
94-
// TODO implement me
95-
panic("implement me")
95+
panic("not implemented in the example")
9696
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package rawtopicreader
2+
3+
// Check interface implementation
4+
var _ TopicReaderStreamInterface = StreamReader{}

internal/topic/topicclientinternal/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func (c *Client) StartListener(
234234
return nil, err
235235
}
236236

237-
return topiclistener.NewTopicListener(&c.rawClient, cfg, handler)
237+
return topiclistener.NewTopicListener(&c.rawClient, &cfg, handler)
238238
}
239239

240240
// StartReader create new topic reader and start pull messages from server

internal/topic/topiclistenerinternal/event_handler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ type EventHandler interface {
1616
// You can set topiclistener.StartPartitionSessionConfirm for change default settings.
1717
//
1818
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
19-
OnStartPartitionSessionRequest(ctx context.Context, event PublicStartPartitionSessionEvent) error
19+
OnStartPartitionSessionRequest(ctx context.Context, event *PublicStartPartitionSessionEvent) error
2020

2121
// OnReadMessages called with batch of messages. Max count of messages limited by internal buffer size
2222
//
2323
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
24-
OnReadMessages(ctx context.Context, req PublicReadMessages) error
24+
OnReadMessages(ctx context.Context, event *PublicReadMessages) error
2525

2626
// OnStopPartitionSessionRequest called when the server send stop partition message. It means that no more OnReadMessages
2727
// calls for the partition session.
@@ -31,7 +31,7 @@ type EventHandler interface {
3131
// It is guaranteed about the method will be called least once.
3232
//
3333
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
34-
OnStopPartitionSessionRequest(ctx context.Context, event PublicStopPartitionSessionEvent) error
34+
OnStopPartitionSessionRequest(ctx context.Context, event *PublicStopPartitionSessionEvent) error
3535
}
3636

3737
// PublicReadMessages

internal/topic/topiclistenerinternal/event_handler_mock_test.go

Lines changed: 26 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,10 @@ func newStreamListener(
4747
res := &streamListener{
4848
cfg: config,
4949
handler: eventListener,
50-
background: *background.NewWorker(context.WithoutCancel(connectionCtx), "topic reader stream listener"),
50+
background: *background.NewWorker(xcontext.ValueOnly(connectionCtx), "topic reader stream listener"),
5151
sessionIDCounter: sessionIDCounter,
5252
}
5353
res.initVars(sessionIDCounter)
54-
5554
if err := res.initStream(connectionCtx, client); err != nil {
5655
res.closeWithTimeout(connectionCtx, err)
5756
return nil, err
@@ -113,7 +112,7 @@ func (l *streamListener) initVars(sessionIDCounter *atomic.Int64) {
113112
}
114113

115114
func (l *streamListener) initStream(ctx context.Context, client TopicClient) error {
116-
streamCtx, streamClose := context.WithCancelCause(context.WithoutCancel(ctx))
115+
streamCtx, streamClose := context.WithCancelCause(xcontext.ValueOnly(ctx))
117116
l.streamClose = streamClose
118117
initDone := make(empty.Chan)
119118
defer close(initDone)
@@ -242,7 +241,7 @@ func (l *streamListener) onStartPartitionRequest(ctx context.Context, m *rawtopi
242241
PartitionSessionID: m.PartitionSession.PartitionSessionID,
243242
}
244243

245-
event := PublicStartPartitionSessionEvent{
244+
event := &PublicStartPartitionSessionEvent{
246245
PartitionSession: session.ToPublic(),
247246
CommittedOffset: m.CommittedOffset.ToInt64(),
248247
PartitionOffsets: PublicOffsetsRange{
@@ -299,7 +298,7 @@ func (l *streamListener) onStopPartitionRequest(ctx context.Context, m *rawtopic
299298
handlerCtx = session.Context()
300299
}
301300

302-
event := PublicStopPartitionSessionEvent{
301+
event := &PublicStopPartitionSessionEvent{
303302
PartitionSession: session.ToPublic(),
304303
Graceful: m.Graceful,
305304
CommittedOffset: m.CommittedOffset.ToInt64(),
@@ -339,7 +338,7 @@ func (l *streamListener) onReadResponse(ctx context.Context, m *rawtopicreader.R
339338
}
340339

341340
for _, batch := range batches {
342-
if err = l.handler.OnReadMessages(batch.Context(), PublicReadMessages{
341+
if err = l.handler.OnReadMessages(batch.Context(), &PublicReadMessages{
343342
PartitionSession: topicreadercommon.BatchGetPartitionSession(batch).ToPublic(),
344343
Batch: batch,
345344
}); err != nil {

internal/topic/topiclistenerinternal/stream_listener_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
3232
}()
3333

3434
EventHandlerMock(e).EXPECT().OnReadMessages(PartitionSession(e).Context(), gomock.Any()).
35-
DoAndReturn(func(ctx context.Context, req PublicReadMessages) error {
36-
require.Equal(t, PartitionSession(e).ClientPartitionSessionID, req.PartitionSession.PartitionSessionID)
37-
require.Equal(t, req.Batch.Messages[0].SeqNo, seqNo)
35+
DoAndReturn(func(ctx context.Context, event *PublicReadMessages) error {
36+
require.Equal(t, PartitionSession(e).ClientPartitionSessionID, event.PartitionSession.PartitionSessionID)
37+
require.Equal(t, event.Batch.Messages[0].SeqNo, seqNo)
3838
return nil
3939
})
4040

@@ -77,7 +77,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
7777
EventHandlerMock(e).EXPECT().OnStartPartitionSessionRequest(
7878
gomock.Any(),
7979
gomock.Any(),
80-
).DoAndReturn(func(ctx context.Context, event PublicStartPartitionSessionEvent) error {
80+
).DoAndReturn(func(ctx context.Context, event *PublicStartPartitionSessionEvent) error {
8181
require.Equal(t, topicreadercommon.PublicPartitionSession{
8282
PartitionSessionID: 1, // ClientPartitionSessionID
8383
TopicPath: "asd",
@@ -88,7 +88,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
8888
Start: 5,
8989
End: 15,
9090
}, event.PartitionOffsets)
91-
event.Confirm(PublicStartPartitionSessionConfirm{}.
91+
event.ConfirmWithParams(PublicStartPartitionSessionConfirm{}.
9292
WithReadOffet(respReadOffset).
9393
WithCommitOffset(respCommitOffset),
9494
)
@@ -137,7 +137,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
137137
EventHandlerMock(e).EXPECT().OnStopPartitionSessionRequest(
138138
PartitionSession(e).Context(),
139139
gomock.Any(),
140-
).DoAndReturn(func(ctx context.Context, event PublicStopPartitionSessionEvent) error {
140+
).DoAndReturn(func(ctx context.Context, event *PublicStopPartitionSessionEvent) error {
141141
require.Equal(t, PartitionSession(e).ClientPartitionSessionID, event.PartitionSession.PartitionSessionID)
142142
require.True(t, event.Graceful)
143143
require.Equal(t, int64(5), event.CommittedOffset)
@@ -170,7 +170,7 @@ func TestStreamListener_CloseSessionsOnCloseListener(t *testing.T) {
170170
EventHandlerMock(e).EXPECT().OnStopPartitionSessionRequest(
171171
PartitionSession(e).Context(),
172172
gomock.Any(),
173-
).Do(func(ctx context.Context, event PublicStopPartitionSessionEvent) error {
173+
).Do(func(ctx context.Context, event *PublicStopPartitionSessionEvent) error {
174174
require.Equal(t, PartitionSession(e).ClientPartitionSessionID, event.PartitionSession.PartitionSessionID)
175175
require.False(t, event.Graceful)
176176
require.Equal(t, PartitionSession(e).CommittedOffset().ToInt64(), event.CommittedOffset)

internal/topic/topiclistenerinternal/topic_listener_reconnector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
var ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
1313

1414
type TopicListenerReconnector struct {
15-
streamConfig StreamListenerConfig
15+
streamConfig *StreamListenerConfig
1616
client TopicClient
1717
handler EventHandler
1818

@@ -26,7 +26,7 @@ type TopicListenerReconnector struct {
2626

2727
func NewTopicListenerReconnector(
2828
client TopicClient,
29-
streamConfig StreamListenerConfig,
29+
streamConfig *StreamListenerConfig,
3030
handler EventHandler,
3131
) (*TopicListenerReconnector, error) {
3232
res := &TopicListenerReconnector{

0 commit comments

Comments
 (0)