diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index 879cfb34..ffdb4dc2 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -31,6 +31,7 @@ import ( "github.com/aenix-io/etcd-operator/internal/log" policyv1 "k8s.io/api/policy/v1" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -121,19 +122,20 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } + // If endpoints are not found if !state.endpointsFound { if !state.stsExists { - return r.createClusterFromScratch(ctx, state) // TODO: needs implementing + return r.createClusterFromScratch(ctx, state) } - // update sts pod template (and only pod template) if it doesn't match desired state - if !state.statefulSetPodSpecCorrect() { // TODO: needs implementing - desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing + // Update StatefulSet pod template if necessary + if !state.statefulSetPodSpecCorrect() { + desiredSts := factory.TemplateStatefulSet() state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec return ctrl.Result{}, r.patchOrCreateObject(ctx, &state.statefulSet) } - if !state.statefulSetReady() { // TODO: needs improved implementation? + if !state.statefulSetReady() { return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready") } @@ -142,11 +144,12 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if *state.instance.Spec.Replicas == 0 { - // cluster successfully scaled down to zero + // Cluster successfully scaled down to zero return ctrl.Result{}, nil } - return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing + // **Updated function call with `state` parameter** + return ctrl.Result{}, r.scaleUpFromZero(ctx, state) } // get status of every endpoint and member list from every endpoint @@ -174,7 +177,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if !memberReached { - return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx) + return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state) } state.setClusterID() @@ -198,15 +201,15 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if state.hasLearners() { - return ctrl.Result{}, r.promoteLearners(ctx) + return ctrl.Result{}, r.promoteLearners(ctx, state) } - if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil { - return ctrl.Result{}, err + if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "new", 1); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err) } if !state.statefulSetPodSpecCorrect() { - return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx) + return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state) } // if size is different we have to remove statefulset it will be recreated in the next step @@ -703,26 +706,298 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st panic("not yet implemented") } -// TODO! -// nolint:unused -func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error { - return fmt.Errorf("not yet implemented") +// scaleUpFromZero scales the etcd cluster from zero replicas to the desired replica count following the provided flowchart. +func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) error { + // Step 1: Ensure services (client service, headless service, PDB) + if err := r.ensureUnconditionalObjects(ctx, state); err != nil { + return fmt.Errorf("failed to ensure services: %w", err) + } + + // Step 2: Check if there are any endpoints + if !state.endpointsFound { + // No endpoints found + // Check if the StatefulSet is present + if !state.stsExists { + // StatefulSet is not present + // Create cluster from scratch + _, err := r.createClusterFromScratch(ctx, state) + return err + } else { + // StatefulSet exists + // Check if it has the correct pod spec + if !state.statefulSetPodSpecCorrect() { + // Patch the podspec + desiredSts := factory.TemplateStatefulSet() + state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec + if err := r.patchOrCreateObject(ctx, &state.statefulSet); err != nil { + return fmt.Errorf("failed to patch StatefulSet: %w", err) + } + return nil + } + + // Check if StatefulSet is ready + if !state.statefulSetReady() { + // Wait and hope it becomes ready + return fmt.Errorf("waiting for StatefulSet to become ready") + } + + // Check if EtcdCluster.spec.replicas == 0 + if *state.instance.Spec.Replicas == 0 { + // Cluster successfully scaled to zero, stop + return nil + } + + // Ensure ConfigMap with initial cluster state "new" and initial cluster peers with single member name-0 + // Increment StatefulSet size + if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "new", 1); err != nil { + return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err) + } + + // Update StatefulSet replicas to 1 + state.statefulSet.Spec.Replicas = pointer.Int32(1) + if err := r.patchOrCreateObject(ctx, &state.statefulSet); err != nil { + return fmt.Errorf("failed to scale up StatefulSet: %w", err) + } + + return nil + } + } else { + // Endpoints found + // Connect to the cluster and fetch all statuses + clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client) + if err != nil { + return fmt.Errorf("failed to create etcd client: %w", err) + } + defer clusterClient.Close() + for _, sc := range singleClients { + defer sc.Close() + } + + // Fetch etcd statuses + state.etcdStatuses = make([]etcdStatus, len(singleClients)) + var wg sync.WaitGroup + ctxWithTimeout, cancel := context.WithTimeout(ctx, etcdDefaultTimeout) + defer cancel() + for i := range singleClients { + wg.Add(1) + go func(i int) { + defer wg.Done() + state.etcdStatuses[i].fill(ctxWithTimeout, singleClients[i]) + }(i) + } + wg.Wait() + + // Check if all reachable members have the same cluster ID + state.setClusterID() + if state.inSplitbrain() { + // Cluster is in split-brain, set error status + return fmt.Errorf("cluster is in split-brain") + } + + // Check if cluster has quorum + if !state.clusterHasQuorum() { + // Cluster does not have quorum, wait or require user intervention + return fmt.Errorf("cluster has lost quorum") + } + + // Check if all members are managed by the operator + if !state.allMembersManaged() { + // Not all members are managed by the operator + return fmt.Errorf("not all members are managed by the operator") + } + + // Promote any learners + if state.hasLearners() { + if err := r.promoteLearners(ctx, state); err != nil { + return fmt.Errorf("failed to promote learners: %w", err) + } + } + + // Ensure ConfigMap with initial cluster matching existing members and cluster state = existing + if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "existing", state.desiredReplicas()); err != nil { + return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err) + } + + // Ensure StatefulSet with replicas = max member ordinal + 1 + maxOrdinal := state.maxMemberOrdinal() + state.instance.Spec.Replicas = pointer.Int32(int32(maxOrdinal + 1)) + if err := r.createOrUpdateStatefulSet(ctx, state); err != nil { + return fmt.Errorf("failed to create or update StatefulSet: %w", err) + } + + // Check if all members are healthy + if !state.allMembersHealthy() { + // On timeout, evict member + return fmt.Errorf("not all members are healthy") + } + + // Check if all StatefulSet pods are present in the member list + if !state.allPodsInMemberList() { + // Handle missing members + return fmt.Errorf("some pods are not present in the member list") + } + + // Check if EtcdCluster size equals StatefulSet size + if *state.instance.Spec.Replicas == *state.statefulSet.Spec.Replicas { + // Set cluster status to ready + // Update status conditions accordingly + return nil + } else if *state.instance.Spec.Replicas > *state.statefulSet.Spec.Replicas { + // Scale up + // Member add API call, increment StatefulSet size + if err := r.scaleUpCluster(ctx, state); err != nil { + return fmt.Errorf("failed to scale up cluster: %w", err) + } + return nil + } else { + // Scale down + // Member remove API call, decrement StatefulSet size, delete PVC + if err := r.scaleDownCluster(ctx, state); err != nil { + return fmt.Errorf("failed to scale down cluster: %w", err) + } + return nil + } + } + + return nil } -// TODO! -// nolint:unused -func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error { - return fmt.Errorf("not yet implemented") +// scaleUpCluster adds a new member to the etcd cluster and scales up the StatefulSet. +func (r *EtcdClusterReconciler) scaleUpCluster(ctx context.Context, state *observables) error { + // Add a new member to the etcd cluster + clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client) + if err != nil { + return fmt.Errorf("failed to create etcd client: %w", err) + } + defer clusterClient.Close() + + // Determine the new member's peer URLs + newMemberOrdinal := state.maxMemberOrdinal() + 1 + newMemberName := fmt.Sprintf("%s-%d", state.instance.Name, newMemberOrdinal) + newMemberPeerURL := fmt.Sprintf("https://%s.%s.%s.svc:2380", newMemberName, factory.GetHeadlessServiceName(state.instance), state.instance.Namespace) + + // Add the new member + _, err = clusterClient.MemberAdd(ctx, []string{newMemberPeerURL}) + if err != nil { + return fmt.Errorf("failed to add new member: %w", err) + } + + // Update StatefulSet replicas + state.instance.Spec.Replicas = pointer.Int32(int32(newMemberOrdinal + 1)) + if err := r.createOrUpdateStatefulSet(ctx, state); err != nil { + return fmt.Errorf("failed to update StatefulSet: %w", err) + } + + return nil } -// TODO! -// nolint:unused -func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error { - return fmt.Errorf("not yet implemented") +// scaleDownCluster removes a member from the etcd cluster and scales down the StatefulSet. +func (r *EtcdClusterReconciler) scaleDownCluster(ctx context.Context, state *observables) error { + // Remove a member from the etcd cluster + clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client) + if err != nil { + return fmt.Errorf("failed to create etcd client: %w", err) + } + defer clusterClient.Close() + + // Determine the member to remove (highest ordinal) + memberToRemoveOrdinal := state.maxMemberOrdinal() + memberToRemoveName := fmt.Sprintf("%s-%d", state.instance.Name, memberToRemoveOrdinal) + + // Find the member ID to remove + var memberIDToRemove uint64 + memberListResp, err := clusterClient.MemberList(ctx) + if err != nil { + return fmt.Errorf("failed to get member list: %w", err) + } + for _, member := range memberListResp.Members { + if member.Name == memberToRemoveName { + memberIDToRemove = member.ID + break + } + } + if memberIDToRemove == 0 { + return fmt.Errorf("member to remove not found: %s", memberToRemoveName) + } + + // Remove the member + if _, err := clusterClient.MemberRemove(ctx, memberIDToRemove); err != nil { + return fmt.Errorf("failed to remove member: %w", err) + } + + // Update StatefulSet replicas + state.instance.Spec.Replicas = pointer.Int32(int32(memberToRemoveOrdinal)) + if err := r.createOrUpdateStatefulSet(ctx, state); err != nil { + return fmt.Errorf("failed to update StatefulSet: %w", err) + } + + // Delete the PVC associated with the member + if err := r.deletePVC(ctx, state.instance, memberToRemoveOrdinal); err != nil { + return fmt.Errorf("failed to delete PVC: %w", err) + } + + return nil } -// TODO! -// nolint:unused -func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error { - return fmt.Errorf("not yet implemented") +func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context, state *observables, clusterState string, replicas int) error { + cm := factory.TemplateClusterStateConfigMap(state.instance, clusterState, replicas) + if err := ctrl.SetControllerReference(state.instance, cm, r.Scheme); err != nil { + return fmt.Errorf("cannot set controller reference: %w", err) + } + if err := r.patchOrCreateObject(ctx, cm); err != nil { + return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err) + } + return nil +} + +// createOrUpdateStatefulSet ensures the StatefulSet is up-to-date with the desired state. +func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context, state *observables) error { + if err := factory.CreateOrUpdateStatefulSet(ctx, state.instance, r.Client); err != nil { + return fmt.Errorf("failed to create or update StatefulSet: %w", err) + } + return nil +} + +// promoteLearners promotes any learner members in the etcd cluster to full voting members. +func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context, state *observables) error { + // Get a client to the etcd cluster + clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client) + if err != nil { + return fmt.Errorf("failed to create etcd client: %w", err) + } + defer clusterClient.Close() + + // Retrieve the current member list + memberListResp, err := clusterClient.MemberList(ctx) + if err != nil { + return fmt.Errorf("failed to get member list: %w", err) + } + + // Promote any learner members to full voting members + for _, member := range memberListResp.Members { + if member.IsLearner { + _, err := clusterClient.MemberPromote(ctx, member.ID) + if err != nil { + return fmt.Errorf("failed to promote member %s: %w", member.Name, err) + } + log.Info(ctx, "Promoted learner to member", "member", member.Name) + } + } + + return nil +} + +// deletePVC deletes the PersistentVolumeClaim for a given member ordinal. +func (r *EtcdClusterReconciler) deletePVC(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, ordinal int) error { + pvcName := fmt.Sprintf("%s-%s-%d", factory.GetPVCName(cluster), cluster.Name, ordinal) + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: cluster.Namespace, + }, + } + if err := r.Client.Delete(ctx, pvc); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete PVC %s: %w", pvcName, err) + } + return nil } diff --git a/internal/controller/observables.go b/internal/controller/observables.go index 6c523c35..29aea3a1 100644 --- a/internal/controller/observables.go +++ b/internal/controller/observables.go @@ -2,6 +2,7 @@ package controller import ( "context" + "fmt" "strconv" "strings" "sync" @@ -203,3 +204,77 @@ func (o *observables) hasLearners() bool { } return false } + +// allMembersManaged checks if all members are managed by the operator. +func (o *observables) allMembersManaged() bool { + for _, status := range o.etcdStatuses { + if status.memberList != nil { + for _, member := range status.memberList.Members { + if !strings.HasPrefix(member.Name, o.instance.Name) { + return false + } + } + } + } + return true +} + +// allMembersHealthy checks if all members are healthy. +func (o *observables) allMembersHealthy() bool { + for _, status := range o.etcdStatuses { + if status.endpointStatusError != nil { + return false + } + } + return true +} + +// allPodsInMemberList checks if all StatefulSet pods are present in the member list. +func (o *observables) allPodsInMemberList() bool { + memberNames := make(map[string]struct{}) + for _, status := range o.etcdStatuses { + if status.memberList != nil { + for _, member := range status.memberList.Members { + memberNames[member.Name] = struct{}{} + } + } + } + + expectedPodNames := o.getExpectedPodNames() + for _, podName := range expectedPodNames { + if _, exists := memberNames[podName]; !exists { + return false + } + } + return true +} + +// getExpectedPodNames returns the expected pod names based on the StatefulSet. +func (o *observables) getExpectedPodNames() []string { + var podNames []string + replicas := int(*o.statefulSet.Spec.Replicas) + for i := 0; i < replicas; i++ { + podNames = append(podNames, fmt.Sprintf("%s-%d", o.instance.Name, i)) + } + return podNames +} + +// maxMemberOrdinal finds the maximum ordinal of the etcd members. +func (o *observables) maxMemberOrdinal() int { + maxOrdinal := -1 + for _, status := range o.etcdStatuses { + if status.memberList != nil { + for _, member := range status.memberList.Members { + parts := strings.Split(member.Name, "-") + if len(parts) > 1 { + ordinalStr := parts[len(parts)-1] + ordinal, err := strconv.Atoi(ordinalStr) + if err == nil && ordinal > maxOrdinal { + maxOrdinal = ordinal + } + } + } + } + } + return maxOrdinal +}