Skip to content

Commit 0987b97

Browse files
committed
Implement Cluster Scaling and StatefulSet Management
1 parent 3d8b85c commit 0987b97

File tree

2 files changed

+378
-28
lines changed

2 files changed

+378
-28
lines changed

internal/controller/etcdcluster_controller.go

Lines changed: 303 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/aenix-io/etcd-operator/internal/log"
3232
policyv1 "k8s.io/api/policy/v1"
3333

34+
"k8s.io/utils/pointer"
3435
"sigs.k8s.io/controller-runtime/pkg/builder"
3536
"sigs.k8s.io/controller-runtime/pkg/predicate"
3637

@@ -121,19 +122,20 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
121122
return ctrl.Result{}, err
122123
}
123124

125+
// If endpoints are not found
124126
if !state.endpointsFound {
125127
if !state.stsExists {
126-
return r.createClusterFromScratch(ctx, state) // TODO: needs implementing
128+
return r.createClusterFromScratch(ctx, state)
127129
}
128130

129-
// update sts pod template (and only pod template) if it doesn't match desired state
130-
if !state.statefulSetPodSpecCorrect() { // TODO: needs implementing
131-
desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing
131+
// Update StatefulSet pod template if necessary
132+
if !state.statefulSetPodSpecCorrect() {
133+
desiredSts := factory.TemplateStatefulSet()
132134
state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec
133135
return ctrl.Result{}, r.patchOrCreateObject(ctx, &state.statefulSet)
134136
}
135137

136-
if !state.statefulSetReady() { // TODO: needs improved implementation?
138+
if !state.statefulSetReady() {
137139
return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
138140
}
139141

@@ -142,11 +144,12 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
142144
}
143145

144146
if *state.instance.Spec.Replicas == 0 {
145-
// cluster successfully scaled down to zero
147+
// Cluster successfully scaled down to zero
146148
return ctrl.Result{}, nil
147149
}
148150

149-
return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing
151+
// **Updated function call with `state` parameter**
152+
return ctrl.Result{}, r.scaleUpFromZero(ctx, state)
150153
}
151154

152155
// get status of every endpoint and member list from every endpoint
@@ -174,7 +177,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
174177
}
175178

176179
if !memberReached {
177-
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
180+
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state)
178181
}
179182

180183
state.setClusterID()
@@ -198,15 +201,15 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
198201
}
199202

200203
if state.hasLearners() {
201-
return ctrl.Result{}, r.promoteLearners(ctx)
204+
return ctrl.Result{}, r.promoteLearners(ctx, state)
202205
}
203206

204-
if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil {
205-
return ctrl.Result{}, err
207+
if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "new", 1); err != nil {
208+
return ctrl.Result{}, fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
206209
}
207210

208211
if !state.statefulSetPodSpecCorrect() {
209-
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
212+
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state)
210213
}
211214

212215
// if size is different we have to remove statefulset it will be recreated in the next step
@@ -703,26 +706,298 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st
703706
panic("not yet implemented")
704707
}
705708

706-
// TODO!
707-
// nolint:unused
708-
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error {
709-
return fmt.Errorf("not yet implemented")
709+
// scaleUpFromZero scales the etcd cluster from zero replicas to the desired replica count following the provided flowchart.
710+
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) error {
711+
// Step 1: Ensure services (client service, headless service, PDB)
712+
if err := r.ensureUnconditionalObjects(ctx, state); err != nil {
713+
return fmt.Errorf("failed to ensure services: %w", err)
714+
}
715+
716+
// Step 2: Check if there are any endpoints
717+
if !state.endpointsFound {
718+
// No endpoints found
719+
// Check if the StatefulSet is present
720+
if !state.stsExists {
721+
// StatefulSet is not present
722+
// Create cluster from scratch
723+
_, err := r.createClusterFromScratch(ctx, state)
724+
return err
725+
} else {
726+
// StatefulSet exists
727+
// Check if it has the correct pod spec
728+
if !state.statefulSetPodSpecCorrect() {
729+
// Patch the podspec
730+
desiredSts := factory.TemplateStatefulSet()
731+
state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec
732+
if err := r.patchOrCreateObject(ctx, &state.statefulSet); err != nil {
733+
return fmt.Errorf("failed to patch StatefulSet: %w", err)
734+
}
735+
return nil
736+
}
737+
738+
// Check if StatefulSet is ready
739+
if !state.statefulSetReady() {
740+
// Wait and hope it becomes ready
741+
return fmt.Errorf("waiting for StatefulSet to become ready")
742+
}
743+
744+
// Check if EtcdCluster.spec.replicas == 0
745+
if *state.instance.Spec.Replicas == 0 {
746+
// Cluster successfully scaled to zero, stop
747+
return nil
748+
}
749+
750+
// Ensure ConfigMap with initial cluster state "new" and initial cluster peers with single member name-0
751+
// Increment StatefulSet size
752+
if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "new", 1); err != nil {
753+
return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
754+
}
755+
756+
// Update StatefulSet replicas to 1
757+
state.statefulSet.Spec.Replicas = pointer.Int32(1)
758+
if err := r.patchOrCreateObject(ctx, &state.statefulSet); err != nil {
759+
return fmt.Errorf("failed to scale up StatefulSet: %w", err)
760+
}
761+
762+
return nil
763+
}
764+
} else {
765+
// Endpoints found
766+
// Connect to the cluster and fetch all statuses
767+
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
768+
if err != nil {
769+
return fmt.Errorf("failed to create etcd client: %w", err)
770+
}
771+
defer clusterClient.Close()
772+
for _, sc := range singleClients {
773+
defer sc.Close()
774+
}
775+
776+
// Fetch etcd statuses
777+
state.etcdStatuses = make([]etcdStatus, len(singleClients))
778+
var wg sync.WaitGroup
779+
ctxWithTimeout, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
780+
defer cancel()
781+
for i := range singleClients {
782+
wg.Add(1)
783+
go func(i int) {
784+
defer wg.Done()
785+
state.etcdStatuses[i].fill(ctxWithTimeout, singleClients[i])
786+
}(i)
787+
}
788+
wg.Wait()
789+
790+
// Check if all reachable members have the same cluster ID
791+
state.setClusterID()
792+
if state.inSplitbrain() {
793+
// Cluster is in split-brain, set error status
794+
return fmt.Errorf("cluster is in split-brain")
795+
}
796+
797+
// Check if cluster has quorum
798+
if !state.clusterHasQuorum() {
799+
// Cluster does not have quorum, wait or require user intervention
800+
return fmt.Errorf("cluster has lost quorum")
801+
}
802+
803+
// Check if all members are managed by the operator
804+
if !state.allMembersManaged() {
805+
// Not all members are managed by the operator
806+
return fmt.Errorf("not all members are managed by the operator")
807+
}
808+
809+
// Promote any learners
810+
if state.hasLearners() {
811+
if err := r.promoteLearners(ctx, state); err != nil {
812+
return fmt.Errorf("failed to promote learners: %w", err)
813+
}
814+
}
815+
816+
// Ensure ConfigMap with initial cluster matching existing members and cluster state = existing
817+
if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "existing", state.desiredReplicas()); err != nil {
818+
return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
819+
}
820+
821+
// Ensure StatefulSet with replicas = max member ordinal + 1
822+
maxOrdinal := state.maxMemberOrdinal()
823+
state.instance.Spec.Replicas = pointer.Int32(int32(maxOrdinal + 1))
824+
if err := r.createOrUpdateStatefulSet(ctx, state); err != nil {
825+
return fmt.Errorf("failed to create or update StatefulSet: %w", err)
826+
}
827+
828+
// Check if all members are healthy
829+
if !state.allMembersHealthy() {
830+
// On timeout, evict member
831+
return fmt.Errorf("not all members are healthy")
832+
}
833+
834+
// Check if all StatefulSet pods are present in the member list
835+
if !state.allPodsInMemberList() {
836+
// Handle missing members
837+
return fmt.Errorf("some pods are not present in the member list")
838+
}
839+
840+
// Check if EtcdCluster size equals StatefulSet size
841+
if *state.instance.Spec.Replicas == *state.statefulSet.Spec.Replicas {
842+
// Set cluster status to ready
843+
// Update status conditions accordingly
844+
return nil
845+
} else if *state.instance.Spec.Replicas > *state.statefulSet.Spec.Replicas {
846+
// Scale up
847+
// Member add API call, increment StatefulSet size
848+
if err := r.scaleUpCluster(ctx, state); err != nil {
849+
return fmt.Errorf("failed to scale up cluster: %w", err)
850+
}
851+
return nil
852+
} else {
853+
// Scale down
854+
// Member remove API call, decrement StatefulSet size, delete PVC
855+
if err := r.scaleDownCluster(ctx, state); err != nil {
856+
return fmt.Errorf("failed to scale down cluster: %w", err)
857+
}
858+
return nil
859+
}
860+
}
861+
862+
return nil
710863
}
711864

712-
// TODO!
713-
// nolint:unused
714-
func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error {
715-
return fmt.Errorf("not yet implemented")
865+
// scaleUpCluster adds a new member to the etcd cluster and scales up the StatefulSet.
866+
func (r *EtcdClusterReconciler) scaleUpCluster(ctx context.Context, state *observables) error {
867+
// Add a new member to the etcd cluster
868+
clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
869+
if err != nil {
870+
return fmt.Errorf("failed to create etcd client: %w", err)
871+
}
872+
defer clusterClient.Close()
873+
874+
// Determine the new member's peer URLs
875+
newMemberOrdinal := state.maxMemberOrdinal() + 1
876+
newMemberName := fmt.Sprintf("%s-%d", state.instance.Name, newMemberOrdinal)
877+
newMemberPeerURL := fmt.Sprintf("https://%s.%s.%s.svc:2380", newMemberName, factory.GetHeadlessServiceName(state.instance), state.instance.Namespace)
878+
879+
// Add the new member
880+
_, err = clusterClient.MemberAdd(ctx, []string{newMemberPeerURL})
881+
if err != nil {
882+
return fmt.Errorf("failed to add new member: %w", err)
883+
}
884+
885+
// Update StatefulSet replicas
886+
state.instance.Spec.Replicas = pointer.Int32(int32(newMemberOrdinal + 1))
887+
if err := r.createOrUpdateStatefulSet(ctx, state); err != nil {
888+
return fmt.Errorf("failed to update StatefulSet: %w", err)
889+
}
890+
891+
return nil
716892
}
717893

718-
// TODO!
719-
// nolint:unused
720-
func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error {
721-
return fmt.Errorf("not yet implemented")
894+
// scaleDownCluster removes a member from the etcd cluster and scales down the StatefulSet.
895+
func (r *EtcdClusterReconciler) scaleDownCluster(ctx context.Context, state *observables) error {
896+
// Remove a member from the etcd cluster
897+
clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
898+
if err != nil {
899+
return fmt.Errorf("failed to create etcd client: %w", err)
900+
}
901+
defer clusterClient.Close()
902+
903+
// Determine the member to remove (highest ordinal)
904+
memberToRemoveOrdinal := state.maxMemberOrdinal()
905+
memberToRemoveName := fmt.Sprintf("%s-%d", state.instance.Name, memberToRemoveOrdinal)
906+
907+
// Find the member ID to remove
908+
var memberIDToRemove uint64
909+
memberListResp, err := clusterClient.MemberList(ctx)
910+
if err != nil {
911+
return fmt.Errorf("failed to get member list: %w", err)
912+
}
913+
for _, member := range memberListResp.Members {
914+
if member.Name == memberToRemoveName {
915+
memberIDToRemove = member.ID
916+
break
917+
}
918+
}
919+
if memberIDToRemove == 0 {
920+
return fmt.Errorf("member to remove not found: %s", memberToRemoveName)
921+
}
922+
923+
// Remove the member
924+
if _, err := clusterClient.MemberRemove(ctx, memberIDToRemove); err != nil {
925+
return fmt.Errorf("failed to remove member: %w", err)
926+
}
927+
928+
// Update StatefulSet replicas
929+
state.instance.Spec.Replicas = pointer.Int32(int32(memberToRemoveOrdinal))
930+
if err := r.createOrUpdateStatefulSet(ctx, state); err != nil {
931+
return fmt.Errorf("failed to update StatefulSet: %w", err)
932+
}
933+
934+
// Delete the PVC associated with the member
935+
if err := r.deletePVC(ctx, state.instance, memberToRemoveOrdinal); err != nil {
936+
return fmt.Errorf("failed to delete PVC: %w", err)
937+
}
938+
939+
return nil
722940
}
723941

724-
// TODO!
725-
// nolint:unused
726-
func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error {
727-
return fmt.Errorf("not yet implemented")
942+
func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context, state *observables, clusterState string, replicas int) error {
943+
cm := factory.TemplateClusterStateConfigMap(state.instance, clusterState, replicas)
944+
if err := ctrl.SetControllerReference(state.instance, cm, r.Scheme); err != nil {
945+
return fmt.Errorf("cannot set controller reference: %w", err)
946+
}
947+
if err := r.patchOrCreateObject(ctx, cm); err != nil {
948+
return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
949+
}
950+
return nil
951+
}
952+
953+
// createOrUpdateStatefulSet ensures the StatefulSet is up-to-date with the desired state.
954+
func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context, state *observables) error {
955+
if err := factory.CreateOrUpdateStatefulSet(ctx, state.instance, r.Client); err != nil {
956+
return fmt.Errorf("failed to create or update StatefulSet: %w", err)
957+
}
958+
return nil
959+
}
960+
961+
// promoteLearners promotes any learner members in the etcd cluster to full voting members.
962+
func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context, state *observables) error {
963+
// Get a client to the etcd cluster
964+
clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
965+
if err != nil {
966+
return fmt.Errorf("failed to create etcd client: %w", err)
967+
}
968+
defer clusterClient.Close()
969+
970+
// Retrieve the current member list
971+
memberListResp, err := clusterClient.MemberList(ctx)
972+
if err != nil {
973+
return fmt.Errorf("failed to get member list: %w", err)
974+
}
975+
976+
// Promote any learner members to full voting members
977+
for _, member := range memberListResp.Members {
978+
if member.IsLearner {
979+
_, err := clusterClient.MemberPromote(ctx, member.ID)
980+
if err != nil {
981+
return fmt.Errorf("failed to promote member %s: %w", member.Name, err)
982+
}
983+
log.Info(ctx, "Promoted learner to member", "member", member.Name)
984+
}
985+
}
986+
987+
return nil
988+
}
989+
990+
// deletePVC deletes the PersistentVolumeClaim for a given member ordinal.
991+
func (r *EtcdClusterReconciler) deletePVC(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, ordinal int) error {
992+
pvcName := fmt.Sprintf("%s-%s-%d", factory.GetPVCName(cluster), cluster.Name, ordinal)
993+
pvc := &corev1.PersistentVolumeClaim{
994+
ObjectMeta: metav1.ObjectMeta{
995+
Name: pvcName,
996+
Namespace: cluster.Namespace,
997+
},
998+
}
999+
if err := r.Client.Delete(ctx, pvc); err != nil && !errors.IsNotFound(err) {
1000+
return fmt.Errorf("failed to delete PVC %s: %w", pvcName, err)
1001+
}
1002+
return nil
7281003
}

0 commit comments

Comments
 (0)