Skip to content

Commit e4d54bf

Browse files
authored
Merge pull request #1284 from ydb-platform/remove-check-node-id
* Diabled check the node is available for query service session
2 parents f2e5b7b + f9fa00b commit e4d54bf

File tree

9 files changed

+40
-110
lines changed

9 files changed

+40
-110
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
* Refactored the `balancers.PreferLocations()' function - it is a clean/pure function
1+
* Removed check the node is available for query and table service sessions
2+
* Refactored the `balancers.PreferLocations()` function - it is a clean/pure function
23

34
## v3.74.2
45
* Added description to scan errors with use query service client scanner

internal/balancer/balancer.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,6 @@ type Balancer struct {
4646
onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
4747
}
4848

49-
func (b *Balancer) HasNode(id uint32) bool {
50-
if b.config.SingleConn {
51-
return true
52-
}
53-
b.mu.RLock()
54-
defer b.mu.RUnlock()
55-
if _, has := b.connectionsState.connByNodeID[id]; has {
56-
return true
57-
}
58-
59-
return false
60-
}
61-
6249
func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context, endpoints []endpoint.Info)) {
6350
b.mu.WithLock(func() {
6451
b.onApplyDiscoveredEndpoints = append(b.onApplyDiscoveredEndpoints, onApplyDiscoveredEndpoints)

internal/query/client.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,6 @@ import (
2020

2121
//go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient
2222

23-
type nodeChecker interface {
24-
HasNode(id uint32) bool
25-
}
26-
27-
type balancer interface {
28-
grpc.ClientConnInterface
29-
nodeChecker
30-
}
31-
3223
var _ query.Client = (*Client)(nil)
3324

3425
type Client struct {
@@ -284,7 +275,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
284275
return nil
285276
}
286277

287-
func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client {
278+
func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Config) *Client {
288279
onDone := trace.QueryOnNew(cfg.Trace(), &ctx,
289280
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.New"),
290281
)
@@ -313,11 +304,7 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client {
313304
}
314305
defer cancelCreate()
315306

316-
s, err := createSession(createCtx, client.grpcClient, cfg,
317-
withSessionCheck(func(s *Session) bool {
318-
return balancer.HasNode(uint32(s.nodeID))
319-
}),
320-
)
307+
s, err := createSession(createCtx, client.grpcClient, cfg)
321308
if err != nil {
322309
return nil, xerrors.WithStackTrace(err)
323310
}

internal/query/session.go

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,15 @@ import (
2121

2222
var _ query.Session = (*Session)(nil)
2323

24-
type (
25-
Session struct {
26-
cfg *config.Config
27-
id string
28-
nodeID int64
29-
grpcClient Ydb_Query_V1.QueryServiceClient
30-
statusCode statusCode
31-
closeOnce func(ctx context.Context) error
32-
checks []func(s *Session) bool
33-
}
34-
sessionOption func(session *Session)
35-
)
24+
type Session struct {
25+
cfg *config.Config
26+
id string
27+
nodeID int64
28+
grpcClient Ydb_Query_V1.QueryServiceClient
29+
statusCode statusCode
30+
closeOnce func(ctx context.Context) error
31+
checks []func(s *Session) bool
32+
}
3633

3734
func (s *Session) ReadRow(ctx context.Context, q string, opts ...options.ExecuteOption) (row query.Row, _ error) {
3835
_, r, err := s.Execute(ctx, q, opts...)
@@ -74,40 +71,20 @@ func (s *Session) ReadResultSet(ctx context.Context, q string, opts ...options.E
7471
return rs, nil
7572
}
7673

77-
func withSessionCheck(f func(*Session) bool) sessionOption {
78-
return func(s *Session) {
79-
s.checks = append(s.checks, f)
80-
}
81-
}
82-
83-
func createSession(
84-
ctx context.Context, client Ydb_Query_V1.QueryServiceClient, cfg *config.Config, opts ...sessionOption,
85-
) (s *Session, finalErr error) {
74+
func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, cfg *config.Config) (
75+
s *Session, finalErr error,
76+
) {
8677
s = &Session{
8778
cfg: cfg,
8879
grpcClient: client,
8980
statusCode: statusUnknown,
90-
checks: []func(*Session) bool{
91-
func(s *Session) bool {
92-
switch s.status() {
93-
case statusIdle, statusInUse:
94-
return true
95-
default:
96-
return false
97-
}
98-
},
99-
},
10081
}
10182
defer func() {
10283
if finalErr != nil && s != nil {
10384
s.setStatus(statusError)
10485
}
10586
}()
10687

107-
for _, opt := range opts {
108-
opt(s)
109-
}
110-
11188
onDone := trace.QueryOnSessionCreate(s.cfg.Trace(), &ctx,
11289
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.createSession"),
11390
)

internal/table/client.go

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,44 +25,34 @@ import (
2525
// sessionBuilder is the interface that holds logic of creating sessions.
2626
type sessionBuilder func(ctx context.Context) (*session, error)
2727

28-
type nodeChecker interface {
29-
HasNode(id uint32) bool
30-
}
31-
32-
type balancer interface {
33-
grpc.ClientConnInterface
34-
nodeChecker
35-
}
36-
37-
func New(ctx context.Context, balancer balancer, config *config.Config) *Client {
28+
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client {
3829
onDone := trace.TableOnInit(config.Trace(), &ctx,
3930
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/table.New"),
4031
)
4132
defer func() {
4233
onDone(config.SizeLimit())
4334
}()
4435

45-
return newClient(ctx, balancer, func(ctx context.Context) (s *session, err error) {
46-
return newSession(ctx, balancer, config)
36+
return newClient(ctx, cc, func(ctx context.Context) (s *session, err error) {
37+
return newSession(ctx, cc, config)
4738
}, config)
4839
}
4940

5041
func newClient(
5142
ctx context.Context,
52-
balancer balancer,
43+
cc grpc.ClientConnInterface,
5344
builder sessionBuilder,
5445
config *config.Config,
5546
) *Client {
5647
c := &Client{
57-
clock: config.Clock(),
58-
config: config,
59-
cc: balancer,
60-
nodeChecker: balancer,
61-
build: builder,
62-
index: make(map[*session]sessionInfo),
63-
idle: list.New(),
64-
waitQ: list.New(),
65-
limit: config.SizeLimit(),
48+
clock: config.Clock(),
49+
config: config,
50+
cc: cc,
51+
build: builder,
52+
index: make(map[*session]sessionInfo),
53+
idle: list.New(),
54+
waitQ: list.New(),
55+
limit: config.SizeLimit(),
6656
waitChPool: sync.Pool{
6757
New: func() interface{} {
6858
ch := make(chan *session)
@@ -84,11 +74,10 @@ func newClient(
8474
// A Client is safe for use by multiple goroutines simultaneously.
8575
type Client struct {
8676
// read-only fields
87-
config *config.Config
88-
build sessionBuilder
89-
cc grpc.ClientConnInterface
90-
nodeChecker nodeChecker
91-
clock clockwork.Clock
77+
config *config.Config
78+
build sessionBuilder
79+
cc grpc.ClientConnInterface
80+
clock clockwork.Clock
9281

9382
// read-write fields
9483
mu xsync.Mutex
@@ -404,7 +393,7 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses
404393
i++
405394
s = tryGetIdleSession(c)
406395
if s != nil {
407-
if !isValidNode(c, s) {
396+
if !s.isReady() {
408397
closeInvalidSession(ctx, s)
409398
s = nil
410399

@@ -437,11 +426,6 @@ func tryGetIdleSession(c *Client) *session {
437426
return s
438427
}
439428

440-
//nolint:interfacer
441-
func isValidNode(c *Client, s *session) bool {
442-
return c.nodeChecker == nil || c.nodeChecker.HasNode(s.NodeID())
443-
}
444-
445429
func closeInvalidSession(ctx context.Context, s *session) {
446430
_ = s.Close(ctx)
447431
}
@@ -599,9 +583,6 @@ func (c *Client) Put(ctx context.Context, s *session) (err error) {
599583
case s.isClosed():
600584
return xerrors.WithStackTrace(errSessionClosed)
601585

602-
case c.nodeChecker != nil && !c.nodeChecker.HasNode(s.NodeID()):
603-
return xerrors.WithStackTrace(errNodeIsNotObservable)
604-
605586
default:
606587
c.mu.Lock()
607588
defer c.mu.Unlock()

internal/table/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -868,17 +868,17 @@ type StubBuilder struct {
868868

869869
func newClientWithStubBuilder(
870870
t testing.TB,
871-
balancer balancer,
871+
cc grpc.ClientConnInterface,
872872
stubLimit int,
873873
options ...config.Option,
874874
) *Client {
875875
c := newClient(
876876
context.Background(),
877-
balancer,
877+
cc,
878878
(&StubBuilder{
879879
T: t,
880880
Limit: stubLimit,
881-
cc: balancer,
881+
cc: cc,
882882
}).createSession,
883883
config.New(options...),
884884
)

internal/table/errors.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ var (
3232
// operation could not be completed.
3333
errNoProgress = xerrors.Wrap(errors.New("no progress"))
3434

35-
// errNodeIsNotObservable returned by a Client instance to indicate that required node is not observable
36-
errNodeIsNotObservable = xerrors.Wrap(errors.New("node is not observable"))
37-
3835
// errParamsRequired returned by a Client instance to indicate that required params is not defined
3936
errParamsRequired = xerrors.Wrap(errors.New("params required"))
4037
)

internal/table/session.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ func (s *session) isClosing() bool {
113113
return s.Status() == table.SessionClosing
114114
}
115115

116+
func (s *session) isReady() bool {
117+
return s.Status() == table.SessionReady
118+
}
119+
116120
func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (
117121
s *session, err error,
118122
) {

testutil/driver.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,6 @@ type balancerStub struct {
138138
) (grpc.ClientStream, error)
139139
}
140140

141-
func (b *balancerStub) HasNode(id uint32) bool {
142-
return true
143-
}
144-
145141
func (b *balancerStub) Invoke(
146142
ctx context.Context,
147143
method string,

0 commit comments

Comments
 (0)