Skip to content

Commit b2daadd

Browse files
committed
instancetype add list
Signed-off-by: helen <helenfrank@protonmail.com>
1 parent b217f2f commit b2daadd

File tree

8 files changed

+706
-15
lines changed

8 files changed

+706
-15
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ require (
117117
k8s.io/klog v1.0.0 // indirect
118118
k8s.io/klog/v2 v2.130.1 // indirect
119119
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
120-
knative.dev/pkg v0.0.0-20230712131115-7051d301e7f4 // indirect
120+
knative.dev/pkg v0.0.0-20230712131115-7051d301e7f4
121121
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
122122
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
123123
)

pkg/cache/cache.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
Copyright 2024 The CloudPilot AI Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import "time"
20+
21+
const (
22+
// KubernetesVersionTTL is the time before the detected Kubernetes version is removed from cache,
23+
// to be re-detected next time it is needed.
24+
KubernetesVersionTTL = 15 * time.Minute
25+
// UnavailableOfferingsTTL is the time before offerings that were marked as unavailable
26+
// are removed from the cache and are available for launch again
27+
UnavailableOfferingsTTL = 3 * time.Minute
28+
29+
// DefaultCleanupInterval triggers cache cleanup (lazy eviction) at this interval.
30+
DefaultCleanupInterval = 1 * time.Minute
31+
// UnavailableOfferingsCleanupInterval triggers cache cleanup (lazy eviction) at this interval.
32+
// We drop the cleanup interval down for the ICE cache to get quicker reactivity to offerings
33+
// that become available after they get evicted from the cache
34+
UnavailableOfferingsCleanupInterval = time.Second * 10
35+
)

pkg/cache/unavailableofferings.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
Copyright 2024 The CloudPilot AI Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync/atomic"
23+
"time"
24+
25+
"github.com/patrickmn/go-cache"
26+
"knative.dev/pkg/logging"
27+
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
28+
)
29+
30+
var (
31+
spotKey = key("", "", v1beta1.CapacityTypeSpot)
32+
)
33+
34+
// UnavailableOfferings stores any offerings that return ICE (insufficient capacity errors) when
35+
// attempting to launch the capacity. These offerings are ignored as long as they are in the cache on
36+
// GetInstanceTypes responses
37+
type UnavailableOfferings struct {
38+
// key: <capacityType>:<instanceType>:<zone>, value: struct{}{}
39+
cache *cache.Cache
40+
SeqNum uint64
41+
}
42+
43+
func NewUnavailableOfferingsWithCache(c *cache.Cache) *UnavailableOfferings {
44+
uo := &UnavailableOfferings{
45+
cache: c,
46+
SeqNum: 0,
47+
}
48+
uo.cache.OnEvicted(func(_ string, _ interface{}) {
49+
atomic.AddUint64(&uo.SeqNum, 1)
50+
})
51+
return uo
52+
}
53+
54+
func NewUnavailableOfferings() *UnavailableOfferings {
55+
return NewUnavailableOfferingsWithCache(
56+
cache.New(UnavailableOfferingsTTL, UnavailableOfferingsCleanupInterval))
57+
}
58+
59+
// IsUnavailable returns true if the offering appears in the cache
60+
func (u *UnavailableOfferings) IsUnavailable(instanceType, zone, capacityType string) bool {
61+
if capacityType == v1beta1.CapacityTypeSpot {
62+
if _, found := u.cache.Get(spotKey); found {
63+
return true
64+
}
65+
}
66+
_, found := u.cache.Get(key(instanceType, zone, capacityType))
67+
return found
68+
}
69+
70+
// MarkUnavailableWithTTL allows us to mark an offering unavailable with a custom TTL
71+
func (u *UnavailableOfferings) MarkUnavailableWithTTL(ctx context.Context, unavailableReason, instanceType, zone, capacityType string, ttl time.Duration) {
72+
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
73+
logging.FromContext(ctx).With(
74+
"unavailable", unavailableReason,
75+
"instance-type", instanceType,
76+
"zone", zone,
77+
"capacity-type", capacityType,
78+
"ttl", ttl).Debugf("removing offering from offerings")
79+
u.cache.Set(key(instanceType, zone, capacityType), struct{}{}, ttl)
80+
atomic.AddUint64(&u.SeqNum, 1)
81+
}
82+
83+
// MarkUnavailable communicates recently observed temporary capacity shortages in the provided offerings
84+
func (u *UnavailableOfferings) MarkUnavailable(ctx context.Context, unavailableReason, instanceType, zone, capacityType string) {
85+
u.MarkUnavailableWithTTL(ctx, unavailableReason, instanceType, zone, capacityType, UnavailableOfferingsTTL)
86+
}
87+
88+
func (u *UnavailableOfferings) Flush() {
89+
u.cache.Flush()
90+
atomic.AddUint64(&u.SeqNum, 1)
91+
}
92+
93+
// key returns the cache key for all offerings in the cache
94+
func key(instanceType string, zone string, capacityType string) string {
95+
return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone)
96+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright 2024 The CloudPilot AI Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
"github.com/patrickmn/go-cache"
25+
)
26+
27+
func TestUnavailableOfferings(t *testing.T) {
28+
// create a new cache with a short TTL
29+
c := cache.New(time.Second, time.Second)
30+
u := NewUnavailableOfferingsWithCache(c)
31+
32+
// test that an offering is not marked as unavailable initially
33+
if u.IsUnavailable("NV16as_v4", "westus", "spot") {
34+
t.Error("Offering should not be marked as unavailable initially")
35+
}
36+
37+
// mark the offering as unavailable
38+
u.MarkUnavailableWithTTL(context.TODO(), "test reason", "NV16as_v4", "westus", "spot", time.Second)
39+
40+
// test that the offering is now marked as unavailable
41+
if !u.IsUnavailable("NV16as_v4", "westus", "spot") {
42+
t.Error("Offering should be marked as unavailable after being marked as such")
43+
}
44+
45+
// wait for the cache entry to expire
46+
time.Sleep(time.Second)
47+
48+
// test that the offering is no longer marked as unavailable
49+
if u.IsUnavailable("NV16as_v4", "westus", "spot") {
50+
t.Error("Offering should not be marked as unavailable after cache entry has expired")
51+
}
52+
}
53+
54+
func TestUnavailableOfferings_KeyGeneration(t *testing.T) {
55+
expectedKey := "spot:NV16as_v4:westus"
56+
key := key("NV16as_v4", "westus", "spot")
57+
if key != expectedKey {
58+
t.Errorf("Expected key to be %s, but got %s", expectedKey, key)
59+
}
60+
}

pkg/operator/options/options.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525

2626
coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
2727
"sigs.k8s.io/karpenter/pkg/utils/env"
28+
29+
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/utils"
2830
)
2931

3032
func init() {
@@ -34,15 +36,17 @@ func init() {
3436
type optionsKey struct{}
3537

3638
type Options struct {
37-
ClusterCABundle string
38-
ClusterName string
39-
ClusterEndpoint string
39+
ClusterCABundle string
40+
ClusterName string
41+
ClusterEndpoint string
42+
VMMemoryOverheadPercent float64
4043
}
4144

4245
func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
4346
fs.StringVar(&o.ClusterCABundle, "cluster-ca-bundle", env.WithDefaultString("CLUSTER_CA_BUNDLE", ""), "Cluster CA bundle for nodes to use for TLS connections with the API server. If not set, this is taken from the controller's TLS configuration.")
4447
fs.StringVar(&o.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "[REQUIRED] The kubernetes cluster name for resource discovery.")
4548
fs.StringVar(&o.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with. If not specified, will discover the cluster endpoint using DescribeCluster API.")
49+
fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
4650
}
4751

4852
func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {

0 commit comments

Comments
 (0)