Skip to content

Commit df97f13

Browse files
authored
[Feature] Extract AgencyCache Interface (#1328)
1 parent 1ad4251 commit df97f13

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1065
-586
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Change Log
22

33
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
4+
- (Feature) AgencyCache Interface
45

56
## [1.2.29](https://github.com/arangodb/kube-arangodb/tree/1.2.29) (2023-06-08)
67
- (Maintenance) Add govulncheck to pipeline, update golangci-linter

pkg/deployment/agency/cache.go

Lines changed: 45 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/rs/zerolog"
2929

3030
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
31+
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
3132
"github.com/arangodb/kube-arangodb/pkg/generated/metric_descriptions"
3233
"github.com/arangodb/kube-arangodb/pkg/logging"
3334
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
@@ -141,13 +142,13 @@ type Health interface {
141142

142143
type Cache interface {
143144
Reload(ctx context.Context, size int, clients Connections) (uint64, error)
144-
Data() (State, bool)
145-
DataDB() (StateDB, bool)
145+
Data() (state.State, bool)
146+
DataDB() (state.DB, bool)
146147
CommitIndex() uint64
147148
// Health returns true when healthy object is available.
148149
Health() (Health, bool)
149150
// ShardsInSyncMap returns last in sync state of shards. If no state is available, false is returned.
150-
ShardsInSyncMap() (ShardsSyncStatus, bool)
151+
ShardsInSyncMap() (state.ShardsSyncStatus, bool)
151152
}
152153

153154
func NewCache(namespace, name string, mode *api.DeploymentMode) Cache {
@@ -162,10 +163,11 @@ func NewAgencyCache(namespace, name string) Cache {
162163
c := &cache{
163164
namespace: namespace,
164165
name: name,
165-
shardsSyncStatus: ShardsSyncStatus{},
166+
shardsSyncStatus: state.ShardsSyncStatus{},
166167
}
167168

168169
c.log = logger.WrapObj(c)
170+
c.loader = getLoader()
169171

170172
return c
171173
}
@@ -177,12 +179,12 @@ func NewSingleCache() Cache {
177179
type cacheSingle struct {
178180
}
179181

180-
func (c cacheSingle) ShardsInSyncMap() (ShardsSyncStatus, bool) {
182+
func (c cacheSingle) ShardsInSyncMap() (state.ShardsSyncStatus, bool) {
181183
return nil, false
182184
}
183185

184-
func (c cacheSingle) DataDB() (StateDB, bool) {
185-
return StateDB{}, false
186+
func (c cacheSingle) DataDB() (state.DB, bool) {
187+
return state.DB{}, false
186188
}
187189

188190
func (c cacheSingle) CommitIndex() uint64 {
@@ -198,8 +200,8 @@ func (c cacheSingle) Reload(_ context.Context, _ int, _ Connections) (uint64, er
198200
return 0, nil
199201
}
200202

201-
func (c cacheSingle) Data() (State, bool) {
202-
return State{}, true
203+
func (c cacheSingle) Data() (state.State, bool) {
204+
return state.State{}, true
203205
}
204206

205207
type cache struct {
@@ -209,16 +211,11 @@ type cache struct {
209211

210212
lock sync.RWMutex
211213

212-
valid bool
213-
214-
commitIndex uint64
215-
216-
data State
217-
dataDB StateDB
214+
loader StateLoader
218215

219216
health Health
220217

221-
shardsSyncStatus ShardsSyncStatus
218+
shardsSyncStatus state.ShardsSyncStatus
222219
}
223220

224221
func (c *cache) WrapLogger(in *zerolog.Event) *zerolog.Event {
@@ -229,29 +226,32 @@ func (c *cache) CommitIndex() uint64 {
229226
c.lock.RLock()
230227
defer c.lock.RUnlock()
231228

232-
return c.commitIndex
229+
_, index, _ := c.loader.State()
230+
return index
233231
}
234232

235-
func (c *cache) Data() (State, bool) {
233+
func (c *cache) Data() (state.State, bool) {
236234
c.lock.RLock()
237235
defer c.lock.RUnlock()
238236

239-
if !c.valid {
240-
return State{}, false
237+
data, _, ok := c.loader.State()
238+
if ok {
239+
return data.Arango, true
241240
}
242241

243-
return c.data, true
242+
return state.State{}, false
244243
}
245244

246-
func (c *cache) DataDB() (StateDB, bool) {
245+
func (c *cache) DataDB() (state.DB, bool) {
247246
c.lock.RLock()
248247
defer c.lock.RUnlock()
249248

250-
if !c.valid {
251-
return StateDB{}, false
249+
data, _, ok := c.loader.State()
250+
if ok {
251+
return data.ArangoDB, true
252252
}
253253

254-
return c.dataDB, c.valid
254+
return state.DB{}, false
255255
}
256256

257257
// Health returns always false for single cache.
@@ -270,17 +270,13 @@ func (c *cache) Reload(ctx context.Context, size int, clients Connections) (uint
270270
c.lock.Lock()
271271
defer c.lock.Unlock()
272272

273-
index, err := c.reload(ctx, size, clients)
273+
data, index, err := c.reload(ctx, size, clients)
274274
if err != nil {
275275
return index, err
276276
}
277277

278-
if !c.valid {
279-
return index, nil
280-
}
281-
282278
// Refresh map of the shards
283-
shardNames := c.data.GetShardsStatus()
279+
shardNames := data.Arango.GetShardsStatus()
284280

285281
n := time.Now()
286282

@@ -301,44 +297,39 @@ func (c *cache) Reload(ctx context.Context, size int, clients Connections) (uint
301297
return index, nil
302298
}
303299

304-
func (c *cache) reload(ctx context.Context, size int, clients Connections) (uint64, error) {
305-
leaderCli, leaderConfig, health, err := c.getLeader(ctx, size, clients)
300+
func (c *cache) reload(ctx context.Context, size int, clients Connections) (*state.Root, uint64, error) {
301+
leaderCli, health, err := c.getLeader(ctx, size, clients)
306302
if err != nil {
307303
// Invalidate a leader ID and agency state.
308304
// In the next iteration leaderID will be sat because `valid` will be false.
309-
c.valid = false
305+
c.loader.Invalidate()
310306
c.health = nil
311307

312-
return 0, err
308+
return nil, 0, err
313309
}
314310

315311
health.namespace = c.namespace
316312
health.name = c.name
317313

318314
c.health = health
319-
if leaderConfig.CommitIndex == c.commitIndex && c.valid {
320-
// We are on same index, nothing to do
321-
return leaderConfig.CommitIndex, nil
315+
316+
if err := c.loader.Refresh(ctx, StaticLeaderDiscovery(leaderCli)); err != nil {
317+
return nil, 0, err
322318
}
323319

324-
// A leader should be known even if an agency state is invalid.
325-
if data, err := c.loadState(ctx, leaderCli); err != nil {
326-
c.valid = false
327-
return leaderConfig.CommitIndex, err
328-
} else {
329-
c.data = data.Arango
330-
c.dataDB = data.ArangoDB
331-
c.valid = true
332-
c.commitIndex = leaderConfig.CommitIndex
333-
return leaderConfig.CommitIndex, nil
320+
data, index, ok := c.loader.State()
321+
if !ok {
322+
return nil, 0, errors.Newf("State is invalid after reload")
334323
}
324+
325+
return data, index, nil
335326
}
336327

337-
func (c *cache) ShardsInSyncMap() (ShardsSyncStatus, bool) {
328+
func (c *cache) ShardsInSyncMap() (state.ShardsSyncStatus, bool) {
338329
c.lock.RLock()
339330
defer c.lock.RUnlock()
340331

341-
if !c.valid {
332+
if !c.loader.Valid() {
342333
return nil, false
343334
}
344335

@@ -351,7 +342,7 @@ func (c *cache) ShardsInSyncMap() (ShardsSyncStatus, bool) {
351342

352343
// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
353344
// If there is no quorum for the leader then error is returned.
354-
func (c *cache) getLeader(ctx context.Context, size int, clients Connections) (conn.Connection, *Config, health, error) {
345+
func (c *cache) getLeader(ctx context.Context, size int, clients Connections) (conn.Connection, health, error) {
355346
configs := make([]*Config, len(clients))
356347
errs := make([]error, len(clients))
357348
names := make([]string, 0, len(clients))
@@ -406,7 +397,7 @@ func (c *cache) getLeader(ctx context.Context, size int, clients Connections) (c
406397

407398
if err := h.Serving(); err != nil {
408399
c.log.Err(err).Warn("Agency Not serving")
409-
return nil, nil, h, err
400+
return nil, h, err
410401
}
411402

412403
if err := h.Healthy(); err != nil {
@@ -416,10 +407,10 @@ func (c *cache) getLeader(ctx context.Context, size int, clients Connections) (c
416407
for id := range names {
417408
if h.leaderID == h.names[id] {
418409
if cfg := configs[id]; cfg != nil {
419-
return clients[names[id]], cfg, h, nil
410+
return clients[names[id]], h, nil
420411
}
421412
}
422413
}
423414

424-
return nil, nil, h, errors.Newf("Unable to find agent")
415+
return nil, h, errors.Newf("Unable to find agent")
425416
}

pkg/deployment/agency/cache/config.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,19 @@ func Init(cmd *cobra.Command) error {
4343
}
4444

4545
f.DurationVar(&global.RefreshDelay, "agency.refresh-delay", util.BoolSwitch(ee, 500*time.Millisecond, 0), "The Agency refresh delay (0 = no delay)")
46+
f.DurationVar(&global.RefreshDelay, "agency.refresh-interval", 0, "The Agency refresh interval (0 = do not refresh)")
4647

4748
return nil
4849
}
4950

5051
var global Config
5152

53+
func GlobalConfig() Config {
54+
return global
55+
}
56+
5257
type Config struct {
53-
PollEnabled bool
54-
RefreshDelay time.Duration
58+
PollEnabled bool
59+
RefreshDelay time.Duration
60+
RefreshInterval time.Duration
5561
}

pkg/deployment/agency/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//
22
// DISCLAIMER
33
//
4-
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
4+
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
55
//
66
// Licensed under the Apache License, Version 2.0 (the "License");
77
// you may not use this file except in compliance with the License.
@@ -28,7 +28,7 @@ import (
2828
"github.com/stretchr/testify/require"
2929
)
3030

31-
//go:embed testdata/config.json
31+
//go:embed state/testdata/config.json
3232
var config []byte
3333

3434
func Test_Config_Unmarshal(t *testing.T) {

pkg/deployment/agency/leader.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package agency
22+
23+
import (
24+
"context"
25+
26+
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
27+
)
28+
29+
type LeaderDiscovery interface {
30+
Discover(ctx context.Context) (conn.Connection, error)
31+
}
32+
33+
func StaticLeaderDiscovery(in conn.Connection) LeaderDiscovery {
34+
return staticLeaderDiscovery{conn: in}
35+
}
36+
37+
type staticLeaderDiscovery struct {
38+
conn conn.Connection
39+
}
40+
41+
func (s staticLeaderDiscovery) Discover(ctx context.Context) (conn.Connection, error) {
42+
return s.conn, nil
43+
}

0 commit comments

Comments
 (0)