Skip to content

Commit df8e87e

Browse files
authored
[Refactor] Use cached member's clients (#1033)
1 parent 9c9e91a commit df8e87e

26 files changed

+98
-122
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
44
- (Bugfix) Ensure pod names not too long
5+
- (Refactor) Use cached member's clients
56

67
## [1.2.14](https://github.com/arangodb/kube-arangodb/tree/1.2.14) (2022-07-14)
78
- (Feature) Add ArangoSync TLS based rotation

pkg/apis/deployment/v1/deployment_status_members.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
"github.com/arangodb/kube-arangodb/pkg/util/errors"
2525
)
2626

27+
// MemberStatusFunc is a callback which is used to traverse a specific group of servers and check their status.
28+
type MemberStatusFunc func(group ServerGroup, list MemberStatusList) error
29+
2730
// DeploymentStatusMembers holds the member status of all server groups
2831
type DeploymentStatusMembers struct {
2932
Single MemberStatusList `json:"single,omitempty"`
@@ -81,11 +84,11 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
8184
// ForeachServerGroup calls the given callback for all server groups.
8285
// If the callback returns an error, this error is returned and the callback is
8386
// not called for the remaining groups.
84-
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error {
87+
func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error {
8588
return ds.ForeachServerInGroups(cb, AllServerGroups...)
8689
}
8790

88-
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGroup, list MemberStatusList) error, groups ...ServerGroup) error {
91+
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error {
8992
for _, group := range groups {
9093
if err := ds.ForServerGroup(cb, group); err != nil {
9194
return err
@@ -95,7 +98,7 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGrou
9598
return nil
9699
}
97100

98-
func (ds DeploymentStatusMembers) ForServerGroup(cb func(group ServerGroup, list MemberStatusList) error, group ServerGroup) error {
101+
func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error {
99102
switch group {
100103
case ServerGroupSingle:
101104
if err := cb(ServerGroupSingle, ds.Single); err != nil {

pkg/apis/deployment/v2alpha1/deployment_status_members.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
"github.com/arangodb/kube-arangodb/pkg/util/errors"
2525
)
2626

27+
// MemberStatusFunc is a callback which is used to traverse a specific group of servers and check their status.
28+
type MemberStatusFunc func(group ServerGroup, list MemberStatusList) error
29+
2730
// DeploymentStatusMembers holds the member status of all server groups
2831
type DeploymentStatusMembers struct {
2932
Single MemberStatusList `json:"single,omitempty"`
@@ -81,11 +84,11 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
8184
// ForeachServerGroup calls the given callback for all server groups.
8285
// If the callback returns an error, this error is returned and the callback is
8386
// not called for the remaining groups.
84-
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error {
87+
func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error {
8588
return ds.ForeachServerInGroups(cb, AllServerGroups...)
8689
}
8790

88-
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGroup, list MemberStatusList) error, groups ...ServerGroup) error {
91+
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error {
8992
for _, group := range groups {
9093
if err := ds.ForServerGroup(cb, group); err != nil {
9194
return err
@@ -95,7 +98,7 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGrou
9598
return nil
9699
}
97100

98-
func (ds DeploymentStatusMembers) ForServerGroup(cb func(group ServerGroup, list MemberStatusList) error, group ServerGroup) error {
101+
func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error {
99102
switch group {
100103
case ServerGroupSingle:
101104
if err := cb(ServerGroupSingle, ds.Single); err != nil {

pkg/deployment/context_impl.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -203,30 +203,6 @@ func (d *Deployment) GetAuthentication() conn.Auth {
203203
return d.clientCache.GetAuth()
204204
}
205205

206-
// GetAgencyClients returns a client connection for every agency member.
207-
func (d *Deployment) GetAgencyClients(ctx context.Context) ([]driver.Connection, error) {
208-
return d.GetAgencyClientsWithPredicate(ctx, nil)
209-
}
210-
211-
// GetAgencyClientsWithPredicate returns a client connection for every agency member.
212-
// If the given predicate is not nil, only agents are included where the given predicate returns true.
213-
func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) {
214-
agencyMembers := d.status.last.Members.Agents
215-
result := make([]driver.Connection, 0, len(agencyMembers))
216-
for _, m := range agencyMembers {
217-
if predicate != nil && !predicate(m.ID) {
218-
continue
219-
}
220-
client, err := d.GetServerClient(ctx, api.ServerGroupAgents, m.ID)
221-
if err != nil {
222-
return nil, errors.WithStack(err)
223-
}
224-
conn := client.Connection()
225-
result = append(result, conn)
226-
}
227-
return result, nil
228-
}
229-
230206
// GetAgency returns a connection to the agency.
231207
func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) {
232208
return d.clientCache.GetAgency(ctx, agencyIDs...)
@@ -544,7 +520,7 @@ func (d *Deployment) EnableScalingCluster(ctx context.Context) error {
544520
return d.clusterScalingIntegration.EnableScalingCluster(ctx)
545521
}
546522

547-
// GetAgencyPlan returns agency plan
523+
// GetAgencyData returns agency plan.
548524
func (d *Deployment) GetAgencyData(ctx context.Context, i interface{}, keyParts ...string) error {
549525
a, err := d.GetAgency(ctx)
550526
if err != nil {

pkg/deployment/member/state.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ import (
2626

2727
"github.com/rs/zerolog"
2828

29+
"github.com/arangodb/arangosync-client/client"
2930
"github.com/arangodb/go-driver"
3031

3132
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
3233
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
3334
"github.com/arangodb/kube-arangodb/pkg/logging"
35+
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3436
"github.com/arangodb/kube-arangodb/pkg/util/globals"
3537
)
3638

@@ -40,6 +42,13 @@ type StateInspectorGetter interface {
4042

4143
type StateInspector interface {
4244
RefreshState(ctx context.Context, members api.DeploymentStatusMemberElements)
45+
46+
// GetMemberClient returns member connection to an ArangoDB server.
47+
GetMemberClient(id string) (driver.Client, error)
48+
49+
// GetMemberSyncClient returns member connection to an ArangoSync server.
50+
GetMemberSyncClient(id string) (client.API, error)
51+
4352
MemberState(id string) (State, bool)
4453

4554
Health() Health
@@ -164,6 +173,7 @@ func (s *stateInspector) fetchArangosyncMemberState(ctx context.Context, m api.D
164173
"arangosync-build": v.Build,
165174
},
166175
}
176+
state.syncClient = c
167177
}
168178
return state
169179
}
@@ -180,10 +190,43 @@ func (s *stateInspector) fetchServerMemberState(ctx context.Context, m api.Deplo
180190
state.NotReachableErr = err
181191
} else {
182192
state.Version = v
193+
state.client = c
183194
}
184195
return state
185196
}
186197

198+
// GetMemberClient returns member client to a server.
199+
func (s *stateInspector) GetMemberClient(id string) (driver.Client, error) {
200+
if state, ok := s.MemberState(id); ok {
201+
if state.NotReachableErr != nil {
202+
// ArangoDB client can be set, but it might be old value.
203+
return nil, state.NotReachableErr
204+
}
205+
206+
if state.client != nil {
207+
return state.client, nil
208+
}
209+
}
210+
211+
return nil, errors.Newf("failed to get ArangoDB member client: %s", id)
212+
}
213+
214+
// GetMemberSyncClient returns member client to a server.
215+
func (s *stateInspector) GetMemberSyncClient(id string) (client.API, error) {
216+
if state, ok := s.MemberState(id); ok {
217+
if state.NotReachableErr != nil {
218+
// ArangoSync client can be set, but it might be old value.
219+
return nil, state.NotReachableErr
220+
}
221+
222+
if state.syncClient != nil {
223+
return state.syncClient, nil
224+
}
225+
}
226+
227+
return nil, errors.Newf("failed to get ArangoSync member client: %s", id)
228+
}
229+
187230
func (s *stateInspector) MemberState(id string) (State, bool) {
188231
s.lock.Lock()
189232
defer s.lock.Unlock()
@@ -203,10 +246,16 @@ type Health struct {
203246
Error error
204247
}
205248

249+
// State describes a state of a member.
206250
type State struct {
251+
// NotReachableErr set to non-nil if a member is not reachable.
207252
NotReachableErr error
208-
253+
// Version of this specific member.
209254
Version driver.VersionInfo
255+
// client to this specific ArangoDB member.
256+
client driver.Client
257+
// client to this specific ArangoSync member.
258+
syncClient client.API
210259
}
211260

212261
func (s State) IsReachable() bool {

pkg/deployment/reconcile/action_context.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626

2727
core "k8s.io/api/core/v1"
2828

29-
"github.com/arangodb/arangosync-client/client"
3029
"github.com/arangodb/go-driver"
3130

3231
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
@@ -50,8 +49,7 @@ type ActionContext interface {
5049
reconciler.DeploymentPodRenderer
5150
reconciler.ArangoAgencyGet
5251
reconciler.DeploymentInfoGetter
53-
reconciler.DeploymentClient
54-
reconciler.DeploymentSyncClient
52+
reconciler.DeploymentDatabaseClient
5553

5654
member.StateInspectorGetter
5755

@@ -267,24 +265,6 @@ func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client,
267265
return c, nil
268266
}
269267

270-
// GetServerClient returns a cached client for a specific server.
271-
func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {
272-
c, err := ac.context.GetServerClient(ctx, group, id)
273-
if err != nil {
274-
return nil, errors.WithStack(err)
275-
}
276-
return c, nil
277-
}
278-
279-
// GetSyncServerClient returns a cached client for a specific arangosync server.
280-
func (ac *actionContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) {
281-
c, err := ac.context.GetSyncServerClient(ctx, group, id)
282-
if err != nil {
283-
return nil, errors.WithStack(err)
284-
}
285-
return c, nil
286-
}
287-
288268
// GetMemberStatusByID returns the current member status
289269
// for the member with given id.
290270
// Returns member status, true when found, or false

pkg/deployment/reconcile/action_encryption_refresh.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,14 @@ func (a *encryptionKeyRefreshAction) Start(ctx context.Context) (bool, error) {
5555
func (a *encryptionKeyRefreshAction) CheckProgress(ctx context.Context) (bool, bool, error) {
5656
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
5757
defer cancel()
58-
keyfolder, err := a.actionCtx.ACS().CurrentClusterCache().Secret().V1().Read().Get(ctxChild, pod.GetEncryptionFolderSecretName(a.actionCtx.GetName()), meta.GetOptions{})
58+
keyFolder, err := a.actionCtx.ACS().CurrentClusterCache().Secret().V1().Read().Get(ctxChild,
59+
pod.GetEncryptionFolderSecretName(a.actionCtx.GetName()), meta.GetOptions{})
5960
if err != nil {
6061
a.log.Err(err).Error("Unable to fetch encryption folder")
6162
return true, false, nil
6263
}
6364

64-
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
65-
defer cancel()
66-
c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID)
65+
c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
6766
if err != nil {
6867
a.log.Err(err).Warn("Unable to get client")
6968
return true, false, nil
@@ -78,7 +77,7 @@ func (a *encryptionKeyRefreshAction) CheckProgress(ctx context.Context) (bool, b
7877
return true, false, nil
7978
}
8079

81-
if !e.Result.KeysPresent(keyfolder.Data) {
80+
if !e.Result.KeysPresent(keyFolder.Data) {
8281
return false, false, nil
8382
}
8483

pkg/deployment/reconcile/action_jwt_refresh.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,13 @@ func (a *jwtRefreshAction) CheckProgress(ctx context.Context) (bool, bool, error
5656
return true, false, nil
5757
}
5858

59-
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
60-
defer cancel()
61-
c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID)
59+
c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
6260
if err != nil {
6361
a.log.Err(err).Warn("Unable to get client")
6462
return true, false, nil
6563
}
6664

67-
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
65+
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
6866
defer cancel()
6967
if invalid, err := isMemberJWTTokenInvalid(ctxChild, client.NewClient(c.Connection()), folder.Data, true); err != nil {
7068
a.log.Err(err).Warn("Error while getting JWT Status")

pkg/deployment/reconcile/action_remove_member.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
8686
if err := arangod.RemoveServerFromCluster(ctxChild, client.Connection(), driver.ServerID(m.ID)); err != nil {
8787
if !driver.IsNotFound(err) && !driver.IsPreconditionFailed(err) {
8888
a.log.Err(err).Str("member-id", m.ID).Error("Failed to remove server from cluster")
89-
// ignore this error, maybe all coordinators are failed and no connction to cluster is possible
89+
// ignore this error, maybe all coordinators are failed and no connection to cluster is possible
9090
} else if driver.IsPreconditionFailed(err) {
9191
health := a.actionCtx.GetMembersState().Health()
9292
if health.Error != nil {

pkg/deployment/reconcile/action_runtime_container_args_udpate.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,7 @@ func (a actionRuntimeContainerArgsUpdate) setLogLevel(ctx context.Context, logLe
248248
return nil
249249
}
250250

251-
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
252-
defer cancel()
253-
cli, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID)
251+
cli, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
254252
if err != nil {
255253
return err
256254
}
@@ -265,7 +263,7 @@ func (a actionRuntimeContainerArgsUpdate) setLogLevel(ctx context.Context, logLe
265263
return err
266264
}
267265

268-
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
266+
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
269267
defer cancel()
270268
resp, err := conn.Do(ctxChild, req)
271269
if err != nil {

0 commit comments

Comments
 (0)