Skip to content

Commit 9f997d7

Browse files
Support for server-side load balancing of sessions (#1813)
Co-authored-by: Aleksey Myasnikov <asmyasnikov@ydb.tech>
1 parent 2c230d0 commit 9f997d7

File tree

11 files changed

+246
-22
lines changed

11 files changed

+246
-22
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Fixed the support of server-side session balancing in `database/sql` driver
2+
* Added `ydb.WithDisableSessionBalancer()` driver option for disable server-side session balancing on table and query clients
3+
14
## v3.111.3
25
* Fixed session closing in `ydb.WithExecuteDataQueryOverQueryClient(true)` scenario
36

internal/config/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type Common struct {
1616
traceRetry trace.Retry
1717
retryBudget budget.Budget
1818

19+
disableSessionBalancer bool
20+
1921
panicCallback func(e interface{})
2022
}
2123

@@ -60,6 +62,10 @@ func (c *Common) RetryBudget() budget.Budget {
6062
return c.retryBudget
6163
}
6264

65+
func (c *Common) DisableSessionBalancer() bool {
66+
return c.disableSessionBalancer
67+
}
68+
6369
// SetOperationTimeout define the maximum amount of time a YDB server will process
6470
// an operation. After timeout exceeds YDB will try to cancel operation and
6571
// regardless of the cancellation appropriate error will be returned to
@@ -97,3 +103,7 @@ func SetTraceRetry(c *Common, t *trace.Retry, opts ...trace.RetryComposeOption)
97103
func SetRetryBudget(c *Common, b budget.Budget) {
98104
c.retryBudget = b
99105
}
106+
107+
func (c *Common) SetDisableSessionBalancer() {
108+
c.disableSessionBalancer = true
109+
}

internal/query/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"google.golang.org/grpc"
1111

1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
@@ -595,6 +596,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
595596
}
596597
defer cancelCreate()
597598

599+
if !cfg.DisableSessionBalancer() {
600+
createCtx = meta.WithAllowFeatures(createCtx, meta.HintSessionBalancer)
601+
}
602+
598603
s, err := createSession(createCtx, client,
599604
WithConn(cc),
600605
WithDeleteTimeout(cfg.SessionDeleteTimeout()),

internal/query/config/options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,9 @@ func WithLazyTx(lazyTx bool) Option {
8585
c.lazyTx = lazyTx
8686
}
8787
}
88+
89+
func WithDisableSessionBalancer() Option {
90+
return func(c *Config) {
91+
c.SetDisableSessionBalancer()
92+
}
93+
}

internal/query/session_core.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import (
1313
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1414
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1515
"google.golang.org/grpc"
16+
"google.golang.org/grpc/metadata"
1617

1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1920
balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
2022
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
2123
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2224
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -90,6 +92,19 @@ func (core *sessionCore) Status() string {
9092
}
9193
}
9294

95+
func (core *sessionCore) checkCloseHint(md metadata.MD) {
96+
for header, values := range md {
97+
if header != meta.HeaderServerHints {
98+
continue
99+
}
100+
for _, hint := range values {
101+
if hint == meta.HintSessionClose {
102+
core.SetStatus(StatusClosing)
103+
}
104+
}
105+
}
106+
}
107+
93108
type Option func(*sessionCore)
94109

95110
func WithConn(cc grpc.ClientConnInterface) Option {
@@ -150,7 +165,7 @@ func Open(
150165
if core.cc != nil {
151166
core.Client = Ydb_Query_V1.NewQueryServiceClient(
152167
conn.WithContextModifier(core.cc, func(ctx context.Context) context.Context {
153-
return balancerContext.WithNodeID(ctx, core.NodeID())
168+
return meta.WithTrailerCallback(balancerContext.WithNodeID(ctx, core.NodeID()), core.checkCloseHint)
154169
}),
155170
)
156171
}

internal/table/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"google.golang.org/grpc"
1212
"google.golang.org/protobuf/proto"
1313

14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
@@ -56,6 +57,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
5657
}),
5758
pool.WithClock[*Session, Session](config.Clock()),
5859
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
60+
if !config.DisableSessionBalancer() {
61+
ctx = meta.WithAllowFeatures(ctx, meta.HintSessionBalancer)
62+
}
63+
5964
return newSession(ctx, cc, config)
6065
}),
6166
pool.WithTrace[*Session, Session](&pool.Trace{

internal/table/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,12 @@ func UseQuerySession(b bool) Option {
186186
}
187187
}
188188

189+
func WithDisableSessionBalancer() Option {
190+
return func(c *Config) {
191+
c.SetDisableSessionBalancer()
192+
}
193+
}
194+
189195
// WithClock replaces default clock
190196
func WithClock(clock clockwork.Clock) Option {
191197
return func(c *Config) {

internal/xsql/connector.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/grpc"
1515

1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/bind"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1718
internalQuery "github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
@@ -117,6 +118,10 @@ func (c *Connector) Connect(ctx context.Context) (_ driver.Conn, finalErr error)
117118
stack.FunctionID("database/sql.(*Connector).Connect", stack.Package("database/sql")),
118119
)
119120

121+
if !c.disableServerBalancer {
122+
ctx = meta.WithAllowFeatures(ctx, meta.HintSessionBalancer)
123+
}
124+
120125
switch c.processor {
121126
case QUERY:
122127
s, err := internalQuery.CreateSession(ctx, Ydb_Query_V1.NewQueryServiceClient(c.balancer), c.queryConfig)

options.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,20 @@ func WithDatabase(database string) Option {
252252
}
253253
}
254254

255+
// WithDisableSessionBalancer returns an Option that disables session balancing.
256+
func WithDisableSessionBalancer() Option {
257+
return func(ctx context.Context, d *Driver) error {
258+
d.queryOptions = append(d.queryOptions, queryConfig.WithDisableSessionBalancer())
259+
d.tableOptions = append(d.tableOptions, tableConfig.WithDisableSessionBalancer())
260+
261+
// implicit disable session balancer for SQL driver;
262+
// rule of thumb: if user disables session balancer anywhere, we disable it everywhere
263+
d.databaseSQLOptions = append(d.databaseSQLOptions, WithDisableServerBalancer())
264+
265+
return nil
266+
}
267+
}
268+
255269
// WithSecure defines secure option
256270
//
257271
// Warning: use ydb.Open with required Driver string parameter instead

tests/integration/helpers_test.go

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,39 +127,50 @@ func (scope *scopeT) driverNamed(name string, opts ...ydb.Option) *ydb.Driver {
127127
connectionString := scope.ConnectionString()
128128
scope.Logf("Connect with connection string, driver name %q: %v", name, connectionString)
129129

130-
token := scope.AuthToken()
131-
if token == "" {
132-
scope.Logf("With empty auth token")
133-
opts = append(opts, ydb.WithAnonymousCredentials())
134-
} else {
135-
scope.Logf("With auth token")
136-
opts = append(opts, ydb.WithAccessTokenCredentials(token))
137-
}
138-
cert := scope.CertFile()
139-
if cert == "" {
140-
scope.Logf("Without tls")
141-
opts = append(opts, ydb.WithTLSSInsecureSkipVerify())
142-
} else {
143-
scope.Logf("With tls")
144-
opts = append(opts, ydb.WithCertificatesFromFile(cert))
145-
}
130+
driver := scope.NonCachingDriver(opts...)
146131

147-
connectionContext, cancel := context.WithTimeout(scope.Ctx, time.Second*10)
148-
defer cancel()
149-
150-
driver, err := ydb.Open(connectionContext, connectionString, opts...)
151132
clean := func() {
152133
if driver != nil {
153134
scope.Require.NoError(driver.Close(scope.Ctx))
154135
}
155136
}
156137

157-
return fixenv.NewGenericResultWithCleanup(driver, clean), err
138+
return fixenv.NewGenericResultWithCleanup(driver, clean), nil
158139
}
159140

160141
return fixenv.CacheResult(scope.Env, f, fixenv.CacheOptions{CacheKey: name})
161142
}
162143

144+
func (scope *scopeT) NonCachingDriver(opts ...ydb.Option) *ydb.Driver {
145+
connectionString := scope.ConnectionString()
146+
scope.Logf("Connect with connection string: %v", connectionString)
147+
148+
token := scope.AuthToken()
149+
if token == "" {
150+
scope.Logf("With empty auth token")
151+
opts = append(opts, ydb.WithAnonymousCredentials())
152+
} else {
153+
scope.Logf("With auth token")
154+
opts = append(opts, ydb.WithAccessTokenCredentials(token))
155+
}
156+
cert := scope.CertFile()
157+
if cert == "" {
158+
scope.Logf("Without tls")
159+
opts = append(opts, ydb.WithTLSSInsecureSkipVerify())
160+
} else {
161+
scope.Logf("With tls")
162+
opts = append(opts, ydb.WithCertificatesFromFile(cert))
163+
}
164+
165+
connectionContext, cancel := context.WithTimeout(scope.Ctx, time.Second*10)
166+
defer cancel()
167+
168+
driver, err := ydb.Open(connectionContext, connectionString, opts...)
169+
scope.Require.NoError(err)
170+
171+
return driver
172+
}
173+
163174
func (scope *scopeT) SQLDriver(opts ...ydb.ConnectorOption) *sql.DB {
164175
f := func() (*fixenv.GenericResult[*sql.DB], error) {
165176
driver := scope.Driver()

0 commit comments

Comments
 (0)