Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 303 additions & 28 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"github.com/aenix-io/etcd-operator/internal/log"
policyv1 "k8s.io/api/policy/v1"

"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand Down Expand Up @@ -121,19 +122,20 @@
return ctrl.Result{}, err
}

// If endpoints are not found
if !state.endpointsFound {
if !state.stsExists {
return r.createClusterFromScratch(ctx, state) // TODO: needs implementing
return r.createClusterFromScratch(ctx, state)
}

// update sts pod template (and only pod template) if it doesn't match desired state
if !state.statefulSetPodSpecCorrect() { // TODO: needs implementing
desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing
// Update StatefulSet pod template if necessary
if !state.statefulSetPodSpecCorrect() {
desiredSts := factory.TemplateStatefulSet()
state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec
return ctrl.Result{}, r.patchOrCreateObject(ctx, &state.statefulSet)
}

if !state.statefulSetReady() { // TODO: needs improved implementation?
if !state.statefulSetReady() {
return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
}

Expand All @@ -142,11 +144,12 @@
}

if *state.instance.Spec.Replicas == 0 {
// cluster successfully scaled down to zero
// Cluster successfully scaled down to zero
return ctrl.Result{}, nil
}

return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing
// **Updated function call with `state` parameter**
return ctrl.Result{}, r.scaleUpFromZero(ctx, state)
}

// get status of every endpoint and member list from every endpoint
Expand Down Expand Up @@ -174,7 +177,7 @@
}

if !memberReached {
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state)
}

state.setClusterID()
Expand All @@ -198,15 +201,15 @@
}

if state.hasLearners() {
return ctrl.Result{}, r.promoteLearners(ctx)
return ctrl.Result{}, r.promoteLearners(ctx, state)
}

if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil {
return ctrl.Result{}, err
if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "new", 1); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
}

if !state.statefulSetPodSpecCorrect() {
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, state)
}

// if size is different we have to remove statefulset it will be recreated in the next step
Expand Down Expand Up @@ -703,26 +706,298 @@
panic("not yet implemented")
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error {
return fmt.Errorf("not yet implemented")
// scaleUpFromZero scales the etcd cluster from zero replicas to the desired replica count following the provided flowchart.
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) error {
// Step 1: Ensure services (client service, headless service, PDB)
if err := r.ensureUnconditionalObjects(ctx, state); err != nil {
return fmt.Errorf("failed to ensure services: %w", err)
}

// Step 2: Check if there are any endpoints
if !state.endpointsFound {
// No endpoints found
// Check if the StatefulSet is present
if !state.stsExists {
// StatefulSet is not present
// Create cluster from scratch
_, err := r.createClusterFromScratch(ctx, state)
return err
} else {
// StatefulSet exists
// Check if it has the correct pod spec
if !state.statefulSetPodSpecCorrect() {
// Patch the podspec
desiredSts := factory.TemplateStatefulSet()
state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec
if err := r.patchOrCreateObject(ctx, &state.statefulSet); err != nil {
return fmt.Errorf("failed to patch StatefulSet: %w", err)
}
return nil
}

// Check if StatefulSet is ready
if !state.statefulSetReady() {
// Wait and hope it becomes ready
return fmt.Errorf("waiting for StatefulSet to become ready")
}

// Check if EtcdCluster.spec.replicas == 0
if *state.instance.Spec.Replicas == 0 {
// Cluster successfully scaled to zero, stop
return nil
}

// Ensure ConfigMap with initial cluster state "new" and initial cluster peers with single member name-0
// Increment StatefulSet size
if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "new", 1); err != nil {
return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
}

// Update StatefulSet replicas to 1
state.statefulSet.Spec.Replicas = pointer.Int32(1)
if err := r.patchOrCreateObject(ctx, &state.statefulSet); err != nil {
return fmt.Errorf("failed to scale up StatefulSet: %w", err)
}

return nil
}
} else {
// Endpoints found
// Connect to the cluster and fetch all statuses
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
}
defer clusterClient.Close()
for _, sc := range singleClients {
defer sc.Close()
}

// Fetch etcd statuses
state.etcdStatuses = make([]etcdStatus, len(singleClients))
var wg sync.WaitGroup
ctxWithTimeout, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
defer cancel()
for i := range singleClients {
wg.Add(1)
go func(i int) {
defer wg.Done()
state.etcdStatuses[i].fill(ctxWithTimeout, singleClients[i])
}(i)
}
wg.Wait()

// Check if all reachable members have the same cluster ID
state.setClusterID()
if state.inSplitbrain() {
// Cluster is in split-brain, set error status
return fmt.Errorf("cluster is in split-brain")
}

// Check if cluster has quorum
if !state.clusterHasQuorum() {
// Cluster does not have quorum, wait or require user intervention
return fmt.Errorf("cluster has lost quorum")
}

// Check if all members are managed by the operator
if !state.allMembersManaged() {
// Not all members are managed by the operator
return fmt.Errorf("not all members are managed by the operator")
}

// Promote any learners
if state.hasLearners() {
if err := r.promoteLearners(ctx, state); err != nil {
return fmt.Errorf("failed to promote learners: %w", err)
}
}

// Ensure ConfigMap with initial cluster matching existing members and cluster state = existing
if err := r.createOrUpdateClusterStateConfigMap(ctx, state, "existing", state.desiredReplicas()); err != nil {
return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
}

// Ensure StatefulSet with replicas = max member ordinal + 1
maxOrdinal := state.maxMemberOrdinal()
state.instance.Spec.Replicas = pointer.Int32(int32(maxOrdinal + 1))
if err := r.createOrUpdateStatefulSet(ctx, state); err != nil {
return fmt.Errorf("failed to create or update StatefulSet: %w", err)
}

// Check if all members are healthy
if !state.allMembersHealthy() {
// On timeout, evict member
return fmt.Errorf("not all members are healthy")
}

// Check if all StatefulSet pods are present in the member list
if !state.allPodsInMemberList() {
// Handle missing members
return fmt.Errorf("some pods are not present in the member list")
}

// Check if EtcdCluster size equals StatefulSet size
if *state.instance.Spec.Replicas == *state.statefulSet.Spec.Replicas {
// Set cluster status to ready
// Update status conditions accordingly
return nil
} else if *state.instance.Spec.Replicas > *state.statefulSet.Spec.Replicas {
// Scale up
// Member add API call, increment StatefulSet size
if err := r.scaleUpCluster(ctx, state); err != nil {
return fmt.Errorf("failed to scale up cluster: %w", err)
}
return nil
} else {
// Scale down
// Member remove API call, decrement StatefulSet size, delete PVC
if err := r.scaleDownCluster(ctx, state); err != nil {
return fmt.Errorf("failed to scale down cluster: %w", err)
}
return nil
}
}

return nil

Check failure on line 862 in internal/controller/etcdcluster_controller.go

View workflow job for this annotation

GitHub Actions / pre-commit

unreachable code

Check failure on line 862 in internal/controller/etcdcluster_controller.go

View workflow job for this annotation

GitHub Actions / test on k8s penultimate version

unreachable code

Check failure on line 862 in internal/controller/etcdcluster_controller.go

View workflow job for this annotation

GitHub Actions / test on k8s previous version

unreachable code

Check failure on line 862 in internal/controller/etcdcluster_controller.go

View workflow job for this annotation

GitHub Actions / test on k8s latest version

unreachable code
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error {
return fmt.Errorf("not yet implemented")
// scaleUpCluster adds a new member to the etcd cluster and scales up the StatefulSet.
func (r *EtcdClusterReconciler) scaleUpCluster(ctx context.Context, state *observables) error {
// Add a new member to the etcd cluster
clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
}
defer clusterClient.Close()

// Determine the new member's peer URLs
newMemberOrdinal := state.maxMemberOrdinal() + 1
newMemberName := fmt.Sprintf("%s-%d", state.instance.Name, newMemberOrdinal)
newMemberPeerURL := fmt.Sprintf("https://%s.%s.%s.svc:2380", newMemberName, factory.GetHeadlessServiceName(state.instance), state.instance.Namespace)

// Add the new member
_, err = clusterClient.MemberAdd(ctx, []string{newMemberPeerURL})

Check failure on line 880 in internal/controller/etcdcluster_controller.go

View workflow job for this annotation

GitHub Actions / nilaway-lint

error: Potential nil panic detected. Observed nil flow from source to dereference point:
if err != nil {
return fmt.Errorf("failed to add new member: %w", err)
}

// Update StatefulSet replicas
state.instance.Spec.Replicas = pointer.Int32(int32(newMemberOrdinal + 1))
if err := r.createOrUpdateStatefulSet(ctx, state); err != nil {
return fmt.Errorf("failed to update StatefulSet: %w", err)
}

return nil
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error {
return fmt.Errorf("not yet implemented")
// scaleDownCluster removes the member with the highest ordinal from the etcd
// cluster, shrinks the StatefulSet accordingly, and deletes that member's PVC.
//
// It returns an error if the etcd client cannot be obtained, the member is not
// present in the member list, or any of the remove / update / delete steps fail.
func (r *EtcdClusterReconciler) scaleDownCluster(ctx context.Context, state *observables) error {
	clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
	if err != nil {
		return fmt.Errorf("failed to create etcd client: %w", err)
	}
	// The factory may hand back a nil client without an error (flagged by
	// nilaway on the sibling scale-up path); fail with an error instead of
	// panicking on the calls below.
	if clusterClient == nil {
		return fmt.Errorf("failed to create etcd client: no client returned")
	}
	defer clusterClient.Close()

	// The member to remove is the one with the highest ordinal.
	memberToRemoveOrdinal := state.maxMemberOrdinal()
	memberToRemoveName := fmt.Sprintf("%s-%d", state.instance.Name, memberToRemoveOrdinal)

	// Look up the member ID by name.
	memberListResp, err := clusterClient.MemberList(ctx)
	if err != nil {
		return fmt.Errorf("failed to get member list: %w", err)
	}
	var (
		memberIDToRemove uint64
		found            bool // explicit flag rather than treating ID 0 as "missing"
	)
	for _, member := range memberListResp.Members {
		if member.Name == memberToRemoveName {
			memberIDToRemove = member.ID
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("member to remove not found: %s", memberToRemoveName)
	}

	// Remove the member from etcd before its pod is taken away.
	if _, err := clusterClient.MemberRemove(ctx, memberIDToRemove); err != nil {
		return fmt.Errorf("failed to remove member: %w", err)
	}

	// Shrink the StatefulSet so the removed ordinal is no longer scheduled. The
	// replica count is passed by overwriting the in-memory Spec.Replicas.
	state.instance.Spec.Replicas = pointer.Int32(int32(memberToRemoveOrdinal))
	if err := r.createOrUpdateStatefulSet(ctx, state); err != nil {
		return fmt.Errorf("failed to update StatefulSet: %w", err)
	}

	// Delete the PVC that belonged to the removed member.
	if err := r.deletePVC(ctx, state.instance, memberToRemoveOrdinal); err != nil {
		return fmt.Errorf("failed to delete PVC: %w", err)
	}

	return nil
}

// TODO!
// nolint:unused
func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error {
return fmt.Errorf("not yet implemented")
// createOrUpdateClusterStateConfigMap renders the cluster-state ConfigMap for
// the given clusterState and replicas via factory.TemplateClusterStateConfigMap,
// marks state.instance as its controller owner, and applies it with
// patchOrCreateObject. Any failure is returned wrapped with context.
func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context, state *observables, clusterState string, replicas int) error {
	configMap := factory.TemplateClusterStateConfigMap(state.instance, clusterState, replicas)

	// The owner reference must be set before the object is applied.
	err := ctrl.SetControllerReference(state.instance, configMap, r.Scheme)
	if err != nil {
		return fmt.Errorf("cannot set controller reference: %w", err)
	}

	err = r.patchOrCreateObject(ctx, configMap)
	if err != nil {
		return fmt.Errorf("failed to create or update cluster state ConfigMap: %w", err)
	}

	return nil
}

// createOrUpdateStatefulSet delegates to factory.CreateOrUpdateStatefulSet for
// state.instance and wraps any error it reports.
func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context, state *observables) error {
	err := factory.CreateOrUpdateStatefulSet(ctx, state.instance, r.Client)
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to create or update StatefulSet: %w", err)
}

// promoteLearners promotes every learner in the etcd member list to a full
// voting member.
//
// It stops at the first failed promotion and returns that error, so learners
// later in the list stay unpromoted until the next call. It also returns an
// error if the etcd client cannot be obtained or the member list cannot be read.
func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context, state *observables) error {
	clusterClient, _, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
	if err != nil {
		return fmt.Errorf("failed to create etcd client: %w", err)
	}
	// The factory may hand back a nil client without an error (flagged by
	// nilaway on the sibling scale-up path); fail with an error instead of
	// panicking on the calls below.
	if clusterClient == nil {
		return fmt.Errorf("failed to create etcd client: no client returned")
	}
	defer clusterClient.Close()

	// Retrieve the current member list.
	memberListResp, err := clusterClient.MemberList(ctx)
	if err != nil {
		return fmt.Errorf("failed to get member list: %w", err)
	}

	for _, member := range memberListResp.Members {
		if !member.IsLearner {
			continue
		}
		if _, err := clusterClient.MemberPromote(ctx, member.ID); err != nil {
			return fmt.Errorf("failed to promote member %s: %w", member.Name, err)
		}
		log.Info(ctx, "Promoted learner to member", "member", member.Name)
	}

	return nil
}

// deletePVC removes the PersistentVolumeClaim belonging to the member with the
// given ordinal. The claim name is "<GetPVCName(cluster)>-<cluster.Name>-<ordinal>"
// in the cluster's namespace. A claim that is already gone is not an error.
func (r *EtcdClusterReconciler) deletePVC(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, ordinal int) error {
	pvcName := fmt.Sprintf("%s-%s-%d", factory.GetPVCName(cluster), cluster.Name, ordinal)

	// Only name and namespace are needed to address the object for deletion.
	target := &corev1.PersistentVolumeClaim{}
	target.Name = pvcName
	target.Namespace = cluster.Namespace

	err := r.Client.Delete(ctx, target)
	if err == nil || errors.IsNotFound(err) {
		return nil
	}
	return fmt.Errorf("failed to delete PVC %s: %w", pvcName, err)
}
Loading
Loading