Skip to content

Commit a5725af

Browse files
committed
Implement Cluster Scaling and StatefulSet Management
1 parent 3d8b85c commit a5725af

File tree

1 file changed

+94
-21
lines changed

1 file changed

+94
-21
lines changed

internal/controller/etcdcluster_controller.go

Lines changed: 94 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
146146
return ctrl.Result{}, nil
147147
}
148148

149-
return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing
149+
return ctrl.Result{}, r.scaleUpFromZero(ctx, state) // TODO: needs implementing
150150
}
151151

152152
// get status of every endpoint and member list from every endpoint
@@ -174,7 +174,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
174174
}
175175

176176
if !memberReached {
177-
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
177+
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state)
178178
}
179179

180180
state.setClusterID()
@@ -197,16 +197,46 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
197197
return ctrl.Result{}, fmt.Errorf("cluster has lost quorum")
198198
}
199199

200+
// If endpoints are not found
201+
if !state.endpointsFound {
202+
if !state.stsExists {
203+
return r.createClusterFromScratch(ctx, state)
204+
}
205+
206+
// Update StatefulSet pod template if necessary
207+
if !state.statefulSetPodSpecCorrect() {
208+
desiredSts := factory.TemplateStatefulSet()
209+
state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec
210+
return ctrl.Result{}, r.patchOrCreateObject(ctx, &state.statefulSet)
211+
}
212+
213+
if !state.statefulSetReady() {
214+
return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
215+
}
216+
217+
if *state.statefulSet.Spec.Replicas > 0 {
218+
return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)")
219+
}
220+
221+
if *state.instance.Spec.Replicas == 0 {
222+
// Cluster successfully scaled down to zero
223+
return ctrl.Result{}, nil
224+
}
225+
226+
// The spec asks for a non-zero replica count while the StatefulSet has none: scale up from zero.
227+
return ctrl.Result{}, r.scaleUpFromZero(ctx, state)
228+
}
229+
200230
if state.hasLearners() {
201-
return ctrl.Result{}, r.promoteLearners(ctx)
231+
return ctrl.Result{}, r.promoteLearners(ctx, state)
202232
}
203233

204-
if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil {
234+
if err := r.createOrUpdateClusterStateConfigMap(ctx, state); err != nil {
205235
return ctrl.Result{}, err
206236
}
207237

208238
if !state.statefulSetPodSpecCorrect() {
209-
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
239+
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state)
210240
}
211241

212242
// If the size is different, we have to remove the StatefulSet; it will be recreated in the next step.
@@ -703,26 +733,69 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st
703733
panic("not yet implemented")
704734
}
705735

706-
// TODO!
707-
// nolint:unused
708-
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error {
709-
return fmt.Errorf("not yet implemented")
736+
// scaleUpFromZero scales the etcd cluster from zero replicas to the desired replica count.
737+
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) error {
738+
// Update the cluster state ConfigMap to set ETCD_INITIAL_CLUSTER_STATE to "existing"
739+
if err := r.createOrUpdateClusterStateConfigMap(ctx, state); err != nil {
740+
return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
741+
}
742+
743+
// Update the StatefulSet to have the desired number of replicas
744+
if err := r.createOrUpdateStatefulSet(ctx, state); err != nil {
745+
return fmt.Errorf("failed to create or update StatefulSet: %w", err)
746+
}
747+
748+
return nil
710749
}
711750

712-
// TODO!
713-
// nolint:unused
714-
func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error {
715-
return fmt.Errorf("not yet implemented")
751+
// createOrUpdateClusterStateConfigMap ensures the cluster state ConfigMap is up-to-date.
752+
func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context, state *observables) error {
753+
// Create a ConfigMap with ETCD_INITIAL_CLUSTER_STATE set to "existing"
754+
cm := factory.TemplateClusterStateConfigMap(state.instance, "existing", int(*state.instance.Spec.Replicas))
755+
if err := ctrl.SetControllerReference(state.instance, cm, r.Scheme); err != nil {
756+
return fmt.Errorf("cannot set controller reference: %w", err)
757+
}
758+
759+
if err := r.patchOrCreateObject(ctx, cm); err != nil {
760+
return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
761+
}
762+
763+
return nil
716764
}
717765

718-
// TODO!
719-
// nolint:unused
720-
func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error {
721-
return fmt.Errorf("not yet implemented")
766+
// createOrUpdateStatefulSet ensures the StatefulSet is up-to-date with the desired state.
767+
func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context, state *observables) error {
768+
if err := factory.CreateOrUpdateStatefulSet(ctx, state.instance, r.Client); err != nil {
769+
return fmt.Errorf("failed to create or update StatefulSet: %w", err)
770+
}
771+
return nil
722772
}
723773

724-
// TODO!
725-
// nolint:unused
726-
func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error {
727-
return fmt.Errorf("not yet implemented")
774+
// promoteLearners promotes any learner members in the etcd cluster to full voting members.
775+
func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context, state *observables) error {
776+
// Get a client to the etcd cluster
777+
clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
778+
if err != nil {
779+
return fmt.Errorf("failed to create etcd client: %w", err)
780+
}
781+
defer clusterClient.Close()
782+
783+
// Retrieve the current member list
784+
memberListResp, err := clusterClient.MemberList(ctx)
785+
if err != nil {
786+
return fmt.Errorf("failed to get member list: %w", err)
787+
}
788+
789+
// Promote any learner members to full voting members
790+
for _, member := range memberListResp.Members {
791+
if member.IsLearner {
792+
_, err := clusterClient.MemberPromote(ctx, member.ID)
793+
if err != nil {
794+
return fmt.Errorf("failed to promote member %s: %w", member.Name, err)
795+
}
796+
log.Info(ctx, "Promoted learner to member", "member", member.Name)
797+
}
798+
}
799+
800+
return nil
728801
}

0 commit comments

Comments
 (0)