Skip to content

Commit 5efc4b5

Browse files
committed
sync
1 parent cfe998c commit 5efc4b5

File tree

3 files changed

+103
-6
lines changed

3 files changed

+103
-6
lines changed

internal/grpcwrapper/rawtopic/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context, readerID int64, t
101101
return rawtopicreader.StreamReader{
102102
Stream: protoResp,
103103
ReaderID: readerID,
104-
Tracer: *tracer,
104+
Tracer: tracer,
105105
}, nil
106106
}
107107

internal/grpcwrapper/rawtopic/controlplane_types.go

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import (
1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1111
)
1212

13-
var errUnexpectedNilPartitioningSettings = xerrors.Wrap(errors.New("ydb: unexpected nil partitioning settings"))
13+
var (
14+
errUnexpectedNilPartitioningSettings = xerrors.Wrap(errors.New("ydb: unexpected nil partitioning settings"))
15+
errUnexpecredNilAutoPartitioningSettings = xerrors.Wrap(errors.New("ydb: unexpected nil auto-partitioning settings"))
16+
errUnexpectedNilAutoPartitionWriteSpeed = xerrors.Wrap(errors.New("ydb: unexpected nil auto-partition write speed"))
17+
)
1418

1519
type Consumer struct {
1620
Name string
@@ -47,8 +51,10 @@ const (
4751
)
4852

4953
type PartitioningSettings struct {
50-
MinActivePartitions int64
51-
PartitionCountLimit int64
54+
MinActivePartitions int64
55+
MaxActivePartitions int64
56+
PartitionCountLimit int64
57+
AutoPartitioningSettings AutoPartitioningSettings
5258
}
5359

5460
func (s *PartitioningSettings) FromProto(proto *Ydb_Topic.PartitioningSettings) error {
@@ -57,26 +63,101 @@ func (s *PartitioningSettings) FromProto(proto *Ydb_Topic.PartitioningSettings)
5763
}
5864

5965
s.MinActivePartitions = proto.GetMinActivePartitions()
66+
s.MaxActivePartitions = proto.GetMaxActivePartitions()
6067
s.PartitionCountLimit = proto.GetPartitionCountLimit() //nolint:staticcheck
6168

6269
return nil
6370
}
6471

6572
func (s *PartitioningSettings) ToProto() *Ydb_Topic.PartitioningSettings {
6673
return &Ydb_Topic.PartitioningSettings{
67-
MinActivePartitions: s.MinActivePartitions,
68-
PartitionCountLimit: s.PartitionCountLimit,
74+
MinActivePartitions: s.MinActivePartitions,
75+
MaxActivePartitions: s.MaxActivePartitions,
76+
PartitionCountLimit: s.PartitionCountLimit,
77+
AutoPartitioningSettings: s.AutoPartitioningSettings.ToProto(),
6978
}
7079
}
7180

81+
type AutoPartitioningSettings struct {
82+
AutoPartitioningStrategy AutoPartitioningStrategy
83+
AutoPartitioningWriteSpeedStrategy AutoPartitioningWriteSpeedStrategy
84+
}
85+
86+
func (s *AutoPartitioningSettings) ToProto() *Ydb_Topic.AutoPartitioningSettings {
87+
if s == nil {
88+
return nil
89+
}
90+
91+
return &Ydb_Topic.AutoPartitioningSettings{
92+
Strategy: s.AutoPartitioningStrategy.ToProto(),
93+
PartitionWriteSpeed: s.AutoPartitioningWriteSpeedStrategy.ToProto(),
94+
}
95+
}
96+
97+
func (s *AutoPartitioningSettings) FromProto(proto *Ydb_Topic.AutoPartitioningSettings) error {
98+
if proto == nil {
99+
return xerrors.WithStackTrace(errUnexpecredNilAutoPartitioningSettings)
100+
}
101+
s.AutoPartitioningStrategy = AutoPartitioningStrategy(proto.GetStrategy())
102+
103+
if proto.GetPartitionWriteSpeed() != nil {
104+
if err := s.AutoPartitioningWriteSpeedStrategy.FromProto(proto.GetPartitionWriteSpeed()); err != nil {
105+
return err
106+
}
107+
}
108+
109+
return nil
110+
}
111+
112+
type AutoPartitioningStrategy int32
113+
114+
const (
115+
AutoPartitioningStrategyUnspecified = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_UNSPECIFIED)
116+
AutoPartitioningStrategyDisabled = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_DISABLED)
117+
AutoPartitioningStrategyScaleUpAndDown = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN)
118+
AutoPartitioningStrategyPaused = AutoPartitioningStrategy(Ydb_Topic.AutoPartitioningStrategy_AUTO_PARTITIONING_STRATEGY_PAUSED)
119+
)
120+
121+
func (s AutoPartitioningStrategy) ToProto() Ydb_Topic.AutoPartitioningStrategy {
122+
return Ydb_Topic.AutoPartitioningStrategy(s)
123+
}
124+
125+
type AutoPartitioningWriteSpeedStrategy struct {
126+
StabilizationWindow rawoptional.Duration
127+
UpUtilizationPercent int32
128+
DownUtilizationPercent int32
129+
}
130+
131+
func (s *AutoPartitioningWriteSpeedStrategy) ToProto() *Ydb_Topic.AutoPartitioningWriteSpeedStrategy {
132+
return &Ydb_Topic.AutoPartitioningWriteSpeedStrategy{
133+
StabilizationWindow: s.StabilizationWindow.ToProto(),
134+
UpUtilizationPercent: s.UpUtilizationPercent,
135+
DownUtilizationPercent: s.DownUtilizationPercent,
136+
}
137+
}
138+
139+
func (s *AutoPartitioningWriteSpeedStrategy) FromProto(speed *Ydb_Topic.AutoPartitioningWriteSpeedStrategy) error {
140+
if speed == nil {
141+
return xerrors.WithStackTrace(errUnexpectedNilAutoPartitionWriteSpeed)
142+
}
143+
144+
s.StabilizationWindow.MustFromProto(speed.StabilizationWindow)
145+
s.UpUtilizationPercent = speed.UpUtilizationPercent
146+
s.DownUtilizationPercent = speed.DownUtilizationPercent
147+
148+
return nil
149+
}
150+
72151
type AlterPartitioningSettings struct {
73152
SetMinActivePartitions rawoptional.Int64
153+
SetMaxActivePartitions rawoptional.Int64
74154
SetPartitionCountLimit rawoptional.Int64
75155
}
76156

77157
func (s *AlterPartitioningSettings) ToProto() *Ydb_Topic.AlterPartitioningSettings {
78158
return &Ydb_Topic.AlterPartitioningSettings{
79159
SetMinActivePartitions: s.SetMinActivePartitions.ToProto(),
160+
SetMaxActivePartitions: s.SetMaxActivePartitions.ToProto(),
80161
SetPartitionCountLimit: s.SetPartitionCountLimit.ToProto(),
81162
}
82163
}

tests/integration/topic_partitions_balanced_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package integration
55

66
import (
77
"context"
8+
"fmt"
89
"sync"
910
"sync/atomic"
1011
"testing"
@@ -107,3 +108,18 @@ func TestTopicPartitionsBalanced(t *testing.T) {
107108
xtest.WaitChannelClosed(t, firstReaderReadStopped)
108109
require.NoError(t, firstReader.Close(ctx))
109110
}
111+
112+
func TestTopicSplitPartitions(t *testing.T) {
113+
scope := newScope(t)
114+
115+
params := `
116+
max_active_partitions=100,
117+
partition_write_burst_bytes=1,
118+
auto_partitioning_strategy=scale_up,
119+
auto_partitioning_up_utilization_percent=1,
120+
auto_partitioning_stabilization_window=Interval('PT1S')
121+
`
122+
err := scope.Driver().Query().Exec(scope.Ctx, fmt.Sprintf("ALTER TOPIC `%v` SET (%v)", scope.TopicPath(), params))
123+
scope.Require.NoError(err)
124+
125+
}

0 commit comments

Comments
 (0)