Skip to content

Commit 7b9c0b4

Browse files
authored
[Feature] Agency cache generic (#1329)
1 parent df97f13 commit 7b9c0b4

File tree

8 files changed

+64
-66
lines changed

8 files changed

+64
-66
lines changed

pkg/deployment/agency/cache.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func NewAgencyCache(namespace, name string) Cache {
167167
}
168168

169169
c.log = logger.WrapObj(c)
170-
c.loader = getLoader()
170+
c.loader = getLoader[state.Root]()
171171

172172
return c
173173
}
@@ -211,7 +211,7 @@ type cache struct {
211211

212212
lock sync.RWMutex
213213

214-
loader StateLoader
214+
loader StateLoader[state.Root]
215215

216216
health Health
217217

@@ -382,6 +382,10 @@ func (c *cache) getLeader(ctx context.Context, size int, clients Connections) (c
382382
h.election = make(map[string]int, len(clients))
383383

384384
for id := range configs {
385+
if err := errs[id]; err != nil {
386+
c.log.Err(err).Str("agent", names[id]).Warn("Agent config request failed")
387+
}
388+
385389
if config := configs[id]; config != nil {
386390
name := config.Configuration.ID
387391
if name == h.names[id] {
@@ -390,6 +394,8 @@ func (c *cache) getLeader(ctx context.Context, size int, clients Connections) (c
390394
h.leaders[name] = config.LeaderId
391395
h.election[config.LeaderId]++
392396
h.leaderID = config.LeaderId
397+
} else {
398+
c.log.Str("agent", names[id]).Warn("Agent does not have leader")
393399
}
394400
}
395401
}

pkg/deployment/agency/loader.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,29 @@ import (
2626
"time"
2727

2828
agencyCecheConfig "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
29-
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
3029
)
3130

32-
func getLoader() StateLoader {
33-
loader := getLoaderBase()
31+
func getLoader[T interface{}]() StateLoader[T] {
32+
loader := getLoaderBase[T]()
3433

35-
loader = InvalidateOnErrorLoader(loader)
34+
loader = InvalidateOnErrorLoader[T](loader)
3635

37-
loader = DelayLoader(loader, agencyCecheConfig.GlobalConfig().RefreshDelay)
38-
loader = RefreshLoader(loader, agencyCecheConfig.GlobalConfig().RefreshInterval)
36+
loader = DelayLoader[T](loader, agencyCecheConfig.GlobalConfig().RefreshDelay)
37+
loader = RefreshLoader[T](loader, agencyCecheConfig.GlobalConfig().RefreshInterval)
3938

4039
return loader
4140
}
4241

43-
func getLoaderBase() StateLoader {
42+
func getLoaderBase[T interface{}]() StateLoader[T] {
4443
if agencyCecheConfig.GlobalConfig().PollEnabled {
45-
return NewSimpleStateLoader()
44+
return NewSimpleStateLoader[T]()
4645
} else {
47-
return NewSimpleStateLoader()
46+
return NewSimpleStateLoader[T]()
4847
}
4948
}
5049

51-
type StateLoader interface {
52-
State() (*state.Root, uint64, bool)
50+
type StateLoader[T interface{}] interface {
51+
State() (*T, uint64, bool)
5352

5453
Invalidate()
5554
Valid() bool
@@ -59,35 +58,35 @@ type StateLoader interface {
5958
Refresh(ctx context.Context, discovery LeaderDiscovery) error
6059
}
6160

62-
func NewSimpleStateLoader() StateLoader {
63-
return &simpleStateLoader{}
61+
func NewSimpleStateLoader[T interface{}]() StateLoader[T] {
62+
return &simpleStateLoader[T]{}
6463
}
6564

66-
type simpleStateLoader struct {
65+
type simpleStateLoader[T interface{}] struct {
6766
lock sync.Mutex
6867

69-
state *state.Root
68+
state *T
7069
index uint64
7170
valid bool
7271

7372
updateTime time.Time
7473
}
7574

76-
func (s *simpleStateLoader) UpdateTime() time.Time {
75+
func (s *simpleStateLoader[T]) UpdateTime() time.Time {
7776
s.lock.Lock()
7877
defer s.lock.Unlock()
7978

8079
return s.updateTime
8180
}
8281

83-
func (s *simpleStateLoader) Valid() bool {
82+
func (s *simpleStateLoader[T]) Valid() bool {
8483
s.lock.Lock()
8584
defer s.lock.Unlock()
8685

8786
return s.valid
8887
}
8988

90-
func (s *simpleStateLoader) State() (*state.Root, uint64, bool) {
89+
func (s *simpleStateLoader[T]) State() (*T, uint64, bool) {
9190
s.lock.Lock()
9291
defer s.lock.Unlock()
9392

@@ -98,14 +97,14 @@ func (s *simpleStateLoader) State() (*state.Root, uint64, bool) {
9897
return s.state, s.index, true
9998
}
10099

101-
func (s *simpleStateLoader) Invalidate() {
100+
func (s *simpleStateLoader[T]) Invalidate() {
102101
s.lock.Lock()
103102
defer s.lock.Unlock()
104103

105104
s.valid = false
106105
}
107106

108-
func (s *simpleStateLoader) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
107+
func (s *simpleStateLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
109108
s.lock.Lock()
110109
defer s.lock.Unlock()
111110

@@ -121,7 +120,7 @@ func (s *simpleStateLoader) Refresh(ctx context.Context, discovery LeaderDiscove
121120

122121
if s.index != cfg.CommitIndex {
123122
// Full reload
124-
state, err := GetAgencyState(ctx, conn)
123+
state, err := GetAgencyState[T](ctx, conn)
125124
if err != nil {
126125
return err
127126
}

pkg/deployment/agency/loader_delayer.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,59 +24,57 @@ import (
2424
"context"
2525
"sync"
2626
"time"
27-
28-
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
2927
)
3028

31-
func DelayLoader(loader StateLoader, delay time.Duration) StateLoader {
29+
func DelayLoader[T interface{}](loader StateLoader[T], delay time.Duration) StateLoader[T] {
3230
if delay <= 0 {
3331
return loader
3432
}
3533

36-
return &delayerLoader{
34+
return &delayerLoader[T]{
3735
parent: loader,
3836
delay: delay,
3937
}
4038
}
4139

42-
type delayerLoader struct {
40+
type delayerLoader[T interface{}] struct {
4341
lock sync.Mutex
4442

4543
last time.Time
4644
delay time.Duration
4745

48-
parent StateLoader
46+
parent StateLoader[T]
4947
}
5048

51-
func (i *delayerLoader) UpdateTime() time.Time {
49+
func (i *delayerLoader[T]) UpdateTime() time.Time {
5250
i.lock.Lock()
5351
defer i.lock.Unlock()
5452

5553
return i.parent.UpdateTime()
5654
}
5755

58-
func (i *delayerLoader) Valid() bool {
56+
func (i *delayerLoader[T]) Valid() bool {
5957
i.lock.Lock()
6058
defer i.lock.Unlock()
6159

6260
return i.parent.Valid()
6361
}
6462

65-
func (i *delayerLoader) State() (*state.Root, uint64, bool) {
63+
func (i *delayerLoader[T]) State() (*T, uint64, bool) {
6664
i.lock.Lock()
6765
defer i.lock.Unlock()
6866

6967
return i.parent.State()
7068
}
7169

72-
func (i *delayerLoader) Invalidate() {
70+
func (i *delayerLoader[T]) Invalidate() {
7371
i.lock.Lock()
7472
defer i.lock.Unlock()
7573

7674
i.parent.Invalidate()
7775
}
7876

79-
func (i *delayerLoader) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
77+
func (i *delayerLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
8078
i.lock.Lock()
8179
defer i.lock.Unlock()
8280

pkg/deployment/agency/loader_invalidate.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,51 +24,49 @@ import (
2424
"context"
2525
"sync"
2626
"time"
27-
28-
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
2927
)
3028

31-
func InvalidateOnErrorLoader(loader StateLoader) StateLoader {
32-
return &invalidateOnErrorLoader{
29+
func InvalidateOnErrorLoader[T interface{}](loader StateLoader[T]) StateLoader[T] {
30+
return &invalidateOnErrorLoader[T]{
3331
parent: loader,
3432
}
3533
}
3634

37-
type invalidateOnErrorLoader struct {
35+
type invalidateOnErrorLoader[T interface{}] struct {
3836
lock sync.Mutex
3937

40-
parent StateLoader
38+
parent StateLoader[T]
4139
}
4240

43-
func (i *invalidateOnErrorLoader) UpdateTime() time.Time {
41+
func (i *invalidateOnErrorLoader[T]) UpdateTime() time.Time {
4442
i.lock.Lock()
4543
defer i.lock.Unlock()
4644

4745
return i.parent.UpdateTime()
4846
}
4947

50-
func (i *invalidateOnErrorLoader) Valid() bool {
48+
func (i *invalidateOnErrorLoader[T]) Valid() bool {
5149
i.lock.Lock()
5250
defer i.lock.Unlock()
5351

5452
return i.parent.Valid()
5553
}
5654

57-
func (i *invalidateOnErrorLoader) State() (*state.Root, uint64, bool) {
55+
func (i *invalidateOnErrorLoader[T]) State() (*T, uint64, bool) {
5856
i.lock.Lock()
5957
defer i.lock.Unlock()
6058

6159
return i.parent.State()
6260
}
6361

64-
func (i *invalidateOnErrorLoader) Invalidate() {
62+
func (i *invalidateOnErrorLoader[T]) Invalidate() {
6563
i.lock.Lock()
6664
defer i.lock.Unlock()
6765

6866
i.parent.Invalidate()
6967
}
7068

71-
func (i *invalidateOnErrorLoader) Refresh(ctx context.Context, discovery LeaderDiscovery) (err error) {
69+
func (i *invalidateOnErrorLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) (err error) {
7270
i.lock.Lock()
7371
defer i.lock.Unlock()
7472

pkg/deployment/agency/loader_refresher.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,59 +24,57 @@ import (
2424
"context"
2525
"sync"
2626
"time"
27-
28-
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
2927
)
3028

31-
func RefreshLoader(loader StateLoader, delay time.Duration) StateLoader {
29+
func RefreshLoader[T interface{}](loader StateLoader[T], delay time.Duration) StateLoader[T] {
3230
if delay <= 0 {
3331
return loader
3432
}
3533

36-
return &refresherLoader{
34+
return &refresherLoader[T]{
3735
parent: loader,
3836
delay: delay,
3937
}
4038
}
4139

42-
type refresherLoader struct {
40+
type refresherLoader[T interface{}] struct {
4341
lock sync.Mutex
4442

4543
last time.Time
4644
delay time.Duration
4745

48-
parent StateLoader
46+
parent StateLoader[T]
4947
}
5048

51-
func (i *refresherLoader) UpdateTime() time.Time {
49+
func (i *refresherLoader[T]) UpdateTime() time.Time {
5250
i.lock.Lock()
5351
defer i.lock.Unlock()
5452

5553
return i.parent.UpdateTime()
5654
}
5755

58-
func (i *refresherLoader) Valid() bool {
56+
func (i *refresherLoader[T]) Valid() bool {
5957
i.lock.Lock()
6058
defer i.lock.Unlock()
6159

6260
return i.parent.Valid()
6361
}
6462

65-
func (i *refresherLoader) State() (*state.Root, uint64, bool) {
63+
func (i *refresherLoader[T]) State() (*T, uint64, bool) {
6664
i.lock.Lock()
6765
defer i.lock.Unlock()
6866

6967
return i.parent.State()
7068
}
7169

72-
func (i *refresherLoader) Invalidate() {
70+
func (i *refresherLoader[T]) Invalidate() {
7371
i.lock.Lock()
7472
defer i.lock.Unlock()
7573

7674
i.parent.Invalidate()
7775
}
7876

79-
func (i *refresherLoader) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
77+
func (i *refresherLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
8078
i.lock.Lock()
8179
defer i.lock.Unlock()
8280

pkg/deployment/agency/state.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,28 @@ import (
2424
"context"
2525
"net/http"
2626

27-
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
2827
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
2928
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3029
)
3130

32-
func GetAgencyState(ctx context.Context, connection conn.Connection) (state.Root, error) {
33-
resp, code, err := conn.NewExecutor[ReadRequest, state.Roots](connection).Execute(ctx, http.MethodPost, "/_api/agency/read", GetAgencyReadRequestFields())
31+
func GetAgencyState[T interface{}](ctx context.Context, connection conn.Connection) (T, error) {
32+
var def T
33+
34+
resp, code, err := conn.NewExecutor[ReadRequest, []T](connection).Execute(ctx, http.MethodPost, "/_api/agency/read", GetAgencyReadRequestFields())
3435
if err != nil {
35-
return state.Root{}, err
36+
return def, err
3637
}
3738

3839
if code != http.StatusOK {
39-
return state.Root{}, errors.Newf("Unknown response code %d", code)
40+
return def, errors.Newf("Unknown response code %d", code)
4041
}
4142

4243
if resp == nil {
43-
return state.Root{}, errors.Newf("Missing response body")
44+
return def, errors.Newf("Missing response body")
4445
}
4546

4647
if len(*resp) != 1 {
47-
return state.Root{}, errors.Newf("Invalid response size")
48+
return def, errors.Newf("Invalid response size")
4849
}
4950

5051
return (*resp)[0], nil

pkg/deployment/agency/state/state.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
package state
2222

23-
type Roots []Root
24-
2523
type Root struct {
2624
Arango State `json:"arango"`
2725
ArangoDB DB `json:"arangodb,omitempty"`

0 commit comments

Comments
 (0)