@@ -20,13 +20,15 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"

 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
@@ -303,6 +305,96 @@ func (r *Reconciler) reconcile(ctx context.Context, s *scope) error {
 	return errors.Errorf("unexpected deployment strategy type: %s", md.Spec.Rollout.Strategy.Type)
 }

+// createOrUpdateMachineSetsAndSyncMachineDeploymentRevision applies the changes identified by the rolloutPlanner to both newMS and oldMSs.
+// Note: Both newMS and oldMSs already carry the full intent for the SSA apply call: mandatory labels,
+// in-place propagated fields, the annotations derived from the MachineDeployment, revision annotations,
+// and also the annotations influencing how scale up/down operations are performed.
+// scaleIntents, instead, are tracked separately by the rolloutPlanner and must be applied to the MachineSets
+// before persisting changes.
+// Note: When the newMS has been created by the rollout planner, this function also waits for the cache to be up to date.
+func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(ctx context.Context, p *rolloutPlanner) error {
+	log := ctrl.LoggerFrom(ctx)
+	allMSs := append(p.oldMSs, p.newMS)
+
+	for _, ms := range allMSs {
+		log = log.WithValues("MachineSet", klog.KObj(ms))
+		ctx = ctrl.LoggerInto(ctx, log)
+
+		originalReplicas := ptr.Deref(ms.Spec.Replicas, 0)
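+		// Apply the scale intent computed by the rollout planner, if any, so that the new replica count is persisted with the same apply call as the rest of the intent.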
+		if scaleIntent, ok := p.scaleIntents[ms.Name]; ok {
+			ms.Spec.Replicas = &scaleIntent
+		}
+
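+		// An empty UID means that this MachineSet only exists in memory, i.e. it still has to be created on the API server.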
+		if ms.GetUID() == "" {
+			// Create the MachineSet.
+			if err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, ms); err != nil {
+				r.recorder.Eventf(p.md, corev1.EventTypeWarning, "FailedCreate", "Failed to create MachineSet %s: %v", klog.KObj(ms), err)
+				return errors.Wrapf(err, "failed to create new MachineSet %s", klog.KObj(ms))
+			}
+			log.Info(fmt.Sprintf("MachineSet created (%s)", p.createReason))
+			r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulCreate", "Created MachineSet %s with %d replicas", klog.KObj(ms), ptr.Deref(ms.Spec.Replicas, 0))
+
+			// Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of
+			// the MachineDeployment from running with an outdated list of MachineSets, which could lead to the unwanted creation
+			// of a duplicate MachineSet.
+			var pollErrors []error
+			tmpMS := &clusterv1.MachineSet{}
+			if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
+				if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ms), tmpMS); err != nil {
+					// Do not return the error here. Continue to poll even if we hit an error,
+					// so that we avoid exiting because of transient errors like network flakes.
+					// Capture all the errors and return the aggregate error if the poll eventually fails.
+					pollErrors = append(pollErrors, err)
+					return false, nil
+				}
+				return true, nil
+			}); err != nil {
+				return errors.Wrapf(kerrors.NewAggregate(pollErrors), "failed to get the MachineSet %s after creation", klog.KObj(ms))
+			}
+
+			// Report back creation timestamp, because legacy scale func uses it to sort machines.
+			// TODO(in-place): drop this as soon as handling of MD with paused rollouts is moved into rollout planner (see scale in machinedeployment_sync.go).
+			ms.CreationTimestamp = tmpMS.CreationTimestamp
+			continue
+		}
+
+		// Update the MachineSet to propagate in-place mutable fields from the MachineDeployment and/or changes applied by the rollout planner.
+		originalMS, ok := p.originalMS[ms.Name]
+		if !ok {
+			return errors.Errorf("failed to update MachineSet %s, original MS is missing", klog.KObj(ms))
+		}
+
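+		// Note: the caching proxy allows ssa.Patch to skip the apply call when the intent previously applied for this MachineSet is unchanged, avoiding unnecessary API server requests.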
+		err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, ms, ssa.WithCachingProxy{Cache: r.ssaCache, Original: originalMS})
+		if err != nil {
+			r.recorder.Eventf(p.md, corev1.EventTypeWarning, "FailedUpdate", "Failed to update MachineSet %s: %v", klog.KObj(ms), err)
+			return errors.Wrapf(err, "failed to update MachineSet %s", klog.KObj(ms))
+		}
+
+		newReplicas := ptr.Deref(ms.Spec.Replicas, 0)
+		if newReplicas < originalReplicas {
+			log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas))
+			r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled down MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
+		}
+		if newReplicas > originalReplicas {
+			log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas))
+			r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled up MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
+		}
+	}
+
+	// Surface the revision annotation at the MD level.
+	if p.md.Annotations == nil {
+		p.md.Annotations = make(map[string]string)
+	}
+	if p.md.Annotations[clusterv1.RevisionAnnotation] != p.revision {
+		p.md.Annotations[clusterv1.RevisionAnnotation] = p.revision
+	}
+
+	return nil
+}
+
 func (r *Reconciler) reconcileDelete(ctx context.Context, s *scope) error {
 	log := ctrl.LoggerFrom(ctx)
 	if err := r.getAndAdoptMachineSetsForDeployment(ctx, s); err != nil {