Skip to content

Commit 90b1d87

Browse files
author
lamai93
committed
Monitor sync status concurrently. Only create new upgrade plan if everything is good.
1 parent 8749bfa commit 90b1d87

File tree

9 files changed

+114
-24
lines changed

9 files changed

+114
-24
lines changed

pkg/deployment/context_impl.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,3 +380,8 @@ func (d *Deployment) GetExpectedPodArguments(apiObject metav1.Object, deplSpec a
380380
agents api.MemberStatusList, id string, version driver.Version) []string {
381381
return d.resources.GetExpectedPodArguments(apiObject, deplSpec, group, agents, id, version)
382382
}
383+
384+
// GetShardSyncStatus returns true if all shards are in sync
385+
func (d *Deployment) GetShardSyncStatus() bool {
386+
return d.resources.GetShardSyncStatus()
387+
}

pkg/deployment/deployment.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
141141
d.clusterScalingIntegration = ci
142142
go ci.ListenForClusterEvents(d.stopCh)
143143
go d.resources.RunDeploymentHealthLoop(d.stopCh)
144+
go d.resources.RunDeploymentShardSyncLoop(d.stopCh)
144145
}
145146
if config.AllowChaos {
146147
d.chaosMonkey = chaos.NewMonkey(deps.Log, d)

pkg/deployment/reconcile/action_wait_for_member_up.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -167,26 +167,6 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
167167
log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good")
168168
return false, false, nil
169169
}
170-
if a.action.Group == api.ServerGroupDBServers {
171-
dbs, err := c.Databases(ctx)
172-
if err != nil {
173-
return false, false, err
174-
}
175-
for _, db := range dbs {
176-
inv, err := cluster.DatabaseInventory(ctx, db)
177-
if err != nil {
178-
return false, false, err
179-
}
180-
181-
for _, col := range inv.Collections {
182-
if !col.AllInSync {
183-
log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
184-
return false, false, nil
185-
}
186-
}
187-
}
188-
189-
}
190170
// Wait for the member to become ready from a kubernetes point of view
191171
// otherwise the coordinators may be rotated too fast and thus none of them
192172
// is ready resulting in a short downtime

pkg/deployment/reconcile/context.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,6 @@ type Context interface {
9393
// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
9494
GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
9595
agents api.MemberStatusList, id string, version driver.Version) []string
96+
// GetShardSyncStatus returns true if all shards are in sync
97+
GetShardSyncStatus() bool
9698
}

pkg/deployment/reconcile/plan_builder.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,17 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
209209
})
210210
return newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense
211211
}
212+
212213
if newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense := createRotateOrUpgradePlan(); upgradeNotAllowed {
213214
// Upgrade is needed, but not allowed
214215
context.CreateEvent(k8sutil.NewUpgradeNotAllowedEvent(apiObject, fromVersion, toVersion, fromLicense, toLicense))
215-
} else {
216-
// Use the new plan
217-
plan = newPlan
216+
} else if len(newPlan) > 0 {
217+
if clusterReadyForUpgrade(context) {
218+
// Use the new plan
219+
plan = newPlan
220+
} else {
221+
log.Info().Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready.")
222+
}
218223
}
219224
}
220225

@@ -237,6 +242,15 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
237242
return plan, true
238243
}
239244

245+
// clusterReadyForUpgrade returns true if the cluster is ready for the next update, that is:
246+
// - all shards are in sync
247+
// - all members are ready and fine
248+
func clusterReadyForUpgrade(context PlanBuilderContext) bool {
249+
status, _ := context.GetStatus()
250+
allInSync := context.GetShardSyncStatus()
251+
return allInSync && status.Conditions.IsTrue(api.ConditionTypeReady)
252+
}
253+
240254
// podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with
241255
// the given spec) and if that is allowed.
242256
func podNeedsUpgrading(log zerolog.Logger, p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision {

pkg/deployment/reconcile/plan_builder_context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ type PlanBuilderContext interface {
4646
// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
4747
GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
4848
agents api.MemberStatusList, id string, version driver.Version) []string
49+
// GetShardSyncStatus returns true if all shards are in sync
50+
GetShardSyncStatus() bool
51+
// GetStatus returns the current status of the deployment
52+
GetStatus() (api.DeploymentStatus, int32)
4953
}
5054

5155
// newPlanBuilderContext creates a PlanBuilderContext from the given context

pkg/deployment/reconcile/timeouts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ const (
3333
rotateMemberTimeout = time.Minute * 30
3434
shutdownMemberTimeout = time.Minute * 30
3535
upgradeMemberTimeout = time.Hour * 6
36-
waitForMemberUpTimeout = time.Minute * 15
36+
waitForMemberUpTimeout = time.Minute * 45
3737
)

pkg/deployment/resources/deployment_health.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ import (
2828

2929
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
3030
"github.com/arangodb/kube-arangodb/pkg/metrics"
31+
"github.com/rs/zerolog/log"
3132
)
3233

3334
var (
3435
deploymentHealthFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_health_fetches", "Number of times the health of the deployment was fetched", metrics.DeploymentName, metrics.Result)
36+
deploymentSyncFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_sync_fetches", "Number of times the sync status of shards of the deployment was fetched", metrics.DeploymentName, metrics.Result)
3537
)
3638

3739
// RunDeploymentHealthLoop creates a loop to fetch the health of the deployment.
@@ -88,3 +90,80 @@ func (r *Resources) fetchDeploymentHealth() error {
8890
r.health.timestamp = time.Now()
8991
return nil
9092
}
93+
94+
// RunDeploymentShardSyncLoop creates a loop to fetch the sync status of shards of the deployment.
95+
// The loop ends when the given channel is closed.
96+
func (r *Resources) RunDeploymentShardSyncLoop(stopCh <-chan struct{}) {
97+
log := r.log
98+
deploymentName := r.context.GetAPIObject().GetName()
99+
100+
if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
101+
// Shard sync status is currently only applicable for clusters
102+
return
103+
}
104+
105+
for {
106+
if err := r.fetchClusterShardSyncState(); err != nil {
107+
log.Debug().Err(err).Msg("Failed to fetch deployment shard sync state")
108+
deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
109+
} else {
110+
deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
111+
}
112+
select {
113+
case <-time.After(time.Second * 30):
114+
// Continue
115+
case <-stopCh:
116+
// We're done
117+
return
118+
}
119+
}
120+
}
121+
122+
// fetchClusterShardSyncState performs a single fetch of the cluster inventory and
123+
// checks if all shards are in sync
124+
func (r *Resources) fetchClusterShardSyncState() error {
125+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
126+
defer cancel()
127+
c, err := r.context.GetDatabaseClient(ctx)
128+
if err != nil {
129+
return err
130+
}
131+
cluster, err := c.Cluster(ctx)
132+
if err != nil {
133+
return err
134+
}
135+
dbs, err := c.Databases(ctx)
136+
if err != nil {
137+
return err
138+
}
139+
140+
allInSync := true
141+
dbloop:
142+
for _, db := range dbs {
143+
inv, err := cluster.DatabaseInventory(ctx, db)
144+
if err != nil {
145+
return err
146+
}
147+
148+
for _, col := range inv.Collections {
149+
if !col.AllInSync {
150+
log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
151+
allInSync = false
152+
break dbloop
153+
}
154+
}
155+
}
156+
157+
r.shardSync.mutex.Lock()
158+
defer r.shardSync.mutex.Unlock()
159+
r.shardSync.allInSync = allInSync
160+
r.shardSync.timestamp = time.Now()
161+
return nil
162+
}
163+
164+
// GetShardSyncStatus returns true if all shards are in sync
165+
func (r *Resources) GetShardSyncStatus() bool {
166+
r.shardSync.mutex.Lock()
167+
defer r.shardSync.mutex.Unlock()
168+
return r.shardSync.allInSync
169+
}

pkg/deployment/resources/resources.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ type Resources struct {
4040
timestamp time.Time // Timestamp of last fetch of cluster health
4141
mutex sync.Mutex // Mutex guarding fields in this struct
4242
}
43+
shardSync struct {
44+
allInSync bool
45+
timestamp time.Time
46+
mutex sync.Mutex
47+
}
4348
}
4449

4550
// NewResources creates a new Resources service, used to

0 commit comments

Comments
 (0)