Skip to content

Commit 37eb983

Browse files
committed
account for pods that go unready and revive themself
1 parent 43ea5c7 commit 37eb983

File tree

3 files changed

+110
-59
lines changed

3 files changed

+110
-59
lines changed

controllers/elbv2/targetgroupbinding_controller.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
113113
return err
114114
}
115115

116-
checkPoint, deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb)
116+
deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb)
117117

118118
if err != nil {
119119
return err
@@ -124,7 +124,7 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
124124
return nil
125125
}
126126

127-
if err := r.updateTargetGroupBindingStatus(ctx, tgb, checkPoint); err != nil {
127+
if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil {
128128
r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
129129
return err
130130
}
@@ -147,18 +147,12 @@ func (r *targetGroupBindingReconciler) cleanupTargetGroupBinding(ctx context.Con
147147
return nil
148148
}
149149

150-
func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint string) error {
151-
savedCheckPoint := targetgroupbinding.GetTGBReconcileCheckpoint(tgb)
152-
if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation && savedCheckPoint == newCheckPoint {
150+
func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
151+
if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation {
153152
return nil
154153
}
155154

156155
tgbOld := tgb.DeepCopy()
157-
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, newCheckPoint)
158-
159-
if err := r.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
160-
return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb))
161-
}
162156

163157
tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation)
164158
if err := r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {

pkg/backend/endpoint_resolver.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,10 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte
170170
containsPotentialReadyEndpoints = true
171171
continue
172172
}
173+
173174
podEndpoint := buildPodEndpoint(pod, epAddr, epPort)
174-
if ep.Conditions.Ready != nil && *ep.Conditions.Ready {
175+
// Recommendation from Kubernetes is to consider unknown ready status as ready (ready == nil)
176+
if ep.Conditions.Ready == nil || *ep.Conditions.Ready {
175177
readyPodEndpoints = append(readyPodEndpoints, podEndpoint)
176178
continue
177179
}

pkg/targetgroupbinding/resource_manager.go

Lines changed: 103 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"sigs.k8s.io/controller-runtime/pkg/client"
2727
)
2828

29-
const defaultTargetHealthRequeueDuration = 15 * time.Second
29+
const defaultRequeueDuration = 15 * time.Second
3030

3131
// ResourceManager manages the TargetGroupBinding resource.
3232
type ResourceManager interface {
33-
Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error)
33+
Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error)
3434
Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error
3535
}
3636

@@ -60,7 +60,7 @@ func NewDefaultResourceManager(k8sClient client.Client, elbv2Client services.ELB
6060
vpcInfoProvider: vpcInfoProvider,
6161
podInfoRepo: podInfoRepo,
6262

63-
targetHealthRequeueDuration: defaultTargetHealthRequeueDuration,
63+
requeueDuration: defaultRequeueDuration,
6464
}
6565
}
6666

@@ -78,17 +78,34 @@ type defaultResourceManager struct {
7878
podInfoRepo k8s.PodInfoRepo
7979
vpcID string
8080

81-
targetHealthRequeueDuration time.Duration
81+
requeueDuration time.Duration
8282
}
8383

84-
func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) {
84+
func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error) {
8585
if tgb.Spec.TargetType == nil {
86-
return "", false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String())
86+
return false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String())
8787
}
88+
89+
var newCheckPoint string
90+
var oldCheckPoint string
91+
var isDeferred bool
92+
var err error
93+
8894
if *tgb.Spec.TargetType == elbv2api.TargetTypeIP {
89-
return m.reconcileWithIPTargetType(ctx, tgb)
95+
newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithIPTargetType(ctx, tgb)
96+
} else {
97+
newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithInstanceTargetType(ctx, tgb)
98+
}
99+
100+
if err != nil {
101+
return false, err
90102
}
91-
return m.reconcileWithInstanceTargetType(ctx, tgb)
103+
104+
if isDeferred {
105+
return true, nil
106+
}
107+
108+
return false, m.updateTGBCheckPoint(ctx, tgb, newCheckPoint, oldCheckPoint)
92109
}
93110

94111
func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
@@ -104,7 +121,7 @@ func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.Targ
104121
return nil
105122
}
106123

107-
func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) {
124+
func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) {
108125
svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef)
109126

110127
targetHealthCondType := BuildTargetHealthPodConditionType(tgb)
@@ -121,128 +138,152 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context,
121138
if err != nil {
122139
if errors.Is(err, backend.ErrNotFound) {
123140
m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error())
124-
return "", false, m.Cleanup(ctx, tgb)
141+
return "", "", false, m.Cleanup(ctx, tgb)
125142
}
126-
return "", false, err
143+
return "", "", false, err
127144
}
128145

129-
currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)
146+
newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)
130147

131148
if err != nil {
132-
return "", false, err
149+
return "", "", false, err
133150
}
134151

135-
savedCheckPoint := GetTGBReconcileCheckpoint(tgb)
152+
oldCheckPoint := GetTGBReconcileCheckpoint(tgb)
136153

137-
if currentCheckPoint == savedCheckPoint {
138-
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint)
139-
return currentCheckPoint, true, nil
154+
if !containsPotentialReadyEndpoints && oldCheckPoint == newCheckPoint {
155+
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", newCheckPoint)
156+
return newCheckPoint, oldCheckPoint, true, nil
140157
}
141158

142159
tgARN := tgb.Spec.TargetGroupARN
143160
vpcID := tgb.Spec.VpcID
144161
targets, err := m.targetsManager.ListTargets(ctx, tgARN)
145162
if err != nil {
146-
return "", false, err
163+
return "", "", false, err
147164
}
148-
notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets)
165+
notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets)
149166
matchedEndpointAndTargets, unmatchedEndpoints, unmatchedTargets := matchPodEndpointWithTargets(endpoints, notDrainingTargets)
150167

151168
needNetworkingRequeue := false
152169
if err := m.networkingManager.ReconcileForPodEndpoints(ctx, tgb, endpoints); err != nil {
170+
m.logger.Error(err, "Requesting network requeue due to error from ReconcileForPodEndpoints")
153171
m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedNetworkReconcile, err.Error())
154172
needNetworkingRequeue = true
155173
}
174+
175+
if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 || needNetworkingRequeue {
176+
// Set to an empty checkpoint, to ensure that no matter what we try to reconcile atleast one more time.
177+
// Consider this ordering of events (without using this method of overriding the checkpoint)
178+
// 1. Register some pod IP, don't update TGB checkpoint.
179+
// 2. Before next invocation of reconcile happens, the pod is removed.
180+
// 3. The next reconcile loop has no knowledge that it needs to deregister the pod ip, therefore it skips deregistering the removed pod ip.
181+
err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint)
182+
if err != nil {
183+
m.logger.Error(err, "Unable to update checkpoint before mutating change")
184+
return "", "", false, err
185+
}
186+
}
187+
156188
if len(unmatchedTargets) > 0 {
157189
if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil {
158-
return "", false, err
190+
return "", "", false, err
159191
}
160192
}
161193
if len(unmatchedEndpoints) > 0 {
162194
if err := m.registerPodEndpoints(ctx, tgARN, vpcID, unmatchedEndpoints); err != nil {
163-
return "", false, err
195+
return "", "", false, err
164196
}
165197
}
166198

167199
anyPodNeedFurtherProbe, err := m.updateTargetHealthPodCondition(ctx, targetHealthCondType, matchedEndpointAndTargets, unmatchedEndpoints)
168200
if err != nil {
169-
return "", false, err
201+
return "", "", false, err
170202
}
171203

172204
if anyPodNeedFurtherProbe {
173-
if containsTargetsInInitialState(matchedEndpointAndTargets) || len(unmatchedEndpoints) != 0 {
174-
return "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.targetHealthRequeueDuration)
175-
}
176-
return "", false, runtime.NewRequeueNeeded("monitor targetHealth")
205+
m.logger.Info("Requeue for target monitor target health")
206+
return "", "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.requeueDuration)
177207
}
178208

179209
if containsPotentialReadyEndpoints {
180-
return "", false, runtime.NewRequeueNeeded("monitor potential ready endpoints")
210+
m.logger.Info("Requeue for potentially ready endpoints")
211+
return "", "", false, runtime.NewRequeueNeededAfter("monitor potential ready endpoints", m.requeueDuration)
181212
}
182213

183-
_ = drainingTargets
184-
185214
if needNetworkingRequeue {
186-
return "", false, runtime.NewRequeueNeeded("networking reconciliation")
215+
m.logger.Info("Requeue for networking requeue")
216+
return "", "", false, runtime.NewRequeueNeededAfter("networking reconciliation", m.requeueDuration)
187217
}
188218

189-
return currentCheckPoint, false, nil
219+
m.logger.Info("Successful reconcile", "checkpoint", newCheckPoint, "TGB", k8s.NamespacedName(tgb))
220+
return newCheckPoint, oldCheckPoint, false, nil
190221
}
191222

192-
func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) {
223+
func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) {
193224
svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef)
194225
nodeSelector, err := backend.GetTrafficProxyNodeSelector(tgb)
195226
if err != nil {
196-
return "", false, err
227+
return "", "", false, err
197228
}
198229

199230
resolveOpts := []backend.EndpointResolveOption{backend.WithNodeSelector(nodeSelector)}
200231
endpoints, err := m.endpointResolver.ResolveNodePortEndpoints(ctx, svcKey, tgb.Spec.ServiceRef.Port, resolveOpts...)
201232
if err != nil {
202233
if errors.Is(err, backend.ErrNotFound) {
203234
m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error())
204-
return "", false, m.Cleanup(ctx, tgb)
235+
return "", "", false, m.Cleanup(ctx, tgb)
205236
}
206-
return "", false, err
237+
return "", "", false, err
207238
}
208239

209-
currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)
240+
newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)
210241

211242
if err != nil {
212-
return "", false, err
243+
return "", "", false, err
213244
}
214245

215-
savedCheckPoint := GetTGBReconcileCheckpoint(tgb)
246+
oldCheckPoint := GetTGBReconcileCheckpoint(tgb)
216247

217-
if currentCheckPoint == savedCheckPoint {
218-
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint)
219-
return currentCheckPoint, true, nil
248+
if newCheckPoint == oldCheckPoint {
249+
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", newCheckPoint)
250+
return newCheckPoint, oldCheckPoint, true, nil
220251
}
221252

222253
tgARN := tgb.Spec.TargetGroupARN
223254
targets, err := m.targetsManager.ListTargets(ctx, tgARN)
224255
if err != nil {
225-
return "", false, err
256+
return "", "", false, err
226257
}
227-
notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets)
258+
notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets)
228259
_, unmatchedEndpoints, unmatchedTargets := matchNodePortEndpointWithTargets(endpoints, notDrainingTargets)
229260

230261
if err := m.networkingManager.ReconcileForNodePortEndpoints(ctx, tgb, endpoints); err != nil {
231-
return "", false, err
262+
m.logger.Error(err, "Requesting network requeue due to error from ReconcileForNodePortEndpoints")
263+
return "", "", false, err
264+
}
265+
266+
if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 {
267+
// Same thought process, see the IP target registration code as to why we clear out the check point.
268+
err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint)
269+
if err != nil {
270+
m.logger.Error(err, "Unable to update checkpoint before mutating change")
271+
return "", "", false, err
272+
}
232273
}
233274

234275
if len(unmatchedTargets) > 0 {
235276
if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil {
236-
return "", false, err
277+
return "", "", false, err
237278
}
238279
}
239280
if len(unmatchedEndpoints) > 0 {
240281
if err := m.registerNodePortEndpoints(ctx, tgARN, unmatchedEndpoints); err != nil {
241-
return "", false, err
282+
return "", "", false, err
242283
}
243284
}
244-
_ = drainingTargets
245-
return currentCheckPoint, false, nil
285+
m.logger.Info("Successful reconcile", "checkpoint", newCheckPoint)
286+
return newCheckPoint, oldCheckPoint, false, nil
246287
}
247288

248289
func (m *defaultResourceManager) cleanupTargets(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
@@ -461,6 +502,20 @@ func (m *defaultResourceManager) registerNodePortEndpoints(ctx context.Context,
461502
return m.targetsManager.RegisterTargets(ctx, tgARN, sdkTargets)
462503
}
463504

505+
func (m *defaultResourceManager) updateTGBCheckPoint(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint, previousCheckPoint string) error {
506+
if newCheckPoint == previousCheckPoint {
507+
return nil
508+
}
509+
510+
tgbOld := tgb.DeepCopy()
511+
SaveTGBReconcileCheckpoint(tgb, newCheckPoint)
512+
513+
if err := m.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
514+
return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb))
515+
}
516+
return nil
517+
}
518+
464519
type podEndpointAndTargetPair struct {
465520
endpoint backend.PodEndpoint
466521
target TargetInfo

0 commit comments

Comments
 (0)