Skip to content

Commit c09ecaf

Browse files
authored
[Feature] Add InSync Cache (#1298)
1 parent f2675c5 commit c09ecaf

File tree

6 files changed

+260
-4
lines changed

6 files changed

+260
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Change Log
22

33
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
4+
- (Feature) Add InSync Cache
45

56
## [1.2.26](https://github.com/arangodb/kube-arangodb/tree/1.2.26) (2023-04-18)
67
- (Bugfix) Fix manual overwrite for ReplicasCount in helm

pkg/deployment/agency/cache.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//
22
// DISCLAIMER
33
//
4-
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
4+
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
55
//
66
// Licensed under the Apache License, Version 2.0 (the "License");
77
// you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ package agency
2323
import (
2424
"context"
2525
"sync"
26+
"time"
2627

2728
"github.com/rs/zerolog"
2829

@@ -157,6 +158,8 @@ type Cache interface {
157158
CommitIndex() uint64
158159
// Health returns true when healthy object is available.
159160
Health() (Health, bool)
161+
// ShardsInSyncMap returns last in sync state of particular shard
162+
ShardsInSyncMap() (ShardsSyncStatus, bool)
160163
}
161164

162165
func NewCache(namespace, name string, mode *api.DeploymentMode) Cache {
@@ -169,8 +172,9 @@ func NewCache(namespace, name string, mode *api.DeploymentMode) Cache {
169172

170173
func NewAgencyCache(namespace, name string) Cache {
171174
c := &cache{
172-
namespace: namespace,
173-
name: name,
175+
namespace: namespace,
176+
name: name,
177+
shardsSyncStatus: ShardsSyncStatus{},
174178
}
175179

176180
c.log = logger.WrapObj(c)
@@ -185,6 +189,10 @@ func NewSingleCache() Cache {
185189
type cacheSingle struct {
186190
}
187191

192+
func (c cacheSingle) ShardsInSyncMap() (ShardsSyncStatus, bool) {
193+
return nil, false
194+
}
195+
188196
func (c cacheSingle) DataDB() (StateDB, bool) {
189197
return StateDB{}, false
190198
}
@@ -221,6 +229,8 @@ type cache struct {
221229
dataDB StateDB
222230

223231
health Health
232+
233+
shardsSyncStatus ShardsSyncStatus
224234
}
225235

226236
func (c *cache) WrapLogger(in *zerolog.Event) *zerolog.Event {
@@ -272,6 +282,38 @@ func (c *cache) Reload(ctx context.Context, size int, clients map[string]agency.
272282
c.lock.Lock()
273283
defer c.lock.Unlock()
274284

285+
index, err := c.reload(ctx, size, clients)
286+
if err != nil {
287+
return index, err
288+
}
289+
290+
if !c.valid {
291+
return index, nil
292+
}
293+
294+
// Refresh map of the shards
295+
shardNames := c.data.GetShardsStatus()
296+
297+
n := time.Now()
298+
299+
for k := range c.shardsSyncStatus {
300+
if _, ok := shardNames[k]; !ok {
301+
delete(c.shardsSyncStatus, k)
302+
}
303+
}
304+
305+
for k, v := range shardNames {
306+
if _, ok := c.shardsSyncStatus[k]; !ok {
307+
c.shardsSyncStatus[k] = n
308+
} else if v {
309+
c.shardsSyncStatus[k] = n
310+
}
311+
}
312+
313+
return index, nil
314+
}
315+
316+
func (c *cache) reload(ctx context.Context, size int, clients map[string]agency.Agency) (uint64, error) {
275317
leaderCli, leaderConfig, health, err := c.getLeader(ctx, size, clients)
276318
if err != nil {
277319
// Invalidate a leader ID and agency state.
@@ -304,6 +346,21 @@ func (c *cache) Reload(ctx context.Context, size int, clients map[string]agency.
304346
}
305347
}
306348

349+
func (c *cache) ShardsInSyncMap() (ShardsSyncStatus, bool) {
350+
c.lock.RLock()
351+
defer c.lock.RUnlock()
352+
353+
if !c.valid {
354+
return nil, false
355+
}
356+
357+
if c.shardsSyncStatus == nil {
358+
return nil, false
359+
}
360+
361+
return c.shardsSyncStatus, true
362+
}
363+
307364
// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
308365
// If there is no quorum for the leader then error is returned.
309366
func (c *cache) getLeader(ctx context.Context, size int, clients map[string]agency.Agency) (agency.Agency, *Config, health, error) {

pkg/deployment/agency/server.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//
22
// DISCLAIMER
33
//
4-
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
4+
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
55
//
66
// Licensed under the Apache License, Version 2.0 (the "License");
77
// you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
2020

2121
package agency
2222

23+
import "sort"
24+
2325
type Server string
2426

2527
type Servers []Server
@@ -45,3 +47,50 @@ func (s Servers) Join(ids Servers) Servers {
4547

4648
return r
4749
}
50+
51+
func (s Servers) Equals(ids Servers) bool {
52+
if len(ids) != len(s) {
53+
return false
54+
}
55+
56+
for id := range ids {
57+
if ids[id] != s[id] {
58+
return false
59+
}
60+
}
61+
62+
return true
63+
}
64+
65+
func (s Servers) Sort() {
66+
sort.Slice(s, func(i, j int) bool {
67+
return s[i] < s[j]
68+
})
69+
}
70+
71+
func (s Servers) InSync(ids Servers) bool {
72+
if len(s) != len(ids) {
73+
return false
74+
}
75+
76+
if len(s) == 0 {
77+
return false
78+
}
79+
80+
if s[0] != ids[0] {
81+
return false
82+
}
83+
84+
if len(s) > 1 {
85+
s[1:].Sort()
86+
ids[1:].Sort()
87+
88+
if s.Equals(ids) {
89+
return true
90+
} else {
91+
return false
92+
}
93+
}
94+
95+
return true
96+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package agency
22+
23+
import "time"
24+
25+
type ShardsSyncStatus map[string]time.Time
26+
27+
func (s ShardsSyncStatus) NotInSyncSince(t time.Duration) []string {
28+
r := make([]string, 0, len(s))
29+
30+
for k, v := range s {
31+
if v.IsZero() {
32+
continue
33+
}
34+
35+
if time.Since(v) > t {
36+
r = append(r, k)
37+
}
38+
}
39+
40+
return r
41+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package agency
22+
23+
import (
24+
"testing"
25+
26+
"github.com/stretchr/testify/require"
27+
)
28+
29+
func Test_ShardsInSync(t *testing.T) {
30+
s := State{
31+
Current: StateCurrent{
32+
Collections: map[string]StateCurrentDBCollections{
33+
"a": map[string]StateCurrentDBCollection{
34+
"a": map[string]StateCurrentDBShard{
35+
"s0001": {
36+
Servers: Servers{
37+
"A",
38+
"B",
39+
"C",
40+
},
41+
},
42+
},
43+
},
44+
},
45+
},
46+
}
47+
48+
t.Run("All in sync", func(t *testing.T) {
49+
require.True(t, s.IsShardInSync("a", "a", "s0001", Servers{"A", "B", "C"}))
50+
})
51+
52+
t.Run("InSync with random order", func(t *testing.T) {
53+
require.True(t, s.IsShardInSync("a", "a", "s0001", Servers{"A", "C", "B"}))
54+
})
55+
56+
t.Run("Invalid leader", func(t *testing.T) {
57+
require.False(t, s.IsShardInSync("a", "a", "s0001", Servers{"B", "A", "C"}))
58+
})
59+
60+
t.Run("Missing server", func(t *testing.T) {
61+
require.False(t, s.IsShardInSync("a", "a", "s0001", Servers{"A"}))
62+
})
63+
64+
t.Run("Missing db", func(t *testing.T) {
65+
require.False(t, s.IsShardInSync("a1", "a", "s0001", Servers{"A", "B", "C"}))
66+
})
67+
68+
t.Run("Missing col", func(t *testing.T) {
69+
require.False(t, s.IsShardInSync("a", "a1", "s0001", Servers{"A", "B", "C"}))
70+
})
71+
72+
t.Run("Missing shard", func(t *testing.T) {
73+
require.False(t, s.IsShardInSync("a", "a", "s00011", Servers{"A", "B", "C"}))
74+
})
75+
}

pkg/deployment/agency/state.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,39 @@ func (s State) GetDBServerWithLowestShards() Server {
167167
return resultServer
168168
}
169169

170+
func (s State) GetShardsStatus() map[string]bool {
171+
q := map[string]bool{}
172+
173+
for dName, d := range s.Plan.Collections {
174+
for cName, c := range d {
175+
for sName, shard := range c.Shards {
176+
q[sName] = s.IsShardInSync(dName, cName, sName, shard)
177+
}
178+
}
179+
}
180+
181+
return q
182+
}
183+
184+
func (s State) IsShardInSync(db, col, shard string, servers Servers) bool {
185+
dCurrent, ok := s.Current.Collections[db]
186+
if !ok {
187+
return false
188+
}
189+
190+
cCurrent, ok := dCurrent[col]
191+
if !ok {
192+
return false
193+
}
194+
195+
sCurrent, ok := cCurrent[shard]
196+
if !ok {
197+
return false
198+
}
199+
200+
return sCurrent.Servers.InSync(servers)
201+
}
202+
170203
// PlanServers returns all servers which are part of the plan
171204
func (s State) PlanServers() Servers {
172205
q := map[Server]bool{}

0 commit comments

Comments
 (0)