Skip to content

Commit 69a73b2

Browse files
committed
Avoid overwriting status changes
1 parent 16ada24 commit 69a73b2

22 files changed

+274
-137
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package v1alpha
24+
25+
import (
26+
"testing"
27+
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
// TestMemberStatusList tests modifying a MemberStatusList.
32+
func TestMemberStatusList(t *testing.T) {
33+
list := &MemberStatusList{}
34+
m1 := MemberStatus{ID: "m1"}
35+
m2 := MemberStatus{ID: "m2"}
36+
m3 := MemberStatus{ID: "m3"}
37+
assert.Equal(t, 0, len(*list))
38+
39+
assert.NoError(t, list.Add(m1))
40+
assert.Equal(t, 1, len(*list))
41+
42+
assert.NoError(t, list.Add(m2))
43+
assert.NoError(t, list.Add(m3))
44+
assert.Equal(t, 3, len(*list))
45+
46+
assert.Error(t, list.Add(m2))
47+
assert.Equal(t, 3, len(*list))
48+
49+
assert.NoError(t, list.RemoveByID(m3.ID))
50+
assert.Equal(t, 2, len(*list))
51+
assert.False(t, list.ContainsID(m3.ID))
52+
53+
m2.PodName = "foo"
54+
assert.NoError(t, list.Update(m2))
55+
assert.Equal(t, 2, len(*list))
56+
assert.True(t, list.ContainsID(m2.ID))
57+
x, found := list.ElementByPodName("foo")
58+
assert.True(t, found)
59+
assert.Equal(t, "foo", x.PodName)
60+
assert.Equal(t, m2.ID, x.ID)
61+
}

pkg/deployment/cluster_scaling_integration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
7171
delay := time.Second * 2
7272

7373
// Is deployment in running state
74-
if ci.depl.status.Phase == api.DeploymentPhaseRunning {
74+
if ci.depl.GetPhase() == api.DeploymentPhaseRunning {
7575
// Update cluster with our state
7676
ctx := context.Background()
7777
safeToAskCluster, err := ci.updateClusterServerCount(ctx)

pkg/deployment/context_impl.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ package deployment
2424

2525
import (
2626
"context"
27+
"fmt"
28+
"sync/atomic"
2729

2830
"github.com/arangodb/arangosync/client"
2931
"github.com/arangodb/arangosync/tasks"
@@ -63,20 +65,31 @@ func (d *Deployment) GetNamespace() string {
6365
return d.apiObject.GetNamespace()
6466
}
6567

68+
// GetPhase returns the current phase of the deployment
69+
func (d *Deployment) GetPhase() api.DeploymentPhase {
70+
return d.status.last.Phase
71+
}
72+
6673
// GetSpec returns the current specification
6774
func (d *Deployment) GetSpec() api.DeploymentSpec {
6875
return d.apiObject.Spec
6976
}
7077

7178
// GetStatus returns the current status of the deployment
72-
func (d *Deployment) GetStatus() api.DeploymentStatus {
73-
return d.status
79+
// together with the current version of that status.
80+
func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
81+
version := atomic.LoadInt32(&d.status.version)
82+
return *d.status.last.DeepCopy(), version
7483
}
7584

7685
// UpdateStatus replaces the status of the deployment with the given status and
7786
// updates the resources in k8s.
78-
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, force ...bool) error {
79-
d.status = status
87+
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, lastVersion int32, force ...bool) error {
88+
if !atomic.CompareAndSwapInt32(&d.status.version, lastVersion, lastVersion+1) {
89+
// Status is obsolete
90+
return maskAny(fmt.Errorf("Status conflict error. Expected version %d, got %d", lastVersion, d.status.version))
91+
}
92+
d.status.last = *status.DeepCopy()
8093
if err := d.updateCRStatus(force...); err != nil {
8194
return maskAny(err)
8295
}
@@ -105,7 +118,7 @@ func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup,
105118
// GetAgencyClients returns a client connection for every agency member.
106119
// If the given predicate is not nil, only agents are included where the given predicate returns true.
107120
func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) {
108-
agencyMembers := d.status.Members.Agents
121+
agencyMembers := d.status.last.Members.Agents
109122
result := make([]driver.Connection, 0, len(agencyMembers))
110123
for _, m := range agencyMembers {
111124
if predicate != nil && !predicate(m.ID) {
@@ -157,13 +170,14 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
157170
// If ID is non-empty, it will be used, otherwise a new ID is created.
158171
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
159172
log := d.deps.Log
160-
id, err := d.createMember(group, id, d.apiObject)
173+
status, lastVersion := d.GetStatus()
174+
id, err := createMember(log, &status, group, id, d.apiObject)
161175
if err != nil {
162176
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
163177
return maskAny(err)
164178
}
165179
// Save added member
166-
if err := d.updateCRStatus(); err != nil {
180+
if err := d.UpdateStatus(status, lastVersion); err != nil {
167181
log.Debug().Err(err).Msg("Updating CR status failed")
168182
return maskAny(err)
169183
}

pkg/deployment/deployment.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,12 @@ const (
8383
// Deployment is the in process state of an ArangoDeployment.
8484
type Deployment struct {
8585
apiObject *api.ArangoDeployment // API object
86-
status api.DeploymentStatus // Internal status of the CR
87-
config Config
88-
deps Dependencies
86+
status struct {
87+
version int32
88+
last api.DeploymentStatus // Internal status copy of the CR
89+
}
90+
config Config
91+
deps Dependencies
8992

9093
eventCh chan *deploymentEvent
9194
stopCh chan struct{}
@@ -112,20 +115,20 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
112115
}
113116
d := &Deployment{
114117
apiObject: apiObject,
115-
status: *(apiObject.Status.DeepCopy()),
116118
config: config,
117119
deps: deps,
118120
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
119121
stopCh: make(chan struct{}),
120122
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
121123
clientCache: newClientCache(deps.KubeCli, apiObject),
122124
}
125+
d.status.last = *(apiObject.Status.DeepCopy())
123126
d.reconciler = reconcile.NewReconciler(deps.Log, d)
124127
d.resilience = resilience.NewResilience(deps.Log, d)
125128
d.resources = resources.NewResources(deps.Log, d)
126-
if d.status.AcceptedSpec == nil {
129+
if d.status.last.AcceptedSpec == nil {
127130
// We've validated the spec, so let's use it from now.
128-
d.status.AcceptedSpec = apiObject.Spec.DeepCopy()
131+
d.status.last.AcceptedSpec = apiObject.Spec.DeepCopy()
129132
}
130133

131134
go d.run()
@@ -185,7 +188,7 @@ func (d *Deployment) send(ev *deploymentEvent) {
185188
func (d *Deployment) run() {
186189
log := d.deps.Log
187190

188-
if d.status.Phase == api.DeploymentPhaseNone {
191+
if d.GetPhase() == api.DeploymentPhaseNone {
189192
// Create secrets
190193
if err := d.resources.EnsureSecrets(); err != nil {
191194
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create secrets", err, d.GetAPIObject()))
@@ -211,8 +214,9 @@ func (d *Deployment) run() {
211214
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create pods", err, d.GetAPIObject()))
212215
}
213216

214-
d.status.Phase = api.DeploymentPhaseRunning
215-
if err := d.updateCRStatus(); err != nil {
217+
status, lastVersion := d.GetStatus()
218+
status.Phase = api.DeploymentPhaseRunning
219+
if err := d.UpdateStatus(status, lastVersion); err != nil {
216220
log.Warn().Err(err).Msg("update initial CR status failed")
217221
}
218222
log.Info().Msg("start running...")
@@ -277,13 +281,14 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
277281
}
278282

279283
specBefore := d.apiObject.Spec
280-
if d.status.AcceptedSpec != nil {
281-
specBefore = *d.status.AcceptedSpec
284+
status := d.status.last
285+
if d.status.last.AcceptedSpec != nil {
286+
specBefore = *status.AcceptedSpec.DeepCopy()
282287
}
283288
newAPIObject := current.DeepCopy()
284289
newAPIObject.Spec.SetDefaultsFrom(specBefore)
285290
newAPIObject.Spec.SetDefaults(d.apiObject.GetName())
286-
newAPIObject.Status = d.status
291+
newAPIObject.Status = status
287292
resetFields := specBefore.ResetImmutableFields(&newAPIObject.Spec)
288293
if len(resetFields) > 0 {
289294
log.Debug().Strs("fields", resetFields).Msg("Found modified immutable fields")
@@ -309,9 +314,12 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
309314
return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err))
310315
}
311316
// Save updated accepted spec
312-
d.status.AcceptedSpec = newAPIObject.Spec.DeepCopy()
313-
if err := d.updateCRStatus(); err != nil {
314-
return maskAny(fmt.Errorf("failed to update ArangoDeployment status: %v", err))
317+
{
318+
status, lastVersion := d.GetStatus()
319+
status.AcceptedSpec = newAPIObject.Spec.DeepCopy()
320+
if err := d.UpdateStatus(status, lastVersion); err != nil {
321+
return maskAny(fmt.Errorf("failed to update ArangoDeployment status: %v", err))
322+
}
315323
}
316324

317325
// Notify cluster of desired server count
@@ -351,7 +359,7 @@ func (d *Deployment) updateCRStatus(force ...bool) error {
351359
attempt := 0
352360
for {
353361
attempt++
354-
update.Status = d.status
362+
update.Status = d.status.last
355363
if update.GetDeletionTimestamp() == nil {
356364
ensureFinalizers(update)
357365
}
@@ -388,7 +396,7 @@ func (d *Deployment) updateCRSpec(newSpec api.DeploymentSpec) error {
388396
for {
389397
attempt++
390398
update.Spec = newSpec
391-
update.Status = d.status
399+
update.Status = d.status.last
392400
ns := d.apiObject.GetNamespace()
393401
newAPIObject, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(ns).Update(update)
394402
if err == nil {
@@ -417,7 +425,7 @@ func (d *Deployment) updateCRSpec(newSpec api.DeploymentSpec) error {
417425
// Since there is no recovery from a failed deployment, use with care!
418426
func (d *Deployment) failOnError(err error, msg string) {
	log.Error().Err(err).Msg(msg)
	// Keep the error text as the failure reason in the in-memory status
	// before the failed state is reported.
	// NOTE(review): this writes d.status.last directly and does not change
	// d.status.version — confirm a concurrent UpdateStatus cannot overwrite it.
	reason := err.Error()
	d.status.last.Reason = reason
	d.reportFailedStatus()
}
423431

@@ -428,7 +436,7 @@ func (d *Deployment) reportFailedStatus() {
428436
log.Info().Msg("deployment failed. Reporting failed reason...")
429437

430438
op := func() error {
431-
d.status.Phase = api.DeploymentPhaseFailed
439+
d.status.last.Phase = api.DeploymentPhaseFailed
432440
err := d.updateCRStatus()
433441
if err == nil || k8sutil.IsNotFound(err) {
434442
// Status has been updated

pkg/deployment/deployment_inspector.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
6060
}
6161
} else {
6262
// Is the deployment in failed state, if so, give up.
63-
if d.status.Phase == api.DeploymentPhaseFailed {
63+
if d.GetPhase() == api.DeploymentPhaseFailed {
6464
log.Debug().Msg("Deployment is in Failed state.")
6565
return nextInterval
6666
}
@@ -72,7 +72,8 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
7272
}
7373

7474
// Is the deployment in a good state?
75-
if d.status.Conditions.IsTrue(api.ConditionTypeSecretsChanged) {
75+
status, _ := d.GetStatus()
76+
if status.Conditions.IsTrue(api.ConditionTypeSecretsChanged) {
7677
log.Debug().Msg("Condition SecretsChanged is true. Revert secrets before we can continue")
7778
return nextInterval
7879
}

pkg/deployment/images.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ type imagesBuilder struct {
5555
// ensureImages creates pods needed to detect ImageID for specified images.
5656
// Returns: retrySoon, error
5757
func (d *Deployment) ensureImages(apiObject *api.ArangoDeployment) (bool, error) {
58+
status, lastVersion := d.GetStatus()
5859
ib := imagesBuilder{
5960
APIObject: apiObject,
6061
Spec: apiObject.Spec,
61-
Status: d.status,
62+
Status: status,
6263
Log: d.deps.Log,
6364
KubeCli: d.deps.KubeCli,
6465
UpdateCRStatus: func(status api.DeploymentStatus) error {
65-
d.status = status
66-
if err := d.updateCRStatus(); err != nil {
66+
if err := d.UpdateStatus(status, lastVersion); err != nil {
6767
return maskAny(err)
6868
}
6969
return nil

0 commit comments

Comments
 (0)