Skip to content

Commit 9285ece

Browse files
committed
* Added balancers.WithEndpoint() context modifier for define per request the YDB endpoint by NodeID
1 parent e4d54bf commit 9285ece

File tree

8 files changed

+63
-49
lines changed

8 files changed

+63
-49
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
* Removed check the node is available for query and table service sessions
22
* Refactored the `balancers.PreferLocations()` function - it is a clean/pure function
3+
* Added `balancers.WithNodeID()` context modifier for define per request the YDB endpoint by NodeID
34

45
## v3.74.2
56
* Added description to scan errors with use query service client scanner

balancers/context.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package balancers
2+
3+
import (
4+
"context"
5+
6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
7+
)
8+
9+
func WithNodeID(ctx context.Context, nodeID uint32) context.Context {
10+
return endpoint.WithNodeID(ctx, nodeID)
11+
}

internal/balancer/connections_state.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
77
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
89
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
910
)
1011

@@ -73,8 +74,8 @@ func (s *connectionsState) GetConnection(ctx context.Context) (_ conn.Conn, fail
7374
}
7475

7576
func (s *connectionsState) preferConnection(ctx context.Context) conn.Conn {
76-
if e, hasPreferEndpoint := ContextEndpoint(ctx); hasPreferEndpoint {
77-
c := s.connByNodeID[e.NodeID()]
77+
if nodeID, hasPreferEndpoint := endpoint.ContextNodeID(ctx); hasPreferEndpoint {
78+
c := s.connByNodeID[nodeID]
7879
if c != nil && isOkConnection(c, true) {
7980
return c
8081
}

internal/balancer/connections_state_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
1314
)
1415

@@ -447,7 +448,7 @@ func TestConnection(t *testing.T) {
447448
&mock.Conn{AddrField: "1", State: conn.Online, NodeIDField: 1},
448449
&mock.Conn{AddrField: "2", State: conn.Online, NodeIDField: 2},
449450
}, nil, balancerConfig.Info{}, false)
450-
c, failed := s.GetConnection(WithEndpoint(context.Background(), &mock.Endpoint{AddrField: "2", NodeIDField: 2}))
451+
c, failed := s.GetConnection(endpoint.WithNodeID(context.Background(), 2))
451452
require.Equal(t, &mock.Conn{AddrField: "2", State: conn.Online, NodeIDField: 2}, c)
452453
require.Equal(t, 0, failed)
453454
})
@@ -456,7 +457,7 @@ func TestConnection(t *testing.T) {
456457
&mock.Conn{AddrField: "1", State: conn.Online, NodeIDField: 1},
457458
&mock.Conn{AddrField: "2", State: conn.Unknown, NodeIDField: 2},
458459
}, nil, balancerConfig.Info{}, false)
459-
c, failed := s.GetConnection(WithEndpoint(context.Background(), &mock.Endpoint{AddrField: "2", NodeIDField: 2}))
460+
c, failed := s.GetConnection(endpoint.WithNodeID(context.Background(), 2))
460461
require.Equal(t, &mock.Conn{AddrField: "1", State: conn.Online, NodeIDField: 1}, c)
461462
require.Equal(t, 0, failed)
462463
})

internal/balancer/ctx.go

Lines changed: 0 additions & 23 deletions
This file was deleted.

internal/endpoint/context.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package endpoint
2+
3+
import "context"
4+
5+
type (
6+
ctxEndpointKey struct{}
7+
)
8+
9+
func WithNodeID(ctx context.Context, nodeID uint32) context.Context {
10+
return context.WithValue(ctx, ctxEndpointKey{}, nodeID)
11+
}
12+
13+
func ContextNodeID(ctx context.Context) (nodeID uint32, ok bool) {
14+
if nodeID, ok = ctx.Value(ctxEndpointKey{}).(uint32); ok {
15+
return nodeID, true
16+
}
17+
18+
return 0, false
19+
}

internal/endpoint/endpoint.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,31 @@ import (
66
"time"
77
)
88

9-
type Info interface {
10-
NodeID() uint32
11-
Address() string
12-
Location() string
13-
LastUpdated() time.Time
14-
LoadFactor() float32
15-
16-
// Deprecated: LocalDC check "local" by compare endpoint location with discovery "selflocation" field.
17-
// It work good only if connection url always point to local dc.
18-
// Will be removed after Oct 2024.
19-
// Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
20-
LocalDC() bool
21-
}
22-
23-
type Endpoint interface {
24-
Info
9+
type (
10+
NodeID interface {
11+
NodeID() uint32
12+
}
13+
Info interface {
14+
NodeID
15+
Address() string
16+
Location() string
17+
LastUpdated() time.Time
18+
LoadFactor() float32
19+
20+
// Deprecated: LocalDC check "local" by compare endpoint location with discovery "selflocation" field.
21+
// It work good only if connection url always point to local dc.
22+
// Will be removed after Oct 2024.
23+
// Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
24+
LocalDC() bool
25+
}
26+
Endpoint interface {
27+
Info
2528

26-
String() string
27-
Copy() Endpoint
28-
Touch(opts ...Option)
29-
}
29+
String() string
30+
Copy() Endpoint
31+
Touch(opts ...Option)
32+
}
33+
)
3034

3135
type endpoint struct { //nolint:maligned
3236
mu sync.RWMutex

internal/table/session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import (
1717
"google.golang.org/grpc/metadata"
1818

1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
20-
balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer"
2120
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
21+
balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
2222
"github.com/ydb-platform/ydb-go-sdk/v3/internal/feature"
2323
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
2424
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
@@ -159,7 +159,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config
159159
s.tableService = Ydb_Table_V1.NewTableServiceClient(
160160
conn.WithBeforeFunc(
161161
conn.WithContextModifier(cc, func(ctx context.Context) context.Context {
162-
return meta.WithTrailerCallback(balancerContext.WithEndpoint(ctx, s), s.checkCloseHint)
162+
return meta.WithTrailerCallback(balancerContext.WithNodeID(ctx, s.NodeID()), s.checkCloseHint)
163163
}),
164164
func() {
165165
s.lastUsage.Store(time.Now().Unix())

0 commit comments

Comments
 (0)