Skip to content

Commit 43ea5c7

Browse files
committed
Feature: Deferred TGB queue for no-op reconciles
1 parent 0700e85 commit 43ea5c7

File tree

14 files changed

+759
-74
lines changed

14 files changed

+759
-74
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ all: controller
3232

3333
# Run tests
3434
test: generate fmt vet manifests helm-lint
35-
go test -race ./pkg/... ./webhooks/... -coverprofile cover.out
35+
go test -race ./pkg/... ./webhooks/... ./controllers/... -coverprofile cover.out
3636

3737
# Build controller binary
3838
controller: generate fmt vet

controllers/elbv2/targetgroupbinding_controller.go

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ package controllers
1919
import (
2020
"context"
2121
"fmt"
22+
discv1 "k8s.io/api/discovery/v1"
23+
"sigs.k8s.io/controller-runtime/pkg/handler"
2224
"time"
2325

2426
"github.com/aws/aws-sdk-go-v2/aws"
2527
"github.com/pkg/errors"
2628
corev1 "k8s.io/api/core/v1"
27-
discv1 "k8s.io/api/discovery/v1"
2829
"k8s.io/client-go/tools/record"
2930
"k8s.io/client-go/util/workqueue"
3031
"sigs.k8s.io/aws-load-balancer-controller/controllers/elbv2/eventhandlers"
@@ -48,15 +49,16 @@ const (
4849

4950
// NewTargetGroupBindingReconciler constructs new targetGroupBindingReconciler
5051
func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager,
51-
tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig,
52+
tgbResourceManager targetgroupbinding.ResourceManager, config config.ControllerConfig, deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler,
5253
logger logr.Logger) *targetGroupBindingReconciler {
5354

5455
return &targetGroupBindingReconciler{
55-
k8sClient: k8sClient,
56-
eventRecorder: eventRecorder,
57-
finalizerManager: finalizerManager,
58-
tgbResourceManager: tgbResourceManager,
59-
logger: logger,
56+
k8sClient: k8sClient,
57+
eventRecorder: eventRecorder,
58+
finalizerManager: finalizerManager,
59+
tgbResourceManager: tgbResourceManager,
60+
deferredTargetGroupBindingReconciler: deferredTargetGroupBindingReconciler,
61+
logger: logger,
6062

6163
maxConcurrentReconciles: config.TargetGroupBindingMaxConcurrentReconciles,
6264
maxExponentialBackoffDelay: config.TargetGroupBindingMaxExponentialBackoffDelay,
@@ -66,11 +68,12 @@ func NewTargetGroupBindingReconciler(k8sClient client.Client, eventRecorder reco
6668

6769
// targetGroupBindingReconciler reconciles a TargetGroupBinding object
6870
type targetGroupBindingReconciler struct {
69-
k8sClient client.Client
70-
eventRecorder record.EventRecorder
71-
finalizerManager k8s.FinalizerManager
72-
tgbResourceManager targetgroupbinding.ResourceManager
73-
logger logr.Logger
71+
k8sClient client.Client
72+
eventRecorder record.EventRecorder
73+
finalizerManager k8s.FinalizerManager
74+
tgbResourceManager targetgroupbinding.ResourceManager
75+
deferredTargetGroupBindingReconciler DeferredTargetGroupBindingReconciler
76+
logger logr.Logger
7477

7578
maxConcurrentReconciles int
7679
maxExponentialBackoffDelay time.Duration
@@ -110,11 +113,18 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
110113
return err
111114
}
112115

113-
if err := r.tgbResourceManager.Reconcile(ctx, tgb); err != nil {
116+
checkPoint, deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb)
117+
118+
if err != nil {
114119
return err
115120
}
116121

117-
if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil {
122+
if deferred {
123+
r.deferredTargetGroupBindingReconciler.Enqueue(tgb)
124+
return nil
125+
}
126+
127+
if err := r.updateTargetGroupBindingStatus(ctx, tgb, checkPoint); err != nil {
118128
r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
119129
return err
120130
}
@@ -137,15 +147,24 @@ func (r *targetGroupBindingReconciler) cleanupTargetGroupBinding(ctx context.Con
137147
return nil
138148
}
139149

140-
func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
141-
if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation {
150+
func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint string) error {
151+
savedCheckPoint := targetgroupbinding.GetTGBReconcileCheckpoint(tgb)
152+
if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation && savedCheckPoint == newCheckPoint {
142153
return nil
143154
}
155+
144156
tgbOld := tgb.DeepCopy()
157+
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, newCheckPoint)
158+
159+
if err := r.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
160+
return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb))
161+
}
162+
145163
tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation)
146164
if err := r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
147165
return errors.Wrapf(err, "failed to update targetGroupBinding status: %v", k8s.NamespacedName(tgb))
148166
}
167+
149168
return nil
150169
}
151170

@@ -159,34 +178,29 @@ func (r *targetGroupBindingReconciler) SetupWithManager(ctx context.Context, mgr
159178
nodeEventsHandler := eventhandlers.NewEnqueueRequestsForNodeEvent(r.k8sClient,
160179
r.logger.WithName("eventHandlers").WithName("node"))
161180

162-
// Use the config flag to decide whether to use and watch an Endpoints event handler or an EndpointSlices event handler
181+
var eventHandler handler.EventHandler
182+
var clientObj client.Object
183+
163184
if r.enableEndpointSlices {
164-
epSliceEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient,
185+
clientObj = &discv1.EndpointSlice{}
186+
eventHandler = eventhandlers.NewEnqueueRequestsForEndpointSlicesEvent(r.k8sClient,
165187
r.logger.WithName("eventHandlers").WithName("endpointslices"))
166-
return ctrl.NewControllerManagedBy(mgr).
167-
For(&elbv2api.TargetGroupBinding{}).
168-
Named(controllerName).
169-
Watches(&corev1.Service{}, svcEventHandler).
170-
Watches(&discv1.EndpointSlice{}, epSliceEventsHandler).
171-
Watches(&corev1.Node{}, nodeEventsHandler).
172-
WithOptions(controller.Options{
173-
MaxConcurrentReconciles: r.maxConcurrentReconciles,
174-
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
175-
Complete(r)
176188
} else {
177-
epsEventsHandler := eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient,
189+
clientObj = &corev1.Endpoints{}
190+
eventHandler = eventhandlers.NewEnqueueRequestsForEndpointsEvent(r.k8sClient,
178191
r.logger.WithName("eventHandlers").WithName("endpoints"))
179-
return ctrl.NewControllerManagedBy(mgr).
180-
For(&elbv2api.TargetGroupBinding{}).
181-
Named(controllerName).
182-
Watches(&corev1.Service{}, svcEventHandler).
183-
Watches(&corev1.Endpoints{}, epsEventsHandler).
184-
Watches(&corev1.Node{}, nodeEventsHandler).
185-
WithOptions(controller.Options{
186-
MaxConcurrentReconciles: r.maxConcurrentReconciles,
187-
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
188-
Complete(r)
189192
}
193+
194+
return ctrl.NewControllerManagedBy(mgr).
195+
For(&elbv2api.TargetGroupBinding{}).
196+
Named(controllerName).
197+
Watches(&corev1.Service{}, svcEventHandler).
198+
Watches(clientObj, eventHandler).
199+
Watches(&corev1.Node{}, nodeEventsHandler).
200+
WithOptions(controller.Options{
201+
MaxConcurrentReconciles: r.maxConcurrentReconciles,
202+
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, r.maxExponentialBackoffDelay)}).
203+
Complete(r)
190204
}
191205

192206
func (r *targetGroupBindingReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"github.com/go-logr/logr"
6+
"k8s.io/apimachinery/pkg/types"
7+
"k8s.io/client-go/util/workqueue"
8+
"math/rand"
9+
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
10+
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
11+
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
"time"
14+
)
15+
16+
const (
17+
// The time to delay the reconcile. Generally, this number should be large enough so we can perform all reconciles
18+
// that have changes before processing reconciles that have no detected changes.
19+
defaultDelayedReconcileTime = 30 * time.Minute
20+
// The max amount of jitter to add to delayedReconcileTime. This is used to ensure that all deferred TGBs are not
21+
// reconciled together.
22+
defaultMaxJitter = 15 * time.Minute
23+
24+
// The hash to set that is guaranteed to trigger a new reconcile loop (the hash calculation always has an '/')
25+
resetHash = ""
26+
)
27+
28+
type DeferredTargetGroupBindingReconciler interface {
29+
Enqueue(tgb *elbv2api.TargetGroupBinding)
30+
Run()
31+
}
32+
33+
type deferredTargetGroupBindingReconcilerImpl struct {
34+
delayQueue workqueue.DelayingInterface
35+
syncPeriod time.Duration
36+
k8sClient client.Client
37+
logger logr.Logger
38+
39+
delayedReconcileTime time.Duration
40+
maxJitter time.Duration
41+
}
42+
43+
func NewDeferredTargetGroupBindingReconciler(delayQueue workqueue.DelayingInterface, syncPeriod time.Duration, k8sClient client.Client, logger logr.Logger) DeferredTargetGroupBindingReconciler {
44+
return &deferredTargetGroupBindingReconcilerImpl{
45+
syncPeriod: syncPeriod,
46+
logger: logger,
47+
delayQueue: delayQueue,
48+
k8sClient: k8sClient,
49+
50+
delayedReconcileTime: defaultDelayedReconcileTime,
51+
maxJitter: defaultMaxJitter,
52+
}
53+
}
54+
55+
func (d *deferredTargetGroupBindingReconcilerImpl) Enqueue(tgb *elbv2api.TargetGroupBinding) {
56+
nsn := k8s.NamespacedName(tgb)
57+
if d.isEligibleForDefer(tgb) {
58+
d.enqueue(nsn)
59+
d.logger.Info("enqueued new deferred TGB", "TGB", nsn.Name)
60+
}
61+
}
62+
63+
func (d *deferredTargetGroupBindingReconcilerImpl) Run() {
64+
var item interface{}
65+
shutDown := false
66+
for !shutDown {
67+
item, shutDown = d.delayQueue.Get()
68+
if item != nil {
69+
deferredNamespacedName := item.(types.NamespacedName)
70+
d.logger.Info("Processing deferred TGB", "item", deferredNamespacedName)
71+
d.handleDeferredItem(deferredNamespacedName)
72+
d.delayQueue.Done(deferredNamespacedName)
73+
}
74+
}
75+
76+
d.logger.Info("Shutting down deferred TGB queue")
77+
}
78+
79+
func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItem(nsn types.NamespacedName) {
80+
tgb := &elbv2api.TargetGroupBinding{}
81+
82+
err := d.k8sClient.Get(context.Background(), nsn, tgb)
83+
84+
if err != nil {
85+
d.handleDeferredItemError(nsn, err, "Failed to get TGB in deferred queue")
86+
return
87+
}
88+
89+
// Re-check that this tgb hasn't been updated since it was enqueued
90+
if !d.isEligibleForDefer(tgb) {
91+
d.logger.Info("TGB not eligible for deferral", "TGB", nsn)
92+
return
93+
}
94+
95+
tgbOld := tgb.DeepCopy()
96+
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, resetHash)
97+
98+
if err := d.k8sClient.Patch(context.Background(), tgb, client.MergeFrom(tgbOld)); err != nil {
99+
d.handleDeferredItemError(nsn, err, "Failed to reset TGB checkpoint")
100+
return
101+
}
102+
d.logger.Info("TGB checkpoint reset", "TGB", nsn)
103+
}
104+
105+
func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItemError(nsn types.NamespacedName, err error, msg string) {
106+
err = client.IgnoreNotFound(err)
107+
if err != nil {
108+
d.logger.Error(err, msg, "TGB", nsn)
109+
d.enqueue(nsn)
110+
}
111+
}
112+
113+
func (d *deferredTargetGroupBindingReconcilerImpl) isEligibleForDefer(tgb *elbv2api.TargetGroupBinding) bool {
114+
then := time.Unix(targetgroupbinding.GetTGBReconcileCheckpointTimestamp(tgb), 0)
115+
return time.Now().Sub(then) > d.syncPeriod
116+
}
117+
118+
func (d *deferredTargetGroupBindingReconcilerImpl) enqueue(nsn types.NamespacedName) {
119+
delayedTime := d.jitterReconcileTime()
120+
d.delayQueue.AddAfter(nsn, delayedTime)
121+
}
122+
123+
func (d *deferredTargetGroupBindingReconcilerImpl) jitterReconcileTime() time.Duration {
124+
125+
if d.maxJitter == 0 {
126+
return d.delayedReconcileTime
127+
}
128+
129+
return d.delayedReconcileTime + time.Duration(rand.Int63n(int64(d.maxJitter)))
130+
}

0 commit comments

Comments
 (0)