Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions controllers/add_process_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type addProcessGroups struct{}

// reconcile runs the reconciler's work.
func (a addProcessGroups) reconcile(
ctx context.Context,
_ context.Context,
r *FoundationDBClusterReconciler,
cluster *fdbv1beta2.FoundationDBCluster,
status *fdbv1beta2.FoundationDBStatus,
Expand Down Expand Up @@ -64,7 +64,6 @@ func (a addProcessGroups) reconcile(
logger.Error(err, "Error getting exclusion list")
}

hasNewProcessGroups := false
for _, processClass := range fdbv1beta2.ProcessClasses {
desiredCount := desiredCounts[processClass]
if desiredCount < 0 {
Expand All @@ -80,7 +79,6 @@ func (a addProcessGroups) reconcile(
processGroupIDs[processClass] = map[int]bool{}
}

hasNewProcessGroups = true
logger.Info(
"Adding new Process Groups",
"processClass",
Expand Down Expand Up @@ -120,13 +118,6 @@ func (a addProcessGroups) reconcile(
}
}

if hasNewProcessGroups {
err = r.updateOrApply(ctx, cluster)
if err != nil {
return &requeue{curError: err}
}
}

if getLocalitiesErr != nil {
return &requeue{curError: getLocalitiesErr, delayedRequeue: true}
}
Expand Down
2 changes: 0 additions & 2 deletions controllers/add_process_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ var _ = Describe("add_process_groups", func() {
Expect(requeue.curError).NotTo(HaveOccurred())
}

_, err = reloadCluster(cluster)
Expect(err).NotTo(HaveOccurred())
newProcessCounts = fdbv1beta2.CreateProcessCountsFromProcessGroupStatus(
cluster.Status.ProcessGroups,
true,
Expand Down
2 changes: 1 addition & 1 deletion controllers/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *FoundationDBBackupReconciler) Reconcile(
continue
}

return processRequeue(req, subReconciler, backup, r.Recorder, backupLog)
return processResult(processRequeue(req, subReconciler, backup, r.Recorder, backupLog))
}

if backup.Status.Generations.Reconciled < originalGeneration {
Expand Down
7 changes: 1 addition & 6 deletions controllers/change_coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type changeCoordinators struct{}

// reconcile runs the reconciler's work.
func (c changeCoordinators) reconcile(
ctx context.Context,
_ context.Context,
r *FoundationDBClusterReconciler,
cluster *fdbv1beta2.FoundationDBCluster,
status *fdbv1beta2.FoundationDBStatus,
Expand Down Expand Up @@ -130,10 +130,5 @@ func (c changeCoordinators) reconcile(
// Reset the SecondsSinceLastRecovered sine the operator just changed the coordinators, which will cause a recovery.
status.Cluster.RecoveryState.SecondsSinceLastRecovered = 0.0

err = r.updateOrApply(ctx, cluster)
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
}

return nil
}
6 changes: 1 addition & 5 deletions controllers/choose_removals.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type chooseRemovals struct{}

// reconcile runs the reconciler's work.
func (c chooseRemovals) reconcile(
ctx context.Context,
_ context.Context,
r *FoundationDBClusterReconciler,
cluster *fdbv1beta2.FoundationDBCluster,
status *fdbv1beta2.FoundationDBStatus,
Expand Down Expand Up @@ -151,10 +151,6 @@ func (c chooseRemovals) reconcile(
processGroup.MarkForRemoval()
}
}
err := r.updateOrApply(ctx, cluster)
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
}
}

return nil
Expand Down
4 changes: 0 additions & 4 deletions controllers/choose_removals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
var _ = Describe("choose_removals", func() {
var cluster *fdbv1beta2.FoundationDBCluster
var adminClient *mock.AdminClient
var err error
var requeue *requeue
var removals []fdbv1beta2.ProcessGroupID

Expand Down Expand Up @@ -63,9 +62,6 @@ var _ = Describe("choose_removals", func() {
nil,
globalControllerLogger,
)
Expect(err).NotTo(HaveOccurred())
_, err = reloadCluster(cluster)
Expect(err).NotTo(HaveOccurred())

removals = nil
for _, processGroup := range cluster.Status.ProcessGroups {
Expand Down
54 changes: 41 additions & 13 deletions controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"syscall"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/utils/ptr"

"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbadminclient"
Expand Down Expand Up @@ -180,15 +181,23 @@ func (r *FoundationDBClusterReconciler) Reconcile(
ctx context.Context,
request ctrl.Request,
) (ctrl.Result, error) {
return processResult(r.doReconcile(ctx, request))
}

// doReconcile will run the reconcile logic for the FoundationDBClusterReconciler.
func (r *FoundationDBClusterReconciler) doReconcile(
ctx context.Context,
request ctrl.Request,
) (*ctrl.Result, error) {
cluster := &fdbv1beta2.FoundationDBCluster{}

err := r.Get(ctx, request.NamespacedName, cluster)
if err != nil {
if k8serrors.IsNotFound(err) {
return ctrl.Result{}, nil
return nil, nil
}
// Error reading the object - requeue the request.
return ctrl.Result{}, err
return nil, err
}

clusterLog := globalControllerLogger.WithValues(
Expand All @@ -202,9 +211,28 @@ func (r *FoundationDBClusterReconciler) Reconcile(
cacheStatus := cluster.CacheDatabaseStatusForReconciliation(
r.CacheDatabaseStatusForReconciliationDefault,
)
// Printout the duration of the reconciliation, independent if the reconciliation was successful or had an error.

originalStatus := cluster.Status.DeepCopy()
startTime := time.Now()
result := &ctrl.Result{}
defer func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any scenario where we don't want to do the update? like if there is an error on a subreconciler?

// If the cluster.Status has changed compared to the original Status, we have to update the status.
// See: https://github.com/kubernetes-sigs/kubebuilder/issues/592
// If we use the default reflect.DeepEqual method it will be recreating the clusterStatus multiple times
// because the pointers are different.
if !equality.Semantic.DeepEqual(cluster.Status, *originalStatus) {
clusterLog.Info("cluster status was changed, will be updating the cluster status")
err = r.updateOrApply(ctx, cluster)
if err != nil {
clusterLog.Error(err, "Error updating cluster clusterStatus")
// If no requeue is planned, we should target a requeue to ensure that the status will be updated.
if result.IsZero() || result.RequeueAfter == 0 {
result.RequeueAfter = 2 * time.Second
}
}
}

// Printout the duration of the reconciliation, independent if the reconciliation was successful or had an error.
clusterLog.Info(
"Reconciliation run finished",
"duration_seconds",
Expand All @@ -217,33 +245,33 @@ func (r *FoundationDBClusterReconciler) Reconcile(
if cluster.Spec.Skip {
clusterLog.Info("Skipping cluster with skip value true", "skip", cluster.Spec.Skip)
// Don't requeue
return ctrl.Result{}, nil
return nil, nil
}

err = internal.NormalizeClusterSpec(cluster, r.DeprecationOptions)
if err != nil {
return ctrl.Result{}, err
return nil, nil
}

err = cluster.Validate()
if err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "ClusterSpec not valid", err.Error())
return ctrl.Result{}, fmt.Errorf("ClusterSpec is not valid: %w", err)
return nil, fmt.Errorf("ClusterSpec is not valid: %w", err)
}

adminClient, err := r.getAdminClient(clusterLog, cluster)
if err != nil {
return ctrl.Result{}, err
return nil, err
}
defer func() {
_ = adminClient.Close()
}()
supportedVersion, err := adminClient.VersionSupported(cluster.Spec.Version)
if err != nil {
return ctrl.Result{}, err
return nil, err
}
if !supportedVersion {
return ctrl.Result{}, fmt.Errorf("version %s is not supported", cluster.Spec.Version)
return nil, fmt.Errorf("version %s is not supported", cluster.Spec.Version)
}

// When using DNS entries in the cluster file, we want to make sure to create pods if required before doing any
Expand Down Expand Up @@ -321,13 +349,13 @@ func (r *FoundationDBClusterReconciler) Reconcile(
process, err := os.FindProcess(os.Getpid())
if err != nil {
fmt.Printf("Error finding process: %v\n", err)
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
return &ctrl.Result{RequeueAfter: 5 * time.Second}, err
}

err = process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Printf("Error sending signal: %v\n", err)
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
return &ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
}
}
Expand Down Expand Up @@ -384,7 +412,7 @@ func (r *FoundationDBClusterReconciler) Reconcile(
delayedRequeueDuration = 2 * time.Second
}

return ctrl.Result{RequeueAfter: delayedRequeueDuration}, nil
return &ctrl.Result{RequeueAfter: delayedRequeueDuration}, nil
}

clusterLog.Info("Reconciliation complete", "generation", cluster.Status.Generations.Reconciled)
Expand All @@ -395,7 +423,7 @@ func (r *FoundationDBClusterReconciler) Reconcile(
fmt.Sprintf("Reconciled generation %d", cluster.Status.Generations.Reconciled),
)

return ctrl.Result{}, nil
return result, nil
}

// runClusterSubReconciler will start the subReconciler and will log the duration of the subReconciler.
Expand Down
20 changes: 17 additions & 3 deletions controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func processRequeue(
object runtime.Object,
recorder record.EventRecorder,
logger logr.Logger,
) (ctrl.Result, error) {
) (*ctrl.Result, error) {
curLog := logger.WithValues(
"reconciler",
fmt.Sprintf("%T", subReconciler),
Expand All @@ -90,9 +90,23 @@ func processRequeue(
recorder.Event(object, corev1.EventTypeNormal, "ReconciliationTerminatedEarly", requeue.message)
if err != nil {
curLog.Error(err, "Error in reconciliation")
return ctrl.Result{}, err
return nil, err
}
curLog.Info("Reconciliation terminated early", "message", requeue.message)

return ctrl.Result{RequeueAfter: requeue.delay}, nil
return &ctrl.Result{RequeueAfter: requeue.delay}, nil
}

// processResult will return a ctrl.Result and error based on the input, e.g. if the result is nil, it will ensure
// that an empty ctrl.Result is returned.
func processResult(result *ctrl.Result, err error) (ctrl.Result, error) {
if err != nil {
return ctrl.Result{}, err
}

if result.IsZero() {
return ctrl.Result{}, nil
}

return *result, nil
}
7 changes: 1 addition & 6 deletions controllers/exclude_processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type excludeEntry struct {

// reconcile runs the reconciler's work.
func (e excludeProcesses) reconcile(
ctx context.Context,
_ context.Context,
r *FoundationDBClusterReconciler,
cluster *fdbv1beta2.FoundationDBCluster,
status *fdbv1beta2.FoundationDBStatus,
Expand Down Expand Up @@ -336,11 +336,6 @@ func (e excludeProcesses) reconcile(
if coordinatorErr != nil {
return &requeue{curError: coordinatorErr, delayedRequeue: true}
}

err = r.updateOrApply(ctx, cluster)
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
}
}

// If not all processes are excluded, ensure we requeue after 5 minutes.
Expand Down
10 changes: 0 additions & 10 deletions controllers/exclude_processes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1730,8 +1730,6 @@ var _ = Describe("exclude_processes", func() {
Expect(req).To(BeNil())
Expect(adminClient.ExcludedAddresses).To(HaveLen(1))

_, err = reloadCluster(cluster)
Expect(err).NotTo(HaveOccurred())
Expect(initialConnectionString).NotTo(Equal(cluster.Status.ConnectionString))
})

Expand All @@ -1749,8 +1747,6 @@ var _ = Describe("exclude_processes", func() {
Expect(req).To(BeNil())
Expect(adminClient.ExcludedAddresses).To(HaveLen(1))

_, err = reloadCluster(cluster)
Expect(err).NotTo(HaveOccurred())
Expect(
initialConnectionString,
).NotTo(Equal(cluster.Status.ConnectionString))
Expand Down Expand Up @@ -1918,9 +1914,6 @@ var _ = Describe("exclude_processes", func() {

Expect(req).To(BeNil())
Expect(adminClient.ExcludedAddresses).To(HaveLen(1))

_, err = reloadCluster(cluster)
Expect(err).NotTo(HaveOccurred())
Expect(initialConnectionString).NotTo(Equal(cluster.Status.ConnectionString))
})

Expand All @@ -1937,9 +1930,6 @@ var _ = Describe("exclude_processes", func() {

Expect(req).To(BeNil())
Expect(adminClient.ExcludedAddresses).To(HaveLen(1))

_, err = reloadCluster(cluster)
Expect(err).NotTo(HaveOccurred())
Expect(
initialConnectionString,
).NotTo(Equal(cluster.Status.ConnectionString))
Expand Down
4 changes: 0 additions & 4 deletions controllers/generate_initial_cluster_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ func (g generateInitialClusterFile) reconcile(
}

cluster.Status.ConnectionString = connectionString.String()
err = r.updateOrApply(ctx, cluster)
if err != nil {
return &requeue{curError: err}
}

return nil
}
Loading
Loading