Skip to content

Commit dddd695

Browse files
committed
Merge branch 'master' into bugfix/abort-on-cleanout-failure
2 parents aa0e6ef + 5e3b732 commit dddd695

24 files changed

+375
-189
lines changed

pkg/apis/deployment/v1alpha/deployment_status_members.go

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,23 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
7575
// ForeachServerGroup calls the given callback for all server groups.
7676
// If the callback returns an error, this error is returned and the callback is
7777
// not called for the remaining groups.
78-
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list *MemberStatusList) error) error {
79-
if err := cb(ServerGroupSingle, &ds.Single); err != nil {
78+
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error {
79+
if err := cb(ServerGroupSingle, ds.Single); err != nil {
8080
return maskAny(err)
8181
}
82-
if err := cb(ServerGroupAgents, &ds.Agents); err != nil {
82+
if err := cb(ServerGroupAgents, ds.Agents); err != nil {
8383
return maskAny(err)
8484
}
85-
if err := cb(ServerGroupDBServers, &ds.DBServers); err != nil {
85+
if err := cb(ServerGroupDBServers, ds.DBServers); err != nil {
8686
return maskAny(err)
8787
}
88-
if err := cb(ServerGroupCoordinators, &ds.Coordinators); err != nil {
88+
if err := cb(ServerGroupCoordinators, ds.Coordinators); err != nil {
8989
return maskAny(err)
9090
}
91-
if err := cb(ServerGroupSyncMasters, &ds.SyncMasters); err != nil {
91+
if err := cb(ServerGroupSyncMasters, ds.SyncMasters); err != nil {
9292
return maskAny(err)
9393
}
94-
if err := cb(ServerGroupSyncWorkers, &ds.SyncWorkers); err != nil {
94+
if err := cb(ServerGroupSyncWorkers, ds.SyncWorkers); err != nil {
9595
return maskAny(err)
9696
}
9797
return nil
@@ -137,22 +137,47 @@ func (ds DeploymentStatusMembers) MemberStatusByPVCName(pvcName string) (MemberS
137137
return MemberStatus{}, 0, false
138138
}
139139

140-
// UpdateMemberStatus updates the given status in the given group.
141-
func (ds *DeploymentStatusMembers) UpdateMemberStatus(status MemberStatus, group ServerGroup) error {
140+
// Add adds the given status in the given group.
141+
func (ds *DeploymentStatusMembers) Add(status MemberStatus, group ServerGroup) error {
142142
var err error
143143
switch group {
144144
case ServerGroupSingle:
145-
err = ds.Single.Update(status)
145+
err = ds.Single.add(status)
146146
case ServerGroupAgents:
147-
err = ds.Agents.Update(status)
147+
err = ds.Agents.add(status)
148148
case ServerGroupDBServers:
149-
err = ds.DBServers.Update(status)
149+
err = ds.DBServers.add(status)
150150
case ServerGroupCoordinators:
151-
err = ds.Coordinators.Update(status)
151+
err = ds.Coordinators.add(status)
152152
case ServerGroupSyncMasters:
153-
err = ds.SyncMasters.Update(status)
153+
err = ds.SyncMasters.add(status)
154154
case ServerGroupSyncWorkers:
155-
err = ds.SyncWorkers.Update(status)
155+
err = ds.SyncWorkers.add(status)
156+
default:
157+
return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
158+
}
159+
if err != nil {
160+
return maskAny(err)
161+
}
162+
return nil
163+
}
164+
165+
// Update updates the given status in the given group.
166+
func (ds *DeploymentStatusMembers) Update(status MemberStatus, group ServerGroup) error {
167+
var err error
168+
switch group {
169+
case ServerGroupSingle:
170+
err = ds.Single.update(status)
171+
case ServerGroupAgents:
172+
err = ds.Agents.update(status)
173+
case ServerGroupDBServers:
174+
err = ds.DBServers.update(status)
175+
case ServerGroupCoordinators:
176+
err = ds.Coordinators.update(status)
177+
case ServerGroupSyncMasters:
178+
err = ds.SyncMasters.update(status)
179+
case ServerGroupSyncWorkers:
180+
err = ds.SyncWorkers.update(status)
156181
default:
157182
return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
158183
}
@@ -168,17 +193,17 @@ func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) erro
168193
var err error
169194
switch group {
170195
case ServerGroupSingle:
171-
err = ds.Single.RemoveByID(id)
196+
err = ds.Single.removeByID(id)
172197
case ServerGroupAgents:
173-
err = ds.Agents.RemoveByID(id)
198+
err = ds.Agents.removeByID(id)
174199
case ServerGroupDBServers:
175-
err = ds.DBServers.RemoveByID(id)
200+
err = ds.DBServers.removeByID(id)
176201
case ServerGroupCoordinators:
177-
err = ds.Coordinators.RemoveByID(id)
202+
err = ds.Coordinators.removeByID(id)
178203
case ServerGroupSyncMasters:
179-
err = ds.SyncMasters.RemoveByID(id)
204+
err = ds.SyncMasters.removeByID(id)
180205
case ServerGroupSyncWorkers:
181-
err = ds.SyncWorkers.RemoveByID(id)
206+
err = ds.SyncWorkers.removeByID(id)
182207
default:
183208
return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
184209
}
@@ -190,8 +215,8 @@ func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) erro
190215

191216
// AllMembersReady returns true when all members are in the Ready state.
192217
func (ds DeploymentStatusMembers) AllMembersReady() bool {
193-
if err := ds.ForeachServerGroup(func(group ServerGroup, list *MemberStatusList) error {
194-
for _, x := range *list {
218+
if err := ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
219+
for _, x := range list {
195220
if !x.Conditions.IsTrue(ConditionTypeReady) {
196221
return fmt.Errorf("not ready")
197222
}

pkg/apis/deployment/v1alpha/member_status_list.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (l MemberStatusList) ElementByPVCName(pvcName string) (MemberStatus, bool)
7676

7777
// Add a member to the list.
7878
// Returns an AlreadyExistsError if the ID of the given member already exists.
79-
func (l *MemberStatusList) Add(m MemberStatus) error {
79+
func (l *MemberStatusList) add(m MemberStatus) error {
8080
src := *l
8181
for _, x := range src {
8282
if x.ID == m.ID {
@@ -89,7 +89,7 @@ func (l *MemberStatusList) Add(m MemberStatus) error {
8989

9090
// Update a member in the list.
9191
// Returns a NotFoundError if the ID of the given member cannot be found.
92-
func (l MemberStatusList) Update(m MemberStatus) error {
92+
func (l MemberStatusList) update(m MemberStatus) error {
9393
for i, x := range l {
9494
if x.ID == m.ID {
9595
l[i] = m
@@ -101,7 +101,7 @@ func (l MemberStatusList) Update(m MemberStatus) error {
101101

102102
// RemoveByID a member with given ID from the list.
103103
// Returns a NotFoundError if the ID of the given member cannot be found.
104-
func (l *MemberStatusList) RemoveByID(id string) error {
104+
func (l *MemberStatusList) removeByID(id string) error {
105105
src := *l
106106
for i, x := range src {
107107
if x.ID == id {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package v1alpha
24+
25+
import (
26+
"testing"
27+
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
// TestMemberStatusList tests modifying a MemberStatusList.
32+
func TestMemberStatusList(t *testing.T) {
33+
list := &MemberStatusList{}
34+
m1 := MemberStatus{ID: "m1"}
35+
m2 := MemberStatus{ID: "m2"}
36+
m3 := MemberStatus{ID: "m3"}
37+
assert.Equal(t, 0, len(*list))
38+
39+
assert.NoError(t, list.add(m1))
40+
assert.Equal(t, 1, len(*list))
41+
42+
assert.NoError(t, list.add(m2))
43+
assert.NoError(t, list.add(m3))
44+
assert.Equal(t, 3, len(*list))
45+
46+
assert.Error(t, list.add(m2))
47+
assert.Equal(t, 3, len(*list))
48+
49+
assert.NoError(t, list.removeByID(m3.ID))
50+
assert.Equal(t, 2, len(*list))
51+
assert.False(t, list.ContainsID(m3.ID))
52+
assert.Equal(t, m1.ID, (*list)[0].ID)
53+
assert.Equal(t, m2.ID, (*list)[1].ID)
54+
55+
m2.PodName = "foo"
56+
assert.NoError(t, list.update(m2))
57+
assert.Equal(t, 2, len(*list))
58+
assert.True(t, list.ContainsID(m2.ID))
59+
x, found := list.ElementByPodName("foo")
60+
assert.True(t, found)
61+
assert.Equal(t, "foo", x.PodName)
62+
assert.Equal(t, m2.ID, x.ID)
63+
64+
assert.NoError(t, list.add(m3))
65+
assert.Equal(t, 3, len(*list))
66+
assert.Equal(t, m1.ID, (*list)[0].ID)
67+
assert.Equal(t, m2.ID, (*list)[1].ID)
68+
assert.Equal(t, m3.ID, (*list)[2].ID)
69+
}

pkg/deployment/cluster_scaling_integration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
7171
delay := time.Second * 2
7272

7373
// Is deployment in running state
74-
if ci.depl.status.Phase == api.DeploymentPhaseRunning {
74+
if ci.depl.GetPhase() == api.DeploymentPhaseRunning {
7575
// Update cluster with our state
7676
ctx := context.Background()
7777
safeToAskCluster, err := ci.updateClusterServerCount(ctx)

pkg/deployment/context_impl.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package deployment
2424

2525
import (
2626
"context"
27+
"fmt"
2728

2829
"github.com/arangodb/arangosync/client"
2930
"github.com/arangodb/arangosync/tasks"
@@ -65,20 +66,44 @@ func (d *Deployment) GetNamespace() string {
6566
return d.apiObject.GetNamespace()
6667
}
6768

69+
// GetPhase returns the current phase of the deployment
70+
func (d *Deployment) GetPhase() api.DeploymentPhase {
71+
return d.status.last.Phase
72+
}
73+
6874
// GetSpec returns the current specification
6975
func (d *Deployment) GetSpec() api.DeploymentSpec {
7076
return d.apiObject.Spec
7177
}
7278

7379
// GetStatus returns the current status of the deployment
74-
func (d *Deployment) GetStatus() api.DeploymentStatus {
75-
return d.status
80+
// together with the current version of that status.
81+
func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
82+
d.status.mutex.Lock()
83+
defer d.status.mutex.Unlock()
84+
85+
version := d.status.version
86+
return *d.status.last.DeepCopy(), version
7687
}
7788

7889
// UpdateStatus replaces the status of the deployment with the given status and
7990
// updates the resources in k8s.
80-
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, force ...bool) error {
81-
d.status = status
91+
// If the given last version does not match the actual last version of the status object,
92+
// an error is returned.
93+
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, lastVersion int32, force ...bool) error {
94+
d.status.mutex.Lock()
95+
defer d.status.mutex.Unlock()
96+
97+
if d.status.version != lastVersion {
98+
// Status is obsolete
99+
d.deps.Log.Error().
100+
Int32("expected-version", lastVersion).
101+
Int32("actual-version", d.status.version).
102+
Msg("UpdateStatus version conflict error.")
103+
return maskAny(fmt.Errorf("Status conflict error. Expected version %d, got %d", lastVersion, d.status.version))
104+
}
105+
d.status.version++
106+
d.status.last = *status.DeepCopy()
82107
if err := d.updateCRStatus(force...); err != nil {
83108
return maskAny(err)
84109
}
@@ -107,7 +132,7 @@ func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup,
107132
// GetAgencyClients returns a client connection for every agency member.
108133
// If the given predicate is not nil, only agents are included where the given predicate returns true.
109134
func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) {
110-
agencyMembers := d.status.Members.Agents
135+
agencyMembers := d.status.last.Members.Agents
111136
result := make([]driver.Connection, 0, len(agencyMembers))
112137
for _, m := range agencyMembers {
113138
if predicate != nil && !predicate(m.ID) {
@@ -168,13 +193,14 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
168193
// If ID is non-empty, it will be used, otherwise a new ID is created.
169194
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
170195
log := d.deps.Log
171-
id, err := d.createMember(group, id, d.apiObject)
196+
status, lastVersion := d.GetStatus()
197+
id, err := createMember(log, &status, group, id, d.apiObject)
172198
if err != nil {
173199
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
174200
return maskAny(err)
175201
}
176202
// Save added member
177-
if err := d.updateCRStatus(); err != nil {
203+
if err := d.UpdateStatus(status, lastVersion); err != nil {
178204
log.Debug().Err(err).Msg("Updating CR status failed")
179205
return maskAny(err)
180206
}

0 commit comments

Comments
 (0)