Commit da83d85

Merge pull request #373 from arangodb/bug-fix/wait-for-sync-upgrade

waitForSync Upgrade

2 parents 8749bfa + 2b8a8d6

File tree: 10 files changed, +142 −24 lines changed

pkg/deployment/context_impl.go

Lines changed: 10 additions & 0 deletions
@@ -380,3 +380,13 @@ func (d *Deployment) GetExpectedPodArguments(apiObject metav1.Object, deplSpec a
     agents api.MemberStatusList, id string, version driver.Version) []string {
     return d.resources.GetExpectedPodArguments(apiObject, deplSpec, group, agents, id, version)
 }
+
+// GetShardSyncStatus returns true if all shards are in sync
+func (d *Deployment) GetShardSyncStatus() bool {
+    return d.resources.GetShardSyncStatus()
+}
+
+// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+func (d *Deployment) InvalidateSyncStatus() {
+    d.resources.InvalidateSyncStatus()
+}

pkg/deployment/deployment.go

Lines changed: 1 addition & 0 deletions
@@ -141,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
         d.clusterScalingIntegration = ci
         go ci.ListenForClusterEvents(d.stopCh)
         go d.resources.RunDeploymentHealthLoop(d.stopCh)
+        go d.resources.RunDeploymentShardSyncLoop(d.stopCh)
     }
     if config.AllowChaos {
         d.chaosMonkey = chaos.NewMonkey(deps.Log, d)

pkg/deployment/reconcile/action_wait_for_member_up.go

Lines changed: 0 additions & 20 deletions
@@ -167,26 +167,6 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
         log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good")
         return false, false, nil
     }
-    if a.action.Group == api.ServerGroupDBServers {
-        dbs, err := c.Databases(ctx)
-        if err != nil {
-            return false, false, err
-        }
-        for _, db := range dbs {
-            inv, err := cluster.DatabaseInventory(ctx, db)
-            if err != nil {
-                return false, false, err
-            }
-
-            for _, col := range inv.Collections {
-                if !col.AllInSync {
-                    log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
-                    return false, false, nil
-                }
-            }
-        }
-
-    }
     // Wait for the member to become ready from a kubernetes point of view
     // otherwise the coordinators may be rotated to fast and thus none of them
     // is ready resulting in a short downtime

pkg/deployment/reconcile/context.go

Lines changed: 4 additions & 0 deletions
@@ -93,4 +93,8 @@ type Context interface {
     // GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
     GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
         agents api.MemberStatusList, id string, version driver.Version) []string
+    // GetShardSyncStatus returns true if all shards are in sync
+    GetShardSyncStatus() bool
+    // InvalidateSyncStatus resets the sync state to false and triggers an inspection
+    InvalidateSyncStatus()
 }

pkg/deployment/reconcile/plan_builder.go

Lines changed: 17 additions & 3 deletions
@@ -209,12 +209,17 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
             })
             return newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense
         }
+
         if newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense := createRotateOrUpgradePlan(); upgradeNotAllowed {
             // Upgrade is needed, but not allowed
             context.CreateEvent(k8sutil.NewUpgradeNotAllowedEvent(apiObject, fromVersion, toVersion, fromLicense, toLicense))
-        } else {
-            // Use the new plan
-            plan = newPlan
+        } else if len(newPlan) > 0 {
+            if clusterReadyForUpgrade(context) {
+                // Use the new plan
+                plan = newPlan
+            } else {
+                log.Info().Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready.")
+            }
         }
     }

@@ -237,6 +242,15 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
     return plan, true
 }

+// clusterReadyForUpgrade returns true if the cluster is ready for the next update, that is:
+// - all shards are in sync
+// - all members are ready and fine
+func clusterReadyForUpgrade(context PlanBuilderContext) bool {
+    status, _ := context.GetStatus()
+    allInSync := context.GetShardSyncStatus()
+    return allInSync && status.Conditions.IsTrue(api.ConditionTypeReady)
+}
+
 // podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with
 // the given spec) and if that is allowed.
 func podNeedsUpgrading(log zerolog.Logger, p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision {
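
The net effect of these two hunks: a non-empty rotate/upgrade plan is only adopted when clusterReadyForUpgrade reports the cluster healthy; otherwise the operator logs and retries on a later reconciliation pass. A minimal, self-contained sketch of that gate, using stand-in types rather than the operator's real PlanBuilderContext and api packages:

```go
package main

import "fmt"

// fakeContext stands in for the operator's PlanBuilderContext; in the real
// code the two fields come from GetShardSyncStatus() and from
// status.Conditions.IsTrue(api.ConditionTypeReady).
type fakeContext struct {
	allInSync bool
	ready     bool
}

// clusterReadyForUpgrade mirrors the helper added above: an upgrade may
// proceed only when all shards are in sync and the deployment is Ready.
func clusterReadyForUpgrade(c fakeContext) bool {
	return c.allInSync && c.ready
}

func main() {
	fmt.Println(clusterReadyForUpgrade(fakeContext{allInSync: true, ready: true}))  // true: the new plan is adopted
	fmt.Println(clusterReadyForUpgrade(fakeContext{allInSync: false, ready: true})) // false: the upgrade is deferred
}
```

Note that a pending plan is not rejected outright when the gate fails; it is simply rebuilt and re-evaluated on the next pass, once the shards have caught up.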

pkg/deployment/reconcile/plan_builder_context.go

Lines changed: 6 additions & 0 deletions
@@ -46,6 +46,12 @@ type PlanBuilderContext interface {
     // GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
     GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
         agents api.MemberStatusList, id string, version driver.Version) []string
+    // GetShardSyncStatus returns true if all shards are in sync
+    GetShardSyncStatus() bool
+    // InvalidateSyncStatus resets the sync state to false and triggers an inspection
+    InvalidateSyncStatus()
+    // GetStatus returns the current status of the deployment
+    GetStatus() (api.DeploymentStatus, int32)
 }

 // newPlanBuilderContext creates a PlanBuilderContext from the given context

pkg/deployment/reconcile/timeouts.go

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ const (
     removeMemberTimeout          = time.Minute * 15
     renewTLSCertificateTimeout   = time.Minute * 30
     renewTLSCACertificateTimeout = time.Minute * 30
-    rotateMemberTimeout          = time.Minute * 30
+    rotateMemberTimeout          = time.Minute * 15
     shutdownMemberTimeout        = time.Minute * 30
     upgradeMemberTimeout         = time.Hour * 6
     waitForMemberUpTimeout       = time.Minute * 15

pkg/deployment/resources/deployment_health.go

Lines changed: 94 additions & 0 deletions
@@ -32,6 +32,7 @@ import (

 var (
     deploymentHealthFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_health_fetches", "Number of times the health of the deployment was fetched", metrics.DeploymentName, metrics.Result)
+    deploymentSyncFetchesCounters   = metrics.MustRegisterCounterVec(metricsComponent, "deployment_sync_fetches", "Number of times the sync status of shards of the deployment was fetched", metrics.DeploymentName, metrics.Result)
 )

 // RunDeploymentHealthLoop creates a loop to fetch the health of the deployment.

@@ -53,6 +54,7 @@ func (r *Resources) RunDeploymentHealthLoop(stopCh <-chan struct{}) {
             deploymentHealthFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
         }
         select {
+        case <-r.shardSync.triggerSyncInspection.Done():
         case <-time.After(time.Second * 5):
             // Continue
         case <-stopCh:

@@ -88,3 +90,95 @@ func (r *Resources) fetchDeploymentHealth() error {
     r.health.timestamp = time.Now()
     return nil
 }
+
+// RunDeploymentShardSyncLoop creates a loop to fetch the sync status of shards of the deployment.
+// The loop ends when the given channel is closed.
+func (r *Resources) RunDeploymentShardSyncLoop(stopCh <-chan struct{}) {
+    log := r.log
+    deploymentName := r.context.GetAPIObject().GetName()
+
+    if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
+        // Deployment health is currently only applicable for clusters
+        return
+    }
+
+    for {
+        if err := r.fetchClusterShardSyncState(); err != nil {
+            log.Debug().Err(err).Msg("Failed to fetch deployment shard sync state")
+            deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
+        } else {
+            deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
+        }
+        select {
+        case <-time.After(time.Second * 30):
+            // Continue
+        case <-stopCh:
+            // We're done
+            return
+        }
+    }
+}
+
+// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+func (r *Resources) InvalidateSyncStatus() {
+    r.log.Debug().Msg("Invalidating sync status due to previous events")
+    r.shardSync.mutex.Lock()
+    defer r.shardSync.mutex.Unlock()
+    r.shardSync.allInSync = false
+    r.shardSync.triggerSyncInspection.Trigger()
+}
+
+// fetchClusterShardSyncState performs a single fetch of the cluster inventory and
+// checks if all shards are in sync
+func (r *Resources) fetchClusterShardSyncState() error {
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+    defer cancel()
+    c, err := r.context.GetDatabaseClient(ctx)
+    if err != nil {
+        return err
+    }
+    cluster, err := c.Cluster(ctx)
+    if err != nil {
+        return err
+    }
+    dbs, err := c.Databases(ctx)
+    if err != nil {
+        return err
+    }
+
+    allInSync := true
+dbloop:
+    for _, db := range dbs {
+        inv, err := cluster.DatabaseInventory(ctx, db)
+        if err != nil {
+            return err
+        }
+
+        for _, col := range inv.Collections {
+            if !col.AllInSync {
+                r.log.Debug().Str("db", db.Name()).Str("col", col.Parameters.Name).Msg("Collection not in sync")
+                allInSync = false
+                break dbloop
+            }
+        }
+    }
+
+    r.shardSync.mutex.Lock()
+    oldSyncState := r.shardSync.allInSync
+    r.shardSync.allInSync = allInSync
+    r.shardSync.timestamp = time.Now()
+    r.shardSync.mutex.Unlock()
+
+    if !oldSyncState && allInSync {
+        r.log.Debug().Msg("Everything is in sync by now")
+    }
+
+    return nil
+}
+
+// GetShardSyncStatus returns true if all shards are in sync
+func (r *Resources) GetShardSyncStatus() bool {
+    r.shardSync.mutex.Lock()
+    defer r.shardSync.mutex.Unlock()
+    return r.shardSync.allInSync
+}
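
The same go-driver calls used by fetchClusterShardSyncState above can be exercised against a cluster directly. A standalone sketch for reference — the endpoint and the absence of authentication are illustrative assumptions, not the operator's configuration:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	driver "github.com/arangodb/go-driver"
	"github.com/arangodb/go-driver/http"
)

func main() {
	// Assumed local endpoint with authentication disabled; adjust for a real cluster.
	conn, err := http.NewConnection(http.ConnectionConfig{
		Endpoints: []string{"http://localhost:8529"},
	})
	if err != nil {
		log.Fatal(err)
	}
	c, err := driver.NewClient(driver.ClientConfig{Connection: conn})
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
	defer cancel()

	cluster, err := c.Cluster(ctx)
	if err != nil {
		log.Fatal(err)
	}
	dbs, err := c.Databases(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Walk every database inventory and report collections whose shards
	// are not yet replicated to all in-sync followers.
	allInSync := true
	for _, db := range dbs {
		inv, err := cluster.DatabaseInventory(ctx, db)
		if err != nil {
			log.Fatal(err)
		}
		for _, col := range inv.Collections {
			if !col.AllInSync {
				fmt.Printf("collection %s in database %s is not in sync\n", col.Parameters.Name, db.Name())
				allInSync = false
			}
		}
	}
	fmt.Println("all shards in sync:", allInSync)
}
```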

pkg/deployment/resources/pod_inspector.go

Lines changed: 2 additions & 0 deletions
@@ -109,6 +109,7 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
                 // Record termination time
                 now := metav1.Now()
                 memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+                r.InvalidateSyncStatus()
             }
         }
     } else if k8sutil.IsPodFailed(&p) {

@@ -122,6 +123,7 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
                 // Record termination time
                 now := metav1.Now()
                 memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+                r.InvalidateSyncStatus()
             }
         }
     }

pkg/deployment/resources/resources.go

Lines changed: 7 additions & 0 deletions
@@ -27,6 +27,7 @@ import (
     "time"

     driver "github.com/arangodb/go-driver"
+    "github.com/arangodb/kube-arangodb/pkg/util/trigger"
     "github.com/rs/zerolog"
 )

@@ -40,6 +41,12 @@ type Resources struct {
         timestamp time.Time  // Timestamp of last fetch of cluster health
         mutex     sync.Mutex // Mutex guarding fields in this struct
     }
+    shardSync struct {
+        allInSync             bool
+        timestamp             time.Time
+        mutex                 sync.Mutex
+        triggerSyncInspection trigger.Trigger
+    }
 }

 // NewResources creates a new Resources service, used to
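
The new shardSync struct carries a trigger.Trigger, which the hunks above use via Trigger() (in InvalidateSyncStatus) and Done() (as a select case in the health loop). A minimal sketch of a notifier consistent with those two calls, assuming buffered-channel semantics; the actual pkg/util/trigger implementation may differ:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Trigger is a simple wake-up notifier: Done returns a channel that yields a
// value after Trigger has been called; repeated calls coalesce into one wake-up.
type Trigger struct {
	mutex sync.Mutex
	done  chan struct{}
}

// channel lazily creates the buffered notification channel.
func (t *Trigger) channel() chan struct{} {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	if t.done == nil {
		t.done = make(chan struct{}, 1)
	}
	return t.done
}

// Done returns a channel to select on; it receives after a Trigger call.
func (t *Trigger) Done() <-chan struct{} {
	return t.channel()
}

// Trigger wakes up a waiter on Done; it never blocks, so it is safe to call
// while holding other locks, as InvalidateSyncStatus does.
func (t *Trigger) Trigger() {
	select {
	case t.channel() <- struct{}{}:
	default: // a wake-up is already pending; coalesce
	}
}

func main() {
	var t Trigger
	go func() {
		time.Sleep(100 * time.Millisecond)
		t.Trigger() // e.g. a pod termination invalidating the sync status
	}()
	select {
	case <-t.Done():
		fmt.Println("woken up early, re-inspecting")
	case <-time.After(5 * time.Second):
		fmt.Println("periodic inspection")
	}
}
```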
