Skip to content

Commit 1506aec

Browse files
author
helen
authored
Merge pull request #56 from jwcesign/main
feat: select the cheapest zone and add user data
2 parents ba7b8a0 + aa2e109 commit 1506aec

File tree

11 files changed

+38102
-51
lines changed

11 files changed

+38102
-51
lines changed

examples/workload/inflate.yaml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ kind: Deployment
33
metadata:
44
name: inflate
55
spec:
6-
replicas: 2
6+
replicas: 1
77
selector:
88
matchLabels:
99
app: inflate
@@ -12,6 +12,13 @@ spec:
1212
labels:
1313
app: inflate
1414
spec:
15+
affinity:
16+
nodeAffinity:
17+
requiredDuringSchedulingIgnoredDuringExecution:
18+
nodeSelectorTerms:
19+
- matchExpressions:
20+
- key: karpenter.sh/capacity-type
21+
operator: Exists
1522
securityContext:
1623
runAsUser: 1000
1724
runAsGroup: 3000
@@ -21,7 +28,7 @@ spec:
2128
name: inflate
2229
resources:
2330
requests:
24-
cpu: "4"
25-
memory: 4Gi
31+
cpu: 250m
32+
memory: 250Mi
2633
securityContext:
2734
allowPrivilegeEscalation: false

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/cloudpilot-ai/karpenter-provider-alicloud
33
go 1.23
44

55
require (
6+
github.com/alibabacloud-go/cs-20151215/v5 v5.7.9
67
github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.10
78
github.com/alibabacloud-go/ecs-20140526/v4 v4.26.4
89
github.com/alibabacloud-go/tea v1.2.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ github.com/alibabacloud-go/alibabacloud-gateway-pop v0.0.6/go.mod h1:4EUIoxs/do2
5050
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4/go.mod h1:sCavSAvdzOjul4cEqeVtvlSaSScfNsTQ+46HwlTL1hc=
5151
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.5 h1:zE8vH9C7JiZLNJJQ5OwjU9mSi4T9ef9u3BURT6LCLC8=
5252
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.5/go.mod h1:tWnyE9AjF8J8qqLk645oUmVUnFybApTQWklQmi5tY6g=
53+
github.com/alibabacloud-go/cs-20151215/v5 v5.7.9 h1:zQdpRgOhpYQVAXfpywnpz3vMxP+bLUoi2QfaFbg3WU4=
54+
github.com/alibabacloud-go/cs-20151215/v5 v5.7.9/go.mod h1:1pR8T0PY7DtF/t+yrOnG2aXj/TRGm1KYK7REaKyvPvM=
5355
github.com/alibabacloud-go/darabonba-array v0.1.0 h1:vR8s7b1fWAQIjEjWnuF0JiKsCvclSRTfDzZHTYqfufY=
5456
github.com/alibabacloud-go/darabonba-array v0.1.0/go.mod h1:BLKxr0brnggqOJPqT09DFJ8g3fsDshapUD3C3aOEFaI=
5557
github.com/alibabacloud-go/darabonba-encode-util v0.0.2 h1:1uJGrbsGEVqWcWxrS9MyC2NG0Ax+GpOM5gtupki31XE=

pkg/operator/operator.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@ import (
2020
"context"
2121
"os"
2222

23+
ackclient "github.com/alibabacloud-go/cs-20151215/v5/client"
2324
ecs "github.com/alibabacloud-go/ecs-20140526/v4/client"
2425
vpc "github.com/alibabacloud-go/vpc-20160428/v6/client"
2526
"github.com/patrickmn/go-cache"
2627
"sigs.k8s.io/controller-runtime/pkg/log"
2728
"sigs.k8s.io/karpenter/pkg/operator"
2829

2930
alicache "github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/cache"
31+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/operator/options"
32+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/ack"
3033
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/imagefamily"
3134
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/instance"
3235
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/instancetype"
@@ -67,6 +70,12 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
6770
log.FromContext(ctx).Error(err, "Failed to create VPC client")
6871
os.Exit(1)
6972
}
73+
ackClient, err := ackclient.NewClient(clientConfig)
74+
if err != nil {
75+
log.FromContext(ctx).Error(err, "Failed to create ACK client")
76+
os.Exit(1)
77+
}
78+
clusterID := options.FromContext(ctx).ClusterID
7079
region := *ecsClient.RegionId
7180

7281
pricingProvider, err := pricing.NewDefaultProvider(ctx, region)
@@ -80,14 +89,14 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
8089
securityGroupProvider := securitygroup.NewDefaultProvider(region, ecsClient, cache.New(alicache.DefaultTTL, alicache.DefaultCleanupInterval))
8190
imageProvider := imagefamily.NewDefaultProvider(region, ecsClient, cache.New(alicache.DefaultTTL, alicache.DefaultCleanupInterval))
8291
imageResolver := imagefamily.NewDefaultResolver(region, ecsClient, cache.New(alicache.InstanceTypeAvailableDiskTTL, alicache.DefaultCleanupInterval))
92+
ackProvider := ack.NewDefaultProvider(clusterID, ackClient)
8393

8494
instanceProvider := instance.NewDefaultProvider(
85-
ctx,
8695
region,
87-
"",
8896
ecsClient,
8997
imageResolver,
9098
vSwitchProvider,
99+
ackProvider,
91100
)
92101

93102
unavailableOfferingsCache := alicache.NewUnavailableOfferings()

pkg/operator/options/options.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,14 @@ func init() {
3636
type optionsKey struct{}
3737

3838
// Options holds the provider-specific settings that AddFlags registers.
//
// ClusterName and ClusterID are both mandatory: validateRequiredFields rejects
// an empty value for either. VMMemoryOverheadPercent is registered with a
// default of 0.075.
type Options struct {
	ClusterName             string
	ClusterID               string
	VMMemoryOverheadPercent float64
}
4443

4544
// AddFlags registers the provider-specific command-line flags on fs and binds
// them to the fields of o.
//
// Flag defaults are supplied by env.WithDefaultString / utils.WithDefaultFloat64
// using the CLUSTER_NAME, CLUSTER_ID and VM_MEMORY_OVERHEAD_PERCENT keys.
func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
	fs.StringVar(&o.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "[REQUIRED] The kubernetes cluster name for resource discovery.")
	// cluster-id is enforced by validateRequiredFields, so it is advertised as
	// required just like cluster-name.
	fs.StringVar(&o.ClusterID, "cluster-id", env.WithDefaultString("CLUSTER_ID", ""), "[REQUIRED] The ACK cluster ID, used to fetch the node registration script for new nodes.")
	fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
}
5149

pkg/operator/options/options_validation.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,22 @@ package options
1818

1919
import (
2020
"fmt"
21-
"net/url"
2221

2322
"go.uber.org/multierr"
2423
)
2524

2625
func (o Options) Validate() error {
2726
return multierr.Combine(
28-
o.validateEndpoint(),
2927
o.validateRequiredFields(),
3028
)
3129
}
3230

33-
func (o Options) validateEndpoint() error {
34-
if o.ClusterEndpoint == "" {
35-
return nil
36-
}
37-
endpoint, err := url.Parse(o.ClusterEndpoint)
38-
// url.Parse() will accept a lot of input without error; make
39-
// sure it's a real URL
40-
if err != nil || !endpoint.IsAbs() || endpoint.Hostname() == "" {
41-
return fmt.Errorf("%q is not a valid cluster-endpoint URL", o.ClusterEndpoint)
42-
}
43-
return nil
44-
}
45-
4631
func (o Options) validateRequiredFields() error {
4732
if o.ClusterName == "" {
4833
return fmt.Errorf("missing field, cluster-name")
4934
}
35+
if o.ClusterID == "" {
36+
return fmt.Errorf("missing field, cluster-id")
37+
}
5038
return nil
5139
}

pkg/providers/ack/ack.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
Copyright 2024 The CloudPilot AI Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ack
18+
19+
import (
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"regexp"
	"sort"
	"strings"

	ackclient "github.com/alibabacloud-go/cs-20151215/v5/client"
	"github.com/alibabacloud-go/tea/tea"
	"sigs.k8s.io/controller-runtime/pkg/log"
)
31+
32+
// Provider supplies the script a new instance runs to register itself as a
// node of the cluster.
type Provider interface {
	// GetNodeRegisterScript returns the node registration script with the
	// given labels applied, or an error when the script cannot be obtained.
	// DefaultProvider returns the script base64-encoded.
	GetNodeRegisterScript(ctx context.Context, labels map[string]string) (string, error)
}
35+
36+
// DefaultProvider implements Provider using the Alibaba Cloud container
// service (cs-20151215) client.
type DefaultProvider struct {
	// clusterID is the cluster whose attach script is requested; it is also
	// written into the ack.aliyun.com node label.
	clusterID string
	// ackClient issues the DescribeClusterAttachScripts call.
	ackClient *ackclient.Client
}
40+
41+
func NewDefaultProvider(clusterID string, ackClient *ackclient.Client) *DefaultProvider {
42+
return &DefaultProvider{
43+
clusterID: clusterID,
44+
ackClient: ackClient,
45+
}
46+
}
47+
48+
func (p *DefaultProvider) GetNodeRegisterScript(ctx context.Context, labels map[string]string) (string, error) {
49+
reqPara := &ackclient.DescribeClusterAttachScriptsRequest{
50+
KeepInstanceName: tea.Bool(true),
51+
}
52+
53+
// TODO: Build a cache to store this
54+
resp, err := p.ackClient.DescribeClusterAttachScripts(tea.String(p.clusterID), reqPara)
55+
if err != nil {
56+
log.FromContext(ctx).Error(err, "Failed to get node registration script")
57+
return "", err
58+
}
59+
s := tea.StringValue(resp.Body)
60+
if s == "" {
61+
err := errors.New("empty node registration script")
62+
log.FromContext(ctx).Error(err, "")
63+
return "", err
64+
}
65+
66+
return p.resolveUserData(s, labels), nil
67+
}
68+
69+
func (p *DefaultProvider) resolveUserData(respStr string, labels map[string]string) string {
70+
cleanupStr := strings.ReplaceAll(respStr, "\r\n", "")
71+
72+
labelsFormated := fmt.Sprintf("ack.aliyun.com=%s", p.clusterID)
73+
for labelKey, labelValue := range labels {
74+
labelsFormated = fmt.Sprintf("%s,%s=%s", labelsFormated, labelKey, labelValue)
75+
}
76+
77+
re := regexp.MustCompile(`--labels\s+\S+`)
78+
updatedCommand := re.ReplaceAllString(cleanupStr, "--labels "+labelsFormated)
79+
80+
finalScript := fmt.Sprintf("#!/bin/bash\n\n%s", updatedCommand)
81+
82+
return base64.StdEncoding.EncodeToString([]byte(finalScript))
83+
}

pkg/providers/instance/instance.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737

3838
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/apis/v1alpha1"
3939
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/operator/options"
40+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/ack"
4041
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/imagefamily"
4142
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/vswitch"
4243
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/utils/alierrors"
@@ -57,26 +58,24 @@ type Provider interface {
5758
}
5859

5960
// DefaultProvider is the instance provider built by NewDefaultProvider.
//
// region is sent as the RegionId of auto provisioning group requests,
// imageFamilyResolver resolves launch templates in EnsureAll, and ackProvider
// supplies the node registration script passed to instances as user data.
type DefaultProvider struct {
	ecsClient *ecsclient.Client
	region    string

	imageFamilyResolver imagefamily.Resolver
	vSwitchProvider     vswitch.Provider
	ackProvider         ack.Provider
}
6868

69-
func NewDefaultProvider(ctx context.Context, region, clusterEndpoint string, ecsClient *ecsclient.Client,
70-
imageFamily imagefamily.Resolver,
71-
vSwitchProvider vswitch.Provider) *DefaultProvider {
69+
func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
70+
imageFamilyResolver imagefamily.Resolver, vSwitchProvider vswitch.Provider,
71+
ackProvider ack.Provider) *DefaultProvider {
7272
return &DefaultProvider{
73-
ecsClient: ecsClient,
74-
region: region,
75-
clusterEndpoint: clusterEndpoint,
76-
77-
imageFamily: imageFamily,
73+
ecsClient: ecsClient,
74+
region: region,
7875

79-
vSwitchProvider: vSwitchProvider,
76+
imageFamilyResolver: imageFamilyResolver,
77+
vSwitchProvider: vSwitchProvider,
78+
ackProvider: ackProvider,
8079
}
8180
}
8281

@@ -387,7 +386,7 @@ func (p *DefaultProvider) getProvisioningGroup(ctx context.Context, nodeClass *v
387386
break
388387
}
389388

390-
vSwitchID := p.getVSwitchID(launchtemplate.InstanceTypes[i], zonalVSwitchs, requirements)
389+
vSwitchID := p.getVSwitchID(launchtemplate.InstanceTypes[i], zonalVSwitchs, requirements, capacityType)
391390
if vSwitchID == "" {
392391
return nil, errors.New("vSwitchID not found")
393392
}
@@ -409,6 +408,13 @@ func (p *DefaultProvider) getProvisioningGroup(ctx context.Context, nodeClass *v
409408
})
410409
}
411410

411+
labels := lo.Assign(nodeClaim.Labels, map[string]string{karpv1.CapacityTypeLabelKey: capacityType})
412+
userData, err := p.ackProvider.GetNodeRegisterScript(ctx, labels)
413+
if err != nil {
414+
log.FromContext(ctx).Error(err, "Failed to resolve user data for node")
415+
return nil, err
416+
}
417+
412418
createAutoProvisioningGroupRequest := &ecsclient.CreateAutoProvisioningGroupRequest{
413419
RegionId: tea.String(p.region),
414420
TotalTargetCapacity: tea.String("1"),
@@ -420,6 +426,7 @@ func (p *DefaultProvider) getProvisioningGroup(ctx context.Context, nodeClass *v
420426
LaunchConfiguration: &ecsclient.CreateAutoProvisioningGroupRequestLaunchConfiguration{
421427
ImageId: tea.String(launchtemplate.ImageID),
422428
SecurityGroupIds: launchtemplate.SecurityGroupIds,
429+
UserData: tea.String(userData),
423430

424431
// TODO: AutoProvisioningGroup is not compatible with SecurityGroupIds, waiting for Aliyun developers to fix it,
425432
// so here we only take the first one.
@@ -456,7 +463,12 @@ func (p *DefaultProvider) checkODFallback(nodeClaim *karpv1.NodeClaim, instanceT
456463
return nil
457464
}
458465

459-
func (p *DefaultProvider) getVSwitchID(instanceType *cloudprovider.InstanceType, zonalVSwitchs map[string]*vswitch.VSwitch, reqs scheduling.Requirements) string {
466+
func (p *DefaultProvider) getVSwitchID(instanceType *cloudprovider.InstanceType,
467+
zonalVSwitchs map[string]*vswitch.VSwitch, reqs scheduling.Requirements, capacityType string) string {
468+
cheapestVSwitchID := ""
469+
cheapestPrice := math.MaxFloat64
470+
471+
// Spot prices can differ between availability zones, so pick the vSwitch of the cheapest compatible zone.
460472
for i := range instanceType.Offerings {
461473
if reqs.Compatible(instanceType.Offerings[i].Requirements, scheduling.AllowUndefinedWellKnownLabels) != nil {
462474
continue
@@ -465,9 +477,17 @@ func (p *DefaultProvider) getVSwitchID(instanceType *cloudprovider.InstanceType,
465477
if !ok {
466478
continue
467479
}
468-
return vswitch.ID
480+
if capacityType == karpv1.CapacityTypeOnDemand {
481+
return vswitch.ID
482+
}
483+
484+
if instanceType.Offerings[i].Price < cheapestPrice {
485+
cheapestVSwitchID = vswitch.ID
486+
cheapestPrice = instanceType.Offerings[i].Price
487+
}
469488
}
470-
return ""
489+
490+
return cheapestVSwitchID
471491
}
472492

473493
type LaunchTemplate struct {
@@ -477,12 +497,14 @@ type LaunchTemplate struct {
477497
SystemDisk *v1alpha1.SystemDisk
478498
}
479499

480-
func (p *DefaultProvider) EnsureAll(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass, nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType, capacityType string, tags map[string]string) ([]*LaunchTemplate, error) {
500+
func (p *DefaultProvider) EnsureAll(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass,
501+
nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType,
502+
capacityType string, tags map[string]string) ([]*LaunchTemplate, error) {
481503
imageOptions, err := p.resolveImageOptions(ctx, nodeClass, lo.Assign(nodeClaim.Labels, map[string]string{karpv1.CapacityTypeLabelKey: capacityType}), tags)
482504
if err != nil {
483505
return nil, err
484506
}
485-
resolvedLaunchTemplates, err := p.imageFamily.Resolve(ctx, nodeClass, nodeClaim, instanceTypes, capacityType, imageOptions)
507+
resolvedLaunchTemplates, err := p.imageFamilyResolver.Resolve(ctx, nodeClass, nodeClaim, instanceTypes, capacityType, imageOptions)
486508
if err != nil {
487509
return nil, err
488510
}
@@ -526,11 +548,10 @@ func (p *DefaultProvider) resolveImageOptions(ctx context.Context, nodeClass *v1
526548
return nil, fmt.Errorf("no security groups are present in the status")
527549
}
528550
return &imagefamily.Options{
529-
ClusterName: options.FromContext(ctx).ClusterName,
530-
ClusterEndpoint: p.clusterEndpoint,
531-
SecurityGroups: nodeClass.Status.SecurityGroups,
532-
Tags: tags,
533-
Labels: labels,
534-
NodeClassName: nodeClass.Name,
551+
ClusterName: options.FromContext(ctx).ClusterName,
552+
SecurityGroups: nodeClass.Status.SecurityGroups,
553+
Tags: tags,
554+
Labels: labels,
555+
NodeClassName: nodeClass.Name,
535556
}, nil
536557
}

0 commit comments

Comments
 (0)