Skip to content

Commit 1098b47

Browse files
author
jwcesign
authored
Merge pull request #41 from helen-frank/nodeclaim-controller
feat: nodeclaim add controller garbagecollection and tagging
2 parents afb4ec8 + 4e5cd48 commit 1098b47

File tree

7 files changed

+283
-2
lines changed

7 files changed

+283
-2
lines changed

cmd/controller/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func main() {
5353
op.GetClient(),
5454
op.EventRecorder,
5555
cloudProvider,
56+
op.InstanceProvider,
5657
op.PricingProvider,
5758
)...).
5859
Start(ctx, cloudProvider)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ require (
115115
k8s.io/component-base v0.30.3 // indirect
116116
k8s.io/csi-translation-lib v0.30.3 // indirect
117117
k8s.io/klog v1.0.0 // indirect
118-
k8s.io/klog/v2 v2.130.1 // indirect
118+
k8s.io/klog/v2 v2.130.1
119119
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
120120
knative.dev/pkg v0.0.0-20230712131115-7051d301e7f4 // indirect
121121
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect

pkg/controllers/controllers.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,20 @@ import (
2626
"sigs.k8s.io/karpenter/pkg/cloudprovider"
2727
"sigs.k8s.io/karpenter/pkg/events"
2828

29+
nodeclaimgarbagecollection "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/controllers/nodeclaim/garbagecollection"
30+
nodeclaimtagging "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/controllers/nodeclaim/tagging"
2931
controllerspricing "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/controllers/providers/pricing"
32+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/instance"
3033
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/pricing"
3134
)
3235

3336
func NewControllers(ctx context.Context, mgr manager.Manager, clk clock.Clock, kubeClient client.Client, recorder events.Recorder,
34-
cloudProvider cloudprovider.CloudProvider, pricingProvider pricing.Provider) []controller.Controller {
37+
cloudProvider cloudprovider.CloudProvider, instanceProvider instance.Provider, pricingProvider pricing.Provider) []controller.Controller {
3538

3639
controllers := []controller.Controller{
3740
controllerspricing.NewController(pricingProvider),
41+
nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider),
42+
nodeclaimtagging.NewController(kubeClient, instanceProvider),
3843
}
3944
return controllers
4045
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
Copyright 2024 The CloudPilot AI Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package garbagecollection
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/awslabs/operatorpkg/singleton"
25+
"github.com/samber/lo"
26+
"go.uber.org/multierr"
27+
corev1 "k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/util/sets"
29+
"k8s.io/client-go/util/workqueue"
30+
"k8s.io/klog/v2"
31+
controllerruntime "sigs.k8s.io/controller-runtime"
32+
"sigs.k8s.io/controller-runtime/pkg/client"
33+
"sigs.k8s.io/controller-runtime/pkg/log"
34+
"sigs.k8s.io/controller-runtime/pkg/manager"
35+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
36+
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
37+
"sigs.k8s.io/karpenter/pkg/cloudprovider"
38+
"sigs.k8s.io/karpenter/pkg/operator/injection"
39+
)
40+
41+
type Controller struct {
42+
kubeClient client.Client
43+
cloudProvider cloudprovider.CloudProvider
44+
successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller
45+
}
46+
47+
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
48+
return &Controller{
49+
kubeClient: kubeClient,
50+
cloudProvider: cloudProvider,
51+
successfulCount: 0,
52+
}
53+
}
54+
55+
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
56+
ctx = injection.WithControllerName(ctx, "instance.garbagecollection")
57+
58+
// We LIST machines on the CloudProvider BEFORE we grab Machines/Nodes on the cluster so that we make sure that, if
59+
// LISTing instances takes a long time, our information is more updated by the time we get to Machine and Node LIST
60+
// This works since our CloudProvider instances are deleted based on whether the Machine exists or not, not vise-versa
61+
retrieved, err := c.cloudProvider.List(ctx)
62+
if err != nil {
63+
return reconcile.Result{}, fmt.Errorf("listing cloudprovider machines, %w", err)
64+
}
65+
managedRetrieved := lo.Filter(retrieved, func(nc *karpv1.NodeClaim, _ int) bool {
66+
return nc.DeletionTimestamp.IsZero()
67+
})
68+
nodeClaimList := &karpv1.NodeClaimList{}
69+
if err = c.kubeClient.List(ctx, nodeClaimList); err != nil {
70+
return reconcile.Result{}, err
71+
}
72+
nodeList := &corev1.NodeList{}
73+
if err = c.kubeClient.List(ctx, nodeList); err != nil {
74+
return reconcile.Result{}, err
75+
}
76+
resolvedProviderIDs := sets.New[string](lo.FilterMap(nodeClaimList.Items, func(n karpv1.NodeClaim, _ int) (string, bool) {
77+
return n.Status.ProviderID, n.Status.ProviderID != ""
78+
})...)
79+
errs := make([]error, len(retrieved))
80+
workqueue.ParallelizeUntil(ctx, 100, len(managedRetrieved), func(i int) {
81+
if !resolvedProviderIDs.Has(managedRetrieved[i].Status.ProviderID) &&
82+
time.Since(managedRetrieved[i].CreationTimestamp.Time) > time.Second*30 {
83+
errs[i] = c.garbageCollect(ctx, managedRetrieved[i], nodeList)
84+
}
85+
})
86+
if err = multierr.Combine(errs...); err != nil {
87+
return reconcile.Result{}, err
88+
}
89+
c.successfulCount++
90+
return reconcile.Result{RequeueAfter: lo.Ternary(c.successfulCount <= 20, time.Second*10, time.Minute*2)}, nil
91+
}
92+
93+
func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeList *corev1.NodeList) error {
94+
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("provider-id", nodeClaim.Status.ProviderID))
95+
if err := c.cloudProvider.Delete(ctx, nodeClaim); err != nil {
96+
return cloudprovider.IgnoreNodeClaimNotFoundError(err)
97+
}
98+
log.FromContext(ctx).V(1).Info("garbage collected cloudprovider instance")
99+
100+
// Go ahead and cleanup the node if we know that it exists to make scheduling go quicker
101+
if node, ok := lo.Find(nodeList.Items, func(n corev1.Node) bool {
102+
return n.Spec.ProviderID == nodeClaim.Status.ProviderID
103+
}); ok {
104+
if err := c.kubeClient.Delete(ctx, &node); err != nil {
105+
return client.IgnoreNotFound(err)
106+
}
107+
log.FromContext(ctx).WithValues("Node", klog.KRef("", node.Name)).V(1).Info("garbage collected node")
108+
}
109+
return nil
110+
}
111+
112+
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
113+
return controllerruntime.NewControllerManagedBy(m).
114+
Named("instance.garbagecollection").
115+
WatchesRawSource(singleton.Source()).
116+
Complete(singleton.AsReconciler(c))
117+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
Copyright 2024 The CloudPilot AI Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package tagging
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/awslabs/operatorpkg/reasonable"
25+
"github.com/samber/lo"
26+
"k8s.io/apimachinery/pkg/api/equality"
27+
controllerruntime "sigs.k8s.io/controller-runtime"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/controller"
30+
"sigs.k8s.io/controller-runtime/pkg/log"
31+
"sigs.k8s.io/controller-runtime/pkg/manager"
32+
"sigs.k8s.io/controller-runtime/pkg/predicate"
33+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
34+
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
35+
"sigs.k8s.io/karpenter/pkg/cloudprovider"
36+
"sigs.k8s.io/karpenter/pkg/operator/injection"
37+
38+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/apis/v1alpha1"
39+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/operator/options"
40+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/instance"
41+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/utils"
42+
)
43+
44+
type Controller struct {
45+
kubeClient client.Client
46+
instanceProvider instance.Provider
47+
}
48+
49+
func NewController(kubeClient client.Client, instanceProvider instance.Provider) *Controller {
50+
return &Controller{
51+
kubeClient: kubeClient,
52+
instanceProvider: instanceProvider,
53+
}
54+
}
55+
56+
func (c *Controller) Reconcile(ctx context.Context, nodeClaim *karpv1.NodeClaim) (reconcile.Result, error) {
57+
ctx = injection.WithControllerName(ctx, "nodeclaim.tagging")
58+
59+
stored := nodeClaim.DeepCopy()
60+
if !isTaggable(nodeClaim) {
61+
return reconcile.Result{}, nil
62+
}
63+
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("provider-id", nodeClaim.Status.ProviderID))
64+
id, err := utils.ParseInstanceID(nodeClaim.Status.ProviderID)
65+
if err != nil {
66+
// We don't throw an error here since we don't want to retry until the ProviderID has been updated.
67+
log.FromContext(ctx).Error(err, "failed parsing instance id")
68+
return reconcile.Result{}, nil
69+
}
70+
if err = c.tagInstance(ctx, nodeClaim, id); err != nil {
71+
return reconcile.Result{}, cloudprovider.IgnoreNodeClaimNotFoundError(err)
72+
}
73+
nodeClaim.Annotations = lo.Assign(nodeClaim.Annotations, map[string]string{
74+
v1alpha1.AnnotationInstanceTagged: "true",
75+
v1alpha1.AnnotationClusterNameTaggedCompatability: "true",
76+
})
77+
if !equality.Semantic.DeepEqual(nodeClaim, stored) {
78+
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
79+
return reconcile.Result{}, client.IgnoreNotFound(err)
80+
}
81+
}
82+
return reconcile.Result{}, nil
83+
}
84+
85+
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
86+
return controllerruntime.NewControllerManagedBy(m).
87+
Named("nodeclaim.tagging").
88+
For(&karpv1.NodeClaim{}).
89+
WithEventFilter(predicate.NewPredicateFuncs(func(o client.Object) bool {
90+
return isTaggable(o.(*karpv1.NodeClaim))
91+
})).
92+
// Ok with using the default MaxConcurrentReconciles of 1 to avoid throttling from CreateTag write API
93+
WithOptions(controller.Options{
94+
RateLimiter: reasonable.RateLimiter(),
95+
}).
96+
Complete(reconcile.AsReconciler(m.GetClient(), c))
97+
}
98+
99+
func (c *Controller) tagInstance(ctx context.Context, nc *karpv1.NodeClaim, id string) error {
100+
tags := map[string]string{
101+
v1alpha1.TagName: nc.Status.NodeName,
102+
v1alpha1.TagNodeClaim: nc.Name,
103+
v1alpha1.EKSClusterNameTagKey: options.FromContext(ctx).ClusterName,
104+
}
105+
106+
// Remove tags which have been already populated
107+
instance, err := c.instanceProvider.Get(ctx, id)
108+
if err != nil {
109+
return fmt.Errorf("tagging nodeclaim, %w", err)
110+
}
111+
tags = lo.OmitByKeys(tags, lo.Keys(instance.Tags))
112+
if len(tags) == 0 {
113+
return nil
114+
}
115+
116+
// Ensures that no more than 1 CreateTags call is made per second. Rate limiting is required since CreateTags
117+
// shares a pool with other mutating calls (e.g. CreateFleet).
118+
defer time.Sleep(time.Second)
119+
if err := c.instanceProvider.CreateTags(ctx, id, tags); err != nil {
120+
return fmt.Errorf("tagging nodeclaim, %w", err)
121+
}
122+
return nil
123+
}
124+
125+
func isTaggable(nc *karpv1.NodeClaim) bool {
126+
// Instance has already been tagged
127+
instanceTagged := nc.Annotations[v1alpha1.AnnotationInstanceTagged]
128+
clusterNameTagged := nc.Annotations[v1alpha1.AnnotationClusterNameTaggedCompatability]
129+
if instanceTagged == "true" && clusterNameTagged == "true" {
130+
return false
131+
}
132+
// Node name is not yet known
133+
if nc.Status.NodeName == "" {
134+
return false
135+
}
136+
// NodeClaim is currently terminating
137+
if !nc.DeletionTimestamp.IsZero() {
138+
return false
139+
}
140+
return true
141+
}

vendor/github.com/awslabs/operatorpkg/reasonable/reasonable.go

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/modules.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ github.com/awslabs/operatorpkg/controller
7777
github.com/awslabs/operatorpkg/metrics
7878
github.com/awslabs/operatorpkg/object
7979
github.com/awslabs/operatorpkg/option
80+
github.com/awslabs/operatorpkg/reasonable
8081
github.com/awslabs/operatorpkg/singleton
8182
github.com/awslabs/operatorpkg/status
8283
# github.com/beorn7/perks v1.0.1

0 commit comments

Comments
 (0)