Skip to content

Commit 63f4cfc

Browse files
committed
Merge branch 'master' into use-central-flags
2 parents dba5a39 + a0ee591 commit 63f4cfc

File tree

22 files changed

+965
-164
lines changed

22 files changed

+965
-164
lines changed

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ import (
6060
_ "k8s.io/component-base/metrics/prometheus/workqueue" // register work queues in the default legacy registry
6161
csitrans "k8s.io/csi-translation-lib"
6262
"k8s.io/klog/v2"
63-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
64-
libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller/metrics"
63+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
64+
libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller/metrics"
6565

6666
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
6767
"github.com/kubernetes-csi/csi-lib-utils/metrics"
@@ -92,6 +92,9 @@ var (
9292

9393
defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to provision when fstype is unspecified in the StorageClass. If the default is not set and fstype is unset in the StorageClass, then no fstype will be set")
9494

95+
kubeAPICapacityQPS = flag.Float32("kube-api-capacity-qps", 1, "QPS to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 1.0.")
96+
kubeAPICapacityBurst = flag.Int("kube-api-capacity-burst", 5, "Burst to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 5.")
97+
9598
enableCapacity = flag.Bool("enable-capacity", false, "This enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call.")
9699
capacityImmediateBinding = flag.Bool("capacity-for-immediate-binding", false, "Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging.")
97100
capacityPollInterval = flag.Duration("capacity-poll-interval", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.")
@@ -387,7 +390,6 @@ func main() {
387390
controller.Threadiness(int(*workerThreads)),
388391
controller.CreateProvisionedPVLimiter(workqueue.DefaultTypedControllerRateLimiter[string]()),
389392
controller.ClaimsInformer(claimInformer),
390-
controller.NodesLister(nodeLister),
391393
controller.RetryIntervalMax(standardflags.Configuration.RetryIntervalMax),
392394
}
393395

@@ -398,6 +400,10 @@ func main() {
398400
if supportsMigrationFromInTreePluginName != "" {
399401
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
400402
}
403+
var pvcNodeStore *ctrl.InMemoryStore
404+
if ctrl.SupportsTopology(pluginCapabilities) {
405+
pvcNodeStore = ctrl.NewInMemoryStore()
406+
}
401407

402408
// Create the provisioner: it implements the Provisioner interface expected by
403409
// the controller
@@ -427,12 +433,15 @@ func main() {
427433
nodeDeployment,
428434
*controllerPublishReadOnly,
429435
*preventVolumeModeConversion,
436+
pvcNodeStore,
430437
)
431438

432439
var capacityController *capacity.Controller
433440
if *enableCapacity {
434441
// Publishing storage capacity information uses its own client
435442
// with separate rate limiting.
443+
config.QPS = *kubeAPICapacityQPS
444+
config.Burst = *kubeAPICapacityBurst
436445
clientset, err := kubernetes.NewForConfig(config)
437446
if err != nil {
438447
klog.Fatalf("Failed to create client: %v", err)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/onsi/ginkgo/v2 v2.23.4
3434
github.com/onsi/gomega v1.37.0
3535
k8s.io/kubernetes v1.34.0
36-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1
36+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0
3737
)
3838

3939
require (

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,8 @@ sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5E
374374
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
375375
sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=
376376
sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
377-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1 h1:rQcMFPcZ12y82+BA7b29gCWyoI/+/gAQUZu/Cw+8bC0=
378-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1/go.mod h1:kPy4hBso6PNhP9PdlTDdBZqxP1RKg7DFFM7lIR1bA8k=
377+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0 h1:bqSqBfqtToTDMDz+FEzfqofXAp5ptt6Z7ShR0g05PGA=
378+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0/go.mod h1:1xSe5kgJcKbrtNdD5WoytKUoByAGDl3wVHlKP0RZIC8=
379379
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco=
380380
sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE=
381381
sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs=

pkg/capacity/provision.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"context"
2121

2222
v1 "k8s.io/api/core/v1"
23-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
23+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
2424
)
2525

2626
type provisionWrapper struct {

pkg/controller/cache.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package controller
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
"k8s.io/apimachinery/pkg/types"
8+
)
9+
10+
// TopologyInfo holds the data for NodeLabels and TopologyKeys
11+
type TopologyInfo struct {
12+
NodeLabels map[string]string
13+
TopologyKeys []string
14+
RequisiteTerms []topologyTerm
15+
}
16+
17+
// TopologyProvider is an interface that defines the behavior for looking up
18+
// a TopologyInfo object by its pvc UID.
19+
type TopologyProvider interface {
20+
GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error)
21+
// The entry is deleted when provision succeeds or returns a final error.
22+
Delete(pvcUID types.UID) error
23+
24+
// Update methods now perform an "upsert" and don't return errors.
25+
UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string)
26+
UpdateTopologyKeys(pvcUID types.UID, newKeys []string)
27+
UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm)
28+
}
29+
30+
// InMemoryStore is a concrete implementation of TopologyProvider.
31+
// It uses an in-memory map for quick lookups.
32+
type InMemoryStore struct {
33+
// The map key is the object's name.
34+
data map[types.UID]*TopologyInfo
35+
// Adding a mutex for thread-safe access
36+
mutex sync.RWMutex
37+
}
38+
39+
// NewInMemoryStore creates and initializes a new store.
40+
func NewInMemoryStore() *InMemoryStore {
41+
return &InMemoryStore{
42+
data: make(map[types.UID]*TopologyInfo),
43+
}
44+
}
45+
46+
// Delete implements the TopologyProvider interface.
47+
// It uses the built-in delete() function to remove the item from the map.
48+
func (s *InMemoryStore) Delete(pvcUID types.UID) error {
49+
s.mutex.Lock()
50+
defer s.mutex.Unlock()
51+
// First, check if the key exists to provide a helpful error.
52+
_, found := s.data[pvcUID]
53+
if !found {
54+
return nil
55+
}
56+
delete(s.data, pvcUID)
57+
return nil
58+
}
59+
60+
// GetByPvcUID implements the TopologyProvider interface.
61+
func (s *InMemoryStore) GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error) {
62+
if s == nil {
63+
return nil, fmt.Errorf("pvcNodeStore is nil")
64+
}
65+
s.mutex.RLock()
66+
defer s.mutex.RUnlock()
67+
info, found := s.data[pvcUID]
68+
if !found {
69+
return nil, fmt.Errorf("topology object with pvcUID '%s' not found", pvcUID)
70+
}
71+
72+
// Return a deep copy to prevent data races
73+
infoCopy := &TopologyInfo{}
74+
if info.NodeLabels != nil {
75+
infoCopy.NodeLabels = make(map[string]string)
76+
for k, v := range info.NodeLabels {
77+
infoCopy.NodeLabels[k] = v
78+
}
79+
}
80+
81+
if info.TopologyKeys != nil {
82+
infoCopy.TopologyKeys = make([]string, len(info.TopologyKeys))
83+
copy(infoCopy.TopologyKeys, info.TopologyKeys)
84+
}
85+
86+
if info.RequisiteTerms != nil {
87+
infoCopy.RequisiteTerms = make([]topologyTerm, len(info.RequisiteTerms))
88+
for i, term := range info.RequisiteTerms {
89+
newTerm := make(topologyTerm, len(term))
90+
copy(newTerm, term)
91+
infoCopy.RequisiteTerms[i] = newTerm
92+
}
93+
}
94+
95+
return infoCopy, nil
96+
}
97+
98+
// UpdateNodeLabels finds an object by pvcUID and replaces its NodeLabels.
99+
func (s *InMemoryStore) UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string) {
100+
s.mutex.Lock()
101+
defer s.mutex.Unlock()
102+
info, found := s.data[pvcUID]
103+
if !found {
104+
s.data[pvcUID] = &TopologyInfo{NodeLabels: newLabels}
105+
} else {
106+
info.NodeLabels = newLabels
107+
}
108+
}
109+
110+
// UpdateTopologyKeys finds an object by pvcUID and replaces its TopologyKeys.
111+
func (s *InMemoryStore) UpdateTopologyKeys(pvcUID types.UID, newKeys []string) {
112+
s.mutex.Lock()
113+
defer s.mutex.Unlock()
114+
info, found := s.data[pvcUID]
115+
if !found {
116+
s.data[pvcUID] = &TopologyInfo{TopologyKeys: newKeys}
117+
} else {
118+
info.TopologyKeys = newKeys
119+
}
120+
}
121+
122+
// UpdateRequisiteTerms finds an object by pvcUID and replaces its RequisiteTerms.
123+
func (s *InMemoryStore) UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm) {
124+
s.mutex.Lock()
125+
defer s.mutex.Unlock()
126+
info, found := s.data[pvcUID]
127+
if !found {
128+
s.data[pvcUID] = &TopologyInfo{RequisiteTerms: requisiteTerms}
129+
} else {
130+
info.RequisiteTerms = requisiteTerms
131+
}
132+
}

pkg/controller/cache_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package controller
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"sync"
7+
"testing"
8+
9+
"k8s.io/apimachinery/pkg/types"
10+
)
11+
12+
// TestNewInMemoryStore tests the NewInMemoryStore function.
13+
func TestNewInMemoryStore(t *testing.T) {
14+
store := NewInMemoryStore()
15+
if store == nil {
16+
t.Error("Expected a new store, but got nil")
17+
}
18+
if store.data == nil {
19+
t.Error("Expected store data to be initialized, but it was nil")
20+
}
21+
}
22+
23+
// TestAddAndGet tests the Add and GetByPvcUID methods.
24+
func TestAddAndGetInInMemoryStore(t *testing.T) {
25+
store := NewInMemoryStore()
26+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
27+
labels := map[string]string{"foo": "bar"}
28+
store.UpdateNodeLabels(pvcUID, labels)
29+
30+
retrieved, err := store.GetByPvcUID(pvcUID)
31+
if err != nil {
32+
t.Fatalf("unexpected error: %v", err)
33+
}
34+
35+
expectedInfo := &TopologyInfo{NodeLabels: labels}
36+
if !reflect.DeepEqual(expectedInfo, retrieved) {
37+
t.Errorf("Expected %+v, got %+v", expectedInfo, retrieved)
38+
}
39+
40+
_, err = store.GetByPvcUID("nonexistent")
41+
if err == nil {
42+
t.Error("Expected an error for a nonexistent entry, but got nil")
43+
}
44+
}
45+
46+
// TestDelete tests the Delete method.
47+
func TestDeleteInInMemoryStore(t *testing.T) {
48+
store := NewInMemoryStore()
49+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
50+
labels := map[string]string{"foo": "bar"}
51+
store.UpdateNodeLabels(pvcUID, labels)
52+
53+
err := store.Delete(pvcUID)
54+
if err != nil {
55+
t.Fatalf("unexpected error: %v", err)
56+
}
57+
58+
_, err = store.GetByPvcUID(pvcUID)
59+
if err == nil {
60+
t.Error("Expected an error after deleting the entry, but got nil")
61+
}
62+
63+
err = store.Delete("nonexistent")
64+
if err != nil {
65+
t.Errorf("Did not expect an error for deleting a nonexistent entry, but got: %v", err)
66+
}
67+
}
68+
69+
// TestUpdate tests the update methods.
70+
func TestUpdateInInMemoryStore(t *testing.T) {
71+
store := NewInMemoryStore()
72+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
73+
74+
// Test updating a nonexistent entry
75+
store.UpdateNodeLabels(pvcUID, map[string]string{"foo": "bar"})
76+
retrieved, _ := store.GetByPvcUID(pvcUID)
77+
if retrieved.NodeLabels["foo"] != "bar" {
78+
t.Errorf("Expected NodeLabels to be updated")
79+
}
80+
81+
// Test updating an existing entry
82+
store.UpdateNodeLabels(pvcUID, map[string]string{"foo": "baz"})
83+
retrieved, _ = store.GetByPvcUID(pvcUID)
84+
if retrieved.NodeLabels["foo"] != "baz" {
85+
t.Errorf("Expected NodeLabels to be updated")
86+
}
87+
88+
store.UpdateTopologyKeys(pvcUID, []string{"key1"})
89+
retrieved, _ = store.GetByPvcUID(pvcUID)
90+
if retrieved.TopologyKeys[0] != "key1" {
91+
t.Errorf("Expected TopologyKeys to be updated")
92+
}
93+
}
94+
95+
// TestUpdateTermsInInMemoryStore tests the UpdateRequisiteTerms methods.
96+
func TestUpdateTermsInInMemoryStore(t *testing.T) {
97+
store := NewInMemoryStore()
98+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
99+
100+
// Define some topology terms for testing
101+
term1 := topologyTerm{
102+
{Key: "zone", Value: "zone1"},
103+
{Key: "rack", Value: "rack1"},
104+
}
105+
term2 := topologyTerm{
106+
{Key: "zone", Value: "zone2"},
107+
{Key: "rack", Value: "rack2"},
108+
}
109+
110+
// Test updating requisite terms for a nonexistent entry
111+
store.UpdateRequisiteTerms(pvcUID, []topologyTerm{term1})
112+
retrieved, _ := store.GetByPvcUID(pvcUID)
113+
if !reflect.DeepEqual(retrieved.RequisiteTerms, []topologyTerm{term1}) {
114+
t.Errorf("Expected RequisiteTerms to be updated")
115+
}
116+
117+
// Test updating requisite terms for an existing entry
118+
store.UpdateRequisiteTerms(pvcUID, []topologyTerm{term2})
119+
retrieved, _ = store.GetByPvcUID(pvcUID)
120+
if !reflect.DeepEqual(retrieved.RequisiteTerms, []topologyTerm{term2}) {
121+
t.Errorf("Expected RequisiteTerms to be updated")
122+
}
123+
}
124+
125+
// TestConcurrentAccess tests thread-safety of the InMemoryStore.
126+
func TestConcurrentAccessInInMemoryStore(t *testing.T) {
127+
store := NewInMemoryStore()
128+
var wg sync.WaitGroup
129+
130+
// Number of concurrent goroutines
131+
concurrency := 100
132+
133+
wg.Add(concurrency)
134+
for i := 0; i < concurrency; i++ {
135+
go func(i int) {
136+
defer wg.Done()
137+
pvcUID := types.UID(fmt.Sprintf("item-%d", i))
138+
info := &TopologyInfo{TopologyKeys: []string{fmt.Sprintf("key-%d", i)}}
139+
140+
store.UpdateTopologyKeys(pvcUID, info.TopologyKeys)
141+
142+
retrieved, err := store.GetByPvcUID(pvcUID)
143+
if err != nil {
144+
t.Errorf("goroutine %d: unexpected error getting item: %v", i, err)
145+
}
146+
if !reflect.DeepEqual(retrieved.TopologyKeys, info.TopologyKeys) {
147+
t.Errorf("goroutine %d: retrieved wrong data", i)
148+
}
149+
150+
store.UpdateTopologyKeys(pvcUID, []string{fmt.Sprintf("new-key-%d", i)})
151+
retrieved, err = store.GetByPvcUID(pvcUID)
152+
if err != nil {
153+
t.Errorf("goroutine %d: unexpected error getting item: %v", i, err)
154+
}
155+
if !reflect.DeepEqual(retrieved.TopologyKeys, []string{fmt.Sprintf("new-key-%d", i)}) {
156+
t.Errorf("goroutine %d: retrieved wrong data after update", i)
157+
}
158+
159+
err = store.Delete(pvcUID)
160+
if err != nil {
161+
t.Errorf("goroutine %d: unexpected error deleting item: %v", i, err)
162+
}
163+
}(i)
164+
}
165+
166+
wg.Wait()
167+
}

pkg/controller/clone_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"k8s.io/client-go/tools/cache"
2222
"k8s.io/client-go/util/workqueue"
2323
"k8s.io/klog/v2"
24-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
24+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
2525
)
2626

2727
//

0 commit comments

Comments
 (0)