Skip to content

Commit 9c64f22

Browse files
committed
(WIP) rewrite reconciliation loop
1 parent f09c843 commit 9c64f22

File tree

2 files changed

+88
-94
lines changed

2 files changed

+88
-94
lines changed

internal/controller/etcdcluster_controller.go

Lines changed: 78 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ type EtcdClusterReconciler struct {
7575
// Reconcile checks CR and current cluster state and performs actions to transform current state to desired.
7676
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
7777
log.Debug(ctx, "reconciling object")
78-
instance := &etcdaenixiov1alpha1.EtcdCluster{}
79-
err := r.Get(ctx, req.NamespacedName, instance)
78+
state := observables{}
79+
state.instance = &etcdaenixiov1alpha1.EtcdCluster{}
80+
err := r.Get(ctx, req.NamespacedName, state.instance)
8081
if err != nil {
8182
if errors.IsNotFound(err) {
8283
log.Debug(ctx, "object not found")
@@ -86,15 +87,12 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
8687
return reconcile.Result{}, err
8788
}
8889
// If object is being deleted, skipping reconciliation
89-
if !instance.DeletionTimestamp.IsZero() {
90+
if !state.instance.DeletionTimestamp.IsZero() {
9091
return reconcile.Result{}, nil
9192
}
9293

93-
state := observables{}
94-
state.instance = instance
95-
9694
// create two services and the pdb
97-
err = r.ensureUnconditionalObjects(ctx, instance)
95+
err = r.ensureUnconditionalObjects(ctx, state.instance)
9896
if err != nil {
9997
return ctrl.Result{}, err
10098
}
@@ -107,7 +105,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
107105
state.stsExists = state.statefulSet.UID != ""
108106

109107
// fetch endpoints
110-
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
108+
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
111109
if err != nil {
112110
return ctrl.Result{}, err
113111
}
@@ -118,7 +116,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
118116
}
119117

120118
// fetch PVCs
121-
state.pvcs, err = factory.PVCs(ctx, instance, r.Client)
119+
state.pvcs, err = factory.PVCs(ctx, state.instance, r.Client)
122120
if err != nil {
123121
return ctrl.Result{}, err
124122
}
@@ -127,25 +125,27 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
127125
if !state.stsExists {
128126
return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing
129127
}
130-
// else try reconciling the sts
131-
existingSts := state.statefulSet.DeepCopy()
132-
desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing
133-
existingSts.Spec.Template.Spec = desiredSts.Spec.Template.Spec
134-
err := r.patchOrCreateObject(ctx, existingSts)
135-
if err != nil {
136-
return ctrl.Result{}, err
128+
129+
// update sts pod template (and only pod template) if it doesn't match desired state
130+
if !state.statefulSetPodSpecCorrect() { // TODO: needs implementing
131+
desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing
132+
state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec
133+
return ctrl.Result{}, r.patchOrCreateObject(ctx, &state.statefulSet)
137134
}
138-
state.statefulSet = *existingSts
139-
if existingSts.Status.ReadyReplicas != *existingSts.Spec.Replicas { // TODO: this check might not be the best to check for a ready sts
135+
136+
if !state.statefulSetReady() { // TODO: needs improved implementation?
140137
return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
141138
}
142-
if *existingSts.Spec.Replicas > 0 {
139+
140+
if *state.statefulSet.Spec.Replicas > 0 {
143141
return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)")
144142
}
145-
if *instance.Spec.Replicas == 0 {
143+
144+
if *state.instance.Spec.Replicas == 0 {
146145
// cluster successfully scaled down to zero
147146
return ctrl.Result{}, nil
148147
}
148+
149149
return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing
150150
}
151151

@@ -164,108 +164,69 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
164164
wg.Wait()
165165
cancel()
166166
}
167+
168+
memberReached := false
169+
for i := range state.etcdStatuses {
170+
if state.etcdStatuses[i].endpointStatus != nil {
171+
memberReached = true
172+
break
173+
}
174+
}
175+
176+
if !memberReached {
177+
return r.createOrUpdateStatefulSet(ctx, &state, state.instance)
178+
}
179+
167180
state.setClusterID()
168181
if state.inSplitbrain() {
169182
log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue")
170183
meta.SetStatusCondition(
171-
&instance.Status.Conditions,
184+
&state.instance.Status.Conditions,
172185
metav1.Condition{
173186
Type: etcdaenixiov1alpha1.EtcdConditionError,
174187
Status: metav1.ConditionTrue,
175188
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeSplitbrain),
176189
Message: string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage),
177190
},
178191
)
179-
return r.updateStatus(ctx, instance)
192+
return r.updateStatus(ctx, state.instance)
180193
}
181-
// fill conditions
182-
if len(instance.Status.Conditions) == 0 {
183-
meta.SetStatusCondition(
184-
&instance.Status.Conditions,
185-
metav1.Condition{
186-
Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
187-
Status: metav1.ConditionFalse,
188-
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitStarted),
189-
Message: string(etcdaenixiov1alpha1.EtcdInitCondNegMessage),
190-
},
191-
)
192-
meta.SetStatusCondition(
193-
&instance.Status.Conditions,
194-
metav1.Condition{
195-
Type: etcdaenixiov1alpha1.EtcdConditionReady,
196-
Status: metav1.ConditionFalse,
197-
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum),
198-
Message: string(etcdaenixiov1alpha1.EtcdReadyCondNegWaitingForQuorum),
199-
},
200-
)
194+
195+
if !state.clusterHasQuorum() {
196+
// we can't do anything about this, but we still return an error so that the cluster is checked again from time to time
197+
return ctrl.Result{}, fmt.Errorf("cluster has lost quorum")
201198
}
202199

203-
// if size is different we have to remove statefulset it will be recreated in the next step
204-
if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, instance); err != nil {
200+
if state.hasLearners() {
201+
return ctrl.Result{}, r.promoteLearners(ctx, &state)
202+
}
203+
204+
if err := r.createOrUpdateClusterStateConfigMap(ctx, &state); err != nil {
205205
return ctrl.Result{}, err
206206
}
207207

208-
// ensure managed resources
209-
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
210-
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
208+
if !state.statefulSetPodSpecCorrect() {
209+
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, &state)
210+
}
211+
212+
// if the storage size is different, we have to delete the StatefulSet; it will be recreated in the next step
213+
if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, state.instance); err != nil {
214+
return ctrl.Result{}, err
211215
}
212216

217+
/* Saved as an example
213218
// set cluster initialization condition
214219
meta.SetStatusCondition(
215-
&instance.Status.Conditions,
220+
&state.instance.Status.Conditions,
216221
metav1.Condition{
217222
Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
218223
Status: metav1.ConditionTrue,
219224
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitComplete),
220225
Message: string(etcdaenixiov1alpha1.EtcdInitCondPosMessage),
221226
},
222227
)
223-
224-
// check sts condition
225-
clusterReady, err := r.isStatefulSetReady(ctx, instance)
226-
if err != nil {
227-
log.Error(ctx, err, "failed to check etcd cluster state")
228-
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot check Cluster readiness: %w", err))
229-
}
230-
231-
if clusterReady && *instance.Spec.Replicas != int32(0) {
232-
err := r.configureAuth(ctx, instance)
233-
if err != nil {
234-
return ctrl.Result{}, err
235-
}
236-
}
237-
238-
// set cluster readiness condition
239-
existingCondition := meta.FindStatusCondition(instance.Status.Conditions, etcdaenixiov1alpha1.EtcdConditionReady)
240-
if existingCondition != nil &&
241-
existingCondition.Reason == string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum) &&
242-
!clusterReady {
243-
// if we are still "waiting for first quorum establishment" and the StatefulSet
244-
// isn't ready yet, don't update the EtcdConditionReady, but circuit-break.
245-
return r.updateStatus(ctx, instance)
246-
}
247-
248-
// otherwise, EtcdConditionReady is set to true/false with the reason that the
249-
// StatefulSet is or isn't ready.
250-
reason := etcdaenixiov1alpha1.EtcdCondTypeStatefulSetNotReady
251-
message := etcdaenixiov1alpha1.EtcdReadyCondNegMessage
252-
ready := metav1.ConditionFalse
253-
if clusterReady {
254-
reason = etcdaenixiov1alpha1.EtcdCondTypeStatefulSetReady
255-
message = etcdaenixiov1alpha1.EtcdReadyCondPosMessage
256-
ready = metav1.ConditionTrue
257-
}
258-
259-
meta.SetStatusCondition(
260-
&instance.Status.Conditions,
261-
metav1.Condition{
262-
Type: etcdaenixiov1alpha1.EtcdConditionReady,
263-
Status: ready,
264-
Reason: string(reason),
265-
Message: string(message),
266-
},
267-
)
268-
return r.updateStatus(ctx, instance)
228+
*/
229+
return r.updateStatus(ctx, state.instance)
269230
}
270231

271232
// checkAndDeleteStatefulSetIfNecessary deletes the StatefulSet if the specified storage size has changed.
@@ -717,6 +678,29 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st
717678
if err != nil {
718679
return ctrl.Result{}, err
719680
}
681+
meta.SetStatusCondition(
682+
&state.instance.Status.Conditions,
683+
metav1.Condition{
684+
Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
685+
Status: metav1.ConditionFalse,
686+
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitStarted),
687+
Message: string(etcdaenixiov1alpha1.EtcdInitCondNegMessage),
688+
},
689+
)
690+
meta.SetStatusCondition(
691+
&state.instance.Status.Conditions,
692+
metav1.Condition{
693+
Type: etcdaenixiov1alpha1.EtcdConditionReady,
694+
Status: metav1.ConditionFalse,
695+
Reason: string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum),
696+
Message: string(etcdaenixiov1alpha1.EtcdReadyCondNegWaitingForQuorum),
697+
},
698+
)
699+
700+
// ensure managed resources
701+
if err = r.ensureConditionalClusterObjects(ctx, state.instance); err != nil {
702+
return r.updateStatusOnErr(ctx, state.instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
703+
}
720704
panic("not yet implemented")
721705
}
722706

internal/controller/observables.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,13 @@ func (o *observables) desiredReplicas() (max int) {
173173
}
174174
return max + 1
175175
}
176+
177+
// statefulSetPodSpecCorrect is meant to report whether the pod spec of the
// existing StatefulSet matches the desired one. It is currently a stub that
// always returns true, so callers that branch on a false result never take
// that branch.
// TODO: compare the desired sts with what exists
178+
func (o *observables) statefulSetPodSpecCorrect() bool {
179+
return true
180+
}
181+
182+
// TODO: also use updated replicas field?
183+
func (o *observables) statefulSetReady() bool {
184+
return o.statefulSet.Status.ReadyReplicas == *o.statefulSet.Spec.Replicas
185+
}

0 commit comments

Comments
 (0)