Skip to content

Commit b5d1297

Browse files
authored
Merge pull request #1374 moved offset to topiccommon
2 parents 4e24bb4 + 686f6bd commit b5d1297

File tree

13 files changed

+103
-85
lines changed

13 files changed

+103
-85
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package rawtopiccommon
2+
3+
type Offset int64
4+
5+
func NewOffset(v int64) Offset {
6+
return Offset(v)
7+
}
8+
9+
func (offset *Offset) FromInt64(v int64) {
10+
*offset = Offset(v)
11+
}
12+
13+
func (offset Offset) ToInt64() int64 {
14+
return int64(offset)
15+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package rawtopiccommon
2+
3+
import (
4+
"errors"
5+
6+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
9+
)
10+
11+
var errUnexpectedProtobufInOffsets = xerrors.Wrap(errors.New("ydb: unexpected protobuf nil offsets"))
12+
13+
type OffsetRange struct {
14+
Start Offset
15+
End Offset
16+
}
17+
18+
func (r *OffsetRange) FromProto(p *Ydb_Topic.OffsetsRange) error {
19+
if p == nil {
20+
return xerrors.WithStackTrace(errUnexpectedProtobufInOffsets)
21+
}
22+
23+
r.Start.FromInt64(p.GetStart())
24+
r.End.FromInt64(p.GetEnd())
25+
26+
return nil
27+
}
28+
29+
func (r *OffsetRange) ToProto() *Ydb_Topic.OffsetsRange {
30+
return &Ydb_Topic.OffsetsRange{
31+
Start: r.Start.ToInt64(),
32+
End: r.End.ToInt64(),
33+
}
34+
}

internal/grpcwrapper/rawtopic/rawtopicreader/messages.go

Lines changed: 13 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ import (
1212
)
1313

1414
var (
15-
errUnexpectedNilStreamReadMessageReadResponse = xerrors.Wrap(errors.New("ydb: unexpected nil Ydb_Topic.StreamReadMessage_ReadResponse")) //nolint:lll
16-
errNilPartitionData = xerrors.Wrap(errors.New("ydb: unexpected nil partition data"))
17-
errUnexpectedNilBatchInPartitionData = xerrors.Wrap(errors.New("ydb: unexpected nil batch in partition data")) //nolint:lll
18-
errUnexpectedMessageNilInPartitionData = xerrors.Wrap(errors.New("ydb: unexpected message nil in partition data")) //nolint:lll
19-
errUnexpectedProtobufInOffsets = xerrors.Wrap(errors.New("ydb: unexpected protobuf nil offsets"))
15+
errUnexpectedNilStreamReadMessageReadResponse = xerrors.Wrap(errors.New("ydb: unexpected nil Ydb_Topic.StreamReadMessage_ReadResponse")) //nolint:lll
16+
errNilPartitionData = xerrors.Wrap(errors.New("ydb: unexpected nil partition data"))
17+
errUnexpectedNilBatchInPartitionData = xerrors.Wrap(errors.New("ydb: unexpected nil batch in partition data")) //nolint:lll
18+
errUnexpectedMessageNilInPartitionData = xerrors.Wrap(errors.New("ydb: unexpected message nil in partition data")) //nolint:lll
19+
2020
errUnexpectedProtoNilStartPartitionSessionRequest = xerrors.Wrap(errors.New("ydb: unexpected proto nil start partition session request")) //nolint:lll
2121
errUnexpectedNilPartitionSession = xerrors.Wrap(errors.New("ydb: unexpected proto nil partition session in start partition session request")) //nolint:lll
2222
errUnexpectedGrpcNilStopPartitionSessionRequest = xerrors.Wrap(errors.New("ydb: unexpected grpc nil stop partition session request")) //nolint:lll
@@ -32,22 +32,8 @@ func (id PartitionSessionID) ToInt64() int64 {
3232
return int64(id)
3333
}
3434

35-
type Offset int64
36-
37-
func NewOffset(v int64) Offset {
38-
return Offset(v)
39-
}
40-
41-
func (offset *Offset) FromInt64(v int64) {
42-
*offset = Offset(v)
43-
}
44-
45-
func (offset Offset) ToInt64() int64 {
46-
return int64(offset)
47-
}
48-
4935
type OptionalOffset struct {
50-
Offset Offset
36+
Offset rawtopiccommon.Offset
5137
HasValue bool
5238
}
5339

@@ -286,7 +272,7 @@ type Batch struct {
286272
}
287273

288274
type MessageData struct {
289-
Offset Offset
275+
Offset rawtopiccommon.Offset
290276
SeqNo int64
291277
CreatedAt time.Time
292278
Data []byte
@@ -330,30 +316,7 @@ func (r *CommitOffsetRequest) toProto() *Ydb_Topic.StreamReadMessage_CommitOffse
330316

331317
type PartitionCommitOffset struct {
332318
PartitionSessionID PartitionSessionID
333-
Offsets []OffsetRange
334-
}
335-
336-
type OffsetRange struct {
337-
Start Offset
338-
End Offset
339-
}
340-
341-
func (r *OffsetRange) FromProto(p *Ydb_Topic.OffsetsRange) error {
342-
if p == nil {
343-
return xerrors.WithStackTrace(errUnexpectedProtobufInOffsets)
344-
}
345-
346-
r.Start.FromInt64(p.GetStart())
347-
r.End.FromInt64(p.GetEnd())
348-
349-
return nil
350-
}
351-
352-
func (r *OffsetRange) ToProto() *Ydb_Topic.OffsetsRange {
353-
return &Ydb_Topic.OffsetsRange{
354-
Start: r.Start.ToInt64(),
355-
End: r.End.ToInt64(),
356-
}
319+
Offsets []rawtopiccommon.OffsetRange
357320
}
358321

359322
type CommitOffsetResponse struct {
@@ -382,7 +345,7 @@ func (r *CommitOffsetResponse) fromProto(proto *Ydb_Topic.StreamReadMessage_Comm
382345

383346
type PartitionCommittedOffset struct {
384347
PartitionSessionID PartitionSessionID
385-
CommittedOffset Offset
348+
CommittedOffset rawtopiccommon.Offset
386349
}
387350

388351
//
@@ -407,7 +370,7 @@ type PartitionSessionStatusResponse struct {
407370
rawtopiccommon.ServerMessageMetadata
408371

409372
PartitionSessionID PartitionSessionID
410-
PartitionOffsets OffsetRange
373+
PartitionOffsets rawtopiccommon.OffsetRange
411374
WriteTimeHighWatermark time.Time
412375
}
413376

@@ -433,8 +396,8 @@ type StartPartitionSessionRequest struct {
433396
rawtopiccommon.ServerMessageMetadata
434397

435398
PartitionSession PartitionSession
436-
CommittedOffset Offset
437-
PartitionOffsets OffsetRange
399+
CommittedOffset rawtopiccommon.Offset
400+
PartitionOffsets rawtopiccommon.OffsetRange
438401
}
439402

440403
func (r *StartPartitionSessionRequest) fromProto(p *Ydb_Topic.StreamReadMessage_StartPartitionSessionRequest) error {
@@ -489,7 +452,7 @@ type StopPartitionSessionRequest struct {
489452

490453
PartitionSessionID PartitionSessionID
491454
Graceful bool
492-
CommittedOffset Offset
455+
CommittedOffset rawtopiccommon.Offset
493456
}
494457

495458
func (r *StopPartitionSessionRequest) fromProto(proto *Ydb_Topic.StreamReadMessage_StopPartitionSessionRequest) error {

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,11 @@ func (l *streamListener) onStartPartitionRequest(
292292
}
293293

294294
if userResp.readOffset != nil {
295-
resp.ReadOffset.Offset = rawtopicreader.NewOffset(*userResp.readOffset)
295+
resp.ReadOffset.Offset = rawtopiccommon.NewOffset(*userResp.readOffset)
296296
resp.ReadOffset.HasValue = true
297297
}
298298
if userResp.CommitOffset != nil {
299-
resp.CommitOffset.Offset = rawtopicreader.NewOffset(*userResp.CommitOffset)
299+
resp.CommitOffset.Offset = rawtopiccommon.NewOffset(*userResp.CommitOffset)
300300
resp.CommitOffset.HasValue = true
301301
}
302302

internal/topic/topiclistenerinternal/stream_listener_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
108108
PartitionID: 123,
109109
},
110110
CommittedOffset: 10,
111-
PartitionOffsets: rawtopicreader.OffsetRange{
111+
PartitionOffsets: rawtopiccommon.OffsetRange{
112112
Start: 5,
113113
End: 15,
114114
},
@@ -118,11 +118,11 @@ func TestStreamListener_OnReceiveServerMessage(t *testing.T) {
118118
require.Equal(t, &rawtopicreader.StartPartitionSessionResponse{
119119
PartitionSessionID: 100,
120120
ReadOffset: rawtopicreader.OptionalOffset{
121-
Offset: rawtopicreader.NewOffset(respReadOffset),
121+
Offset: rawtopiccommon.NewOffset(respReadOffset),
122122
HasValue: true,
123123
},
124124
CommitOffset: rawtopicreader.OptionalOffset{
125-
Offset: rawtopicreader.NewOffset(respCommitOffset),
125+
Offset: rawtopiccommon.NewOffset(respCommitOffset),
126126
HasValue: true,
127127
},
128128
}, req)

internal/topic/topicreadercommon/commit_range.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package topicreadercommon
33
import (
44
"sort"
55

6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
78
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
89
)
@@ -142,7 +143,7 @@ func (r *CommitRanges) toRawPartitionCommitOffset() []rawtopicreader.PartitionCo
142143

143144
for i := range r.Ranges {
144145
commit := &r.Ranges[i]
145-
offsetsRange := rawtopicreader.OffsetRange{
146+
offsetsRange := rawtopiccommon.OffsetRange{
146147
Start: commit.CommitOffsetStart,
147148
End: commit.CommitOffsetEnd,
148149
}
@@ -168,8 +169,8 @@ func (p PublicCommitRange) getCommitRange() PublicCommitRange {
168169
}
169170

170171
type CommitRange struct {
171-
CommitOffsetStart rawtopicreader.Offset
172-
CommitOffsetEnd rawtopicreader.Offset
172+
CommitOffsetStart rawtopiccommon.Offset
173+
CommitOffsetEnd rawtopiccommon.Offset
173174
PartitionSession *PartitionSession
174175
}
175176

internal/topic/topicreadercommon/commit_range_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/stretchr/testify/require"
77

8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
89
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
910
)
1011

@@ -193,7 +194,7 @@ func TestCommitsToRawPartitionCommitOffset(t *testing.T) {
193194
expected: []rawtopicreader.PartitionCommitOffset{
194195
{
195196
PartitionSessionID: 1,
196-
Offsets: []rawtopicreader.OffsetRange{
197+
Offsets: []rawtopiccommon.OffsetRange{
197198
{Start: 1, End: 2},
198199
},
199200
},
@@ -209,7 +210,7 @@ func TestCommitsToRawPartitionCommitOffset(t *testing.T) {
209210
expected: []rawtopicreader.PartitionCommitOffset{
210211
{
211212
PartitionSessionID: 1,
212-
Offsets: []rawtopicreader.OffsetRange{
213+
Offsets: []rawtopiccommon.OffsetRange{
213214
{Start: 1, End: 2},
214215
{Start: 10, End: 20},
215216
{Start: 30, End: 40},
@@ -228,14 +229,14 @@ func TestCommitsToRawPartitionCommitOffset(t *testing.T) {
228229
expected: []rawtopicreader.PartitionCommitOffset{
229230
{
230231
PartitionSessionID: 1,
231-
Offsets: []rawtopicreader.OffsetRange{
232+
Offsets: []rawtopiccommon.OffsetRange{
232233
{Start: 1, End: 2},
233234
{Start: 10, End: 20},
234235
},
235236
},
236237
{
237238
PartitionSessionID: 2,
238-
Offsets: []rawtopicreader.OffsetRange{
239+
Offsets: []rawtopiccommon.OffsetRange{
239240
{Start: 1, End: 2},
240241
{Start: 10, End: 20},
241242
},
@@ -254,14 +255,14 @@ func TestCommitsToRawPartitionCommitOffset(t *testing.T) {
254255
expected: []rawtopicreader.PartitionCommitOffset{
255256
{
256257
PartitionSessionID: 1,
257-
Offsets: []rawtopicreader.OffsetRange{
258+
Offsets: []rawtopiccommon.OffsetRange{
258259
{Start: 1, End: 2},
259260
{Start: 10, End: 20},
260261
},
261262
},
262263
{
263264
PartitionSessionID: 2,
264-
Offsets: []rawtopicreader.OffsetRange{
265+
Offsets: []rawtopiccommon.OffsetRange{
265266
{Start: 1, End: 2},
266267
{Start: 3, End: 4},
267268
{Start: 5, End: 6},

internal/topic/topicreadercommon/read_partition_session.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"sync/atomic"
66

7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
78
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
89
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
910
)
@@ -32,7 +33,7 @@ func NewPartitionSession(
3233
connectionID string,
3334
partitionSessionID rawtopicreader.PartitionSessionID,
3435
clientPartitionSessionID int64,
35-
committedOffset rawtopicreader.Offset,
36+
committedOffset rawtopiccommon.Offset,
3637
) *PartitionSession {
3738
partitionContext, cancel := xcontext.WithCancel(partitionContext)
3839

@@ -64,29 +65,29 @@ func (s *PartitionSession) Close() {
6465
s.ctxCancel()
6566
}
6667

67-
func (s *PartitionSession) CommittedOffset() rawtopicreader.Offset {
68+
func (s *PartitionSession) CommittedOffset() rawtopiccommon.Offset {
6869
v := s.committedOffsetVal.Load()
6970

70-
var res rawtopicreader.Offset
71+
var res rawtopiccommon.Offset
7172
res.FromInt64(v)
7273

7374
return res
7475
}
7576

76-
func (s *PartitionSession) SetCommittedOffset(v rawtopicreader.Offset) {
77+
func (s *PartitionSession) SetCommittedOffset(v rawtopiccommon.Offset) {
7778
s.committedOffsetVal.Store(v.ToInt64())
7879
}
7980

80-
func (s *PartitionSession) LastReceivedMessageOffset() rawtopicreader.Offset {
81+
func (s *PartitionSession) LastReceivedMessageOffset() rawtopiccommon.Offset {
8182
v := s.lastReceivedOffsetEndVal.Load()
8283

83-
var res rawtopicreader.Offset
84+
var res rawtopiccommon.Offset
8485
res.FromInt64(v)
8586

8687
return res
8788
}
8889

89-
func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopicreader.Offset) {
90+
func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset) {
9091
s.lastReceivedOffsetEndVal.Store(v.ToInt64())
9192
}
9293

internal/topic/topicreaderinternal/batcher_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stretchr/testify/require"
1111

1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -339,8 +340,8 @@ func TestBatcherConcurency(t *testing.T) {
339340
go func() {
340341
for i := 0; i < count; i++ {
341342
_ = b.PushRawMessage(session, &rawtopicreader.StartPartitionSessionRequest{
342-
CommittedOffset: rawtopicreader.NewOffset(int64(i)),
343-
PartitionOffsets: rawtopicreader.OffsetRange{},
343+
CommittedOffset: rawtopiccommon.NewOffset(int64(i)),
344+
PartitionOffsets: rawtopiccommon.OffsetRange{},
344345
})
345346
}
346347
}()
@@ -353,7 +354,7 @@ func TestBatcherConcurency(t *testing.T) {
353354
require.NoError(tb, err)
354355
require.Equal(
355356
tb,
356-
rawtopicreader.NewOffset(int64(i)),
357+
rawtopiccommon.NewOffset(int64(i)),
357358
res.RawMessage.(*rawtopicreader.StartPartitionSessionRequest).CommittedOffset,
358359
)
359360
}

0 commit comments

Comments
 (0)