
Commit bcb5fa6

Merge branch 'master' into use-central-flags
2 parents dba5a39 + a0ee591 commit bcb5fa6

22 files changed (+975 additions, -171 deletions)

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 22 additions & 10 deletions
@@ -60,8 +60,8 @@ import (
 	_ "k8s.io/component-base/metrics/prometheus/workqueue" // register work queues in the default legacy registry
 	csitrans "k8s.io/csi-translation-lib"
 	"k8s.io/klog/v2"
-	"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
-	libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller/metrics"
+	"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
+	libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller/metrics"

 	"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
 	"github.com/kubernetes-csi/csi-lib-utils/metrics"
@@ -78,8 +78,11 @@ import (
 )

 var (
+	master               = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
 	volumeNamePrefix     = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
 	volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
+	retryIntervalStart   = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
+	retryIntervalMax     = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
 	workerThreads        = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
 	finalizerThreads     = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
 	capacityThreads      = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
@@ -92,6 +95,9 @@ var (

 	defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to provision when fstype is unspecified in the StorageClass. If the default is not set and fstype is unset in the StorageClass, then no fstype will be set")

+	kubeAPICapacityQPS   = flag.Float32("kube-api-capacity-qps", 1, "QPS to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 1.0.")
+	kubeAPICapacityBurst = flag.Int("kube-api-capacity-burst", 5, "Burst to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 5.")
+
 	enableCapacity           = flag.Bool("enable-capacity", false, "This enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call.")
 	capacityImmediateBinding = flag.Bool("capacity-for-immediate-binding", false, "Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging.")
 	capacityPollInterval     = flag.Duration("capacity-poll-interval", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.")
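
Taken together, the new flags compose on the command line. A hypothetical invocation (API server address and values are illustrative, not from this commit):

	csi-provisioner \
	  --master=https://192.0.2.10:6443 \
	  --retry-interval-start=1s --retry-interval-max=5m \
	  --enable-capacity --kube-api-capacity-qps=1 --kube-api-capacity-burst=5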
@@ -165,9 +171,9 @@ func main() {
 		standardflags.Configuration.KubeConfig = kubeconfigEnv
 	}

-	if standardflags.Configuration.Master != "" || standardflags.Configuration.KubeConfig != "" {
+	if *master != "" || standardflags.Configuration.KubeConfig != "" {
 		klog.Infof("Either master or kubeconfig specified. building kube config from that..")
-		config, err = clientcmd.BuildConfigFromFlags(standardflags.Configuration.Master, standardflags.Configuration.KubeConfig)
+		config, err = clientcmd.BuildConfigFromFlags(*master, standardflags.Configuration.KubeConfig)
 	} else {
 		klog.Infof("Building kube configs for running in cluster...")
 		config, err = rest.InClusterConfig()
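
The config selection above boils down to a two-branch helper. A minimal standalone sketch, assuming only these two inputs decide the source (the buildConfig name is made up):

	import (
		"k8s.io/client-go/rest"
		"k8s.io/client-go/tools/clientcmd"
	)

	func buildConfig(master, kubeconfig string) (*rest.Config, error) {
		// Out-of-cluster: an explicit master URL and/or kubeconfig path wins.
		if master != "" || kubeconfig != "" {
			return clientcmd.BuildConfigFromFlags(master, kubeconfig)
		}
		// In-cluster: fall back to the service account mounted into the pod.
		return rest.InClusterConfig()
	}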
@@ -373,8 +379,8 @@ func main() {

 	// -------------------------------
 	// PersistentVolumeClaims informer
-	genericRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
-	claimRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
+	genericRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
+	claimRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
 	claimQueue := workqueue.NewTypedRateLimitingQueueWithConfig(claimRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "claims"})
 	claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

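For context, the per-item exponential limiter doubles an item's delay on each failure, starting at retry-interval-start and capping at retry-interval-max. A small standalone sketch using the flag defaults (the item key is made up):

	package main

	import (
		"fmt"
		"time"

		"k8s.io/client-go/util/workqueue"
	)

	func main() {
		// Same defaults as the flags above: 1s initial, 5m cap.
		rl := workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 5*time.Minute)
		for i := 0; i < 5; i++ {
			// Each call counts as another failure for the same item:
			// prints 1s, 2s, 4s, 8s, 16s.
			fmt.Println(rl.When("default/my-pvc"))
		}
		rl.Forget("default/my-pvc") // a success resets the backoff for this item
	}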
@@ -387,8 +393,7 @@ func main() {
 		controller.Threadiness(int(*workerThreads)),
 		controller.CreateProvisionedPVLimiter(workqueue.DefaultTypedControllerRateLimiter[string]()),
 		controller.ClaimsInformer(claimInformer),
-		controller.NodesLister(nodeLister),
-		controller.RetryIntervalMax(standardflags.Configuration.RetryIntervalMax),
+		controller.RetryIntervalMax(*retryIntervalMax),
 	}

 	if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {
@@ -398,6 +403,10 @@ func main() {
 	if supportsMigrationFromInTreePluginName != "" {
 		provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
 	}
+	var pvcNodeStore *ctrl.InMemoryStore
+	if ctrl.SupportsTopology(pluginCapabilities) {
+		pvcNodeStore = ctrl.NewInMemoryStore()
+	}

 	// Create the provisioner: it implements the Provisioner interface expected by
 	// the controller
@@ -427,12 +436,15 @@ func main() {
 		nodeDeployment,
 		*controllerPublishReadOnly,
 		*preventVolumeModeConversion,
+		pvcNodeStore,
 	)

 	var capacityController *capacity.Controller
 	if *enableCapacity {
 		// Publishing storage capacity information uses its own client
 		// with separate rate limiting.
+		config.QPS = *kubeAPICapacityQPS
+		config.Burst = *kubeAPICapacityBurst
 		clientset, err := kubernetes.NewForConfig(config)
 		if err != nil {
 			klog.Fatalf("Failed to create client: %v", err)
@@ -464,7 +476,7 @@ func main() {

 	var topologyInformer topology.Informer
 	if nodeDeployment == nil {
-		topologyRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
+		topologyRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
 		topologyInformer = topology.NewNodeTopology(
 			provisionerName,
 			clientset,
@@ -530,7 +542,7 @@ func main() {
 		klog.Fatalf("unexpected error when checking for the V1 CSIStorageCapacity API: %v", err)
 	}

-	capacityRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[capacity.QueueKey](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
+	capacityRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[capacity.QueueKey](*retryIntervalStart, *retryIntervalMax)
 	capacityController = capacity.NewCentralCapacityController(
 		csi.NewControllerClient(grpcClient),
 		provisionerName,

go.mod

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ require (
 	github.com/onsi/ginkgo/v2 v2.23.4
 	github.com/onsi/gomega v1.37.0
 	k8s.io/kubernetes v1.34.0
-	sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1
+	sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0
 )

 require (

go.sum

Lines changed: 2 additions & 2 deletions
@@ -374,8 +374,8 @@ sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5E
 sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
 sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=
 sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
-sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1 h1:rQcMFPcZ12y82+BA7b29gCWyoI/+/gAQUZu/Cw+8bC0=
-sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1/go.mod h1:kPy4hBso6PNhP9PdlTDdBZqxP1RKg7DFFM7lIR1bA8k=
+sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0 h1:bqSqBfqtToTDMDz+FEzfqofXAp5ptt6Z7ShR0g05PGA=
+sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0/go.mod h1:1xSe5kgJcKbrtNdD5WoytKUoByAGDl3wVHlKP0RZIC8=
 sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco=
 sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE=
 sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs=

pkg/capacity/provision.go

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ import (
 	"context"

 	v1 "k8s.io/api/core/v1"
-	"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
+	"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
 )

 type provisionWrapper struct {

pkg/controller/cache.go

Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
+package controller
+
+import (
+	"fmt"
+	"sync"
+
+	"k8s.io/apimachinery/pkg/types"
+)
+
+// TopologyInfo holds a PVC's cached topology data: node labels, topology keys, and requisite terms.
+type TopologyInfo struct {
+	NodeLabels     map[string]string
+	TopologyKeys   []string
+	RequisiteTerms []topologyTerm
+}
+
+// TopologyProvider defines lookup, update, and deletion of cached
+// TopologyInfo objects, keyed by PVC UID.
+type TopologyProvider interface {
+	GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error)
+	// The entry is deleted when provision succeeds or returns a final error.
+	Delete(pvcUID types.UID) error
+
+	// The Update methods perform an upsert and therefore don't return errors.
+	UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string)
+	UpdateTopologyKeys(pvcUID types.UID, newKeys []string)
+	UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm)
+}
+
+// InMemoryStore is a concrete implementation of TopologyProvider.
+// It uses an in-memory map for quick lookups.
+type InMemoryStore struct {
+	// The map key is the PVC's UID.
+	data map[types.UID]*TopologyInfo
+	// mutex guards concurrent access to data.
+	mutex sync.RWMutex
+}
+
+// NewInMemoryStore creates and initializes a new store.
+func NewInMemoryStore() *InMemoryStore {
+	return &InMemoryStore{
+		data: make(map[types.UID]*TopologyInfo),
+	}
+}
+
+// Delete implements the TopologyProvider interface.
+// It uses the built-in delete() function to remove the item from the map.
+func (s *InMemoryStore) Delete(pvcUID types.UID) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	// Deleting an entry that does not exist is treated as a no-op.
+	_, found := s.data[pvcUID]
+	if !found {
+		return nil
+	}
+	delete(s.data, pvcUID)
+	return nil
+}
+
+// GetByPvcUID implements the TopologyProvider interface.
+func (s *InMemoryStore) GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error) {
+	if s == nil {
+		return nil, fmt.Errorf("pvcNodeStore is nil")
+	}
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
+	info, found := s.data[pvcUID]
+	if !found {
+		return nil, fmt.Errorf("topology object with pvcUID '%s' not found", pvcUID)
+	}
+
+	// Return a deep copy to prevent data races
+	infoCopy := &TopologyInfo{}
+	if info.NodeLabels != nil {
+		infoCopy.NodeLabels = make(map[string]string)
+		for k, v := range info.NodeLabels {
+			infoCopy.NodeLabels[k] = v
+		}
+	}
+
+	if info.TopologyKeys != nil {
+		infoCopy.TopologyKeys = make([]string, len(info.TopologyKeys))
+		copy(infoCopy.TopologyKeys, info.TopologyKeys)
+	}
+
+	if info.RequisiteTerms != nil {
+		infoCopy.RequisiteTerms = make([]topologyTerm, len(info.RequisiteTerms))
+		for i, term := range info.RequisiteTerms {
+			newTerm := make(topologyTerm, len(term))
+			copy(newTerm, term)
+			infoCopy.RequisiteTerms[i] = newTerm
+		}
+	}
+
+	return infoCopy, nil
+}
+
+// UpdateNodeLabels replaces the stored NodeLabels for pvcUID, creating the entry if needed.
+func (s *InMemoryStore) UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	info, found := s.data[pvcUID]
+	if !found {
+		s.data[pvcUID] = &TopologyInfo{NodeLabels: newLabels}
+	} else {
+		info.NodeLabels = newLabels
+	}
+}
+
+// UpdateTopologyKeys replaces the stored TopologyKeys for pvcUID, creating the entry if needed.
+func (s *InMemoryStore) UpdateTopologyKeys(pvcUID types.UID, newKeys []string) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	info, found := s.data[pvcUID]
+	if !found {
+		s.data[pvcUID] = &TopologyInfo{TopologyKeys: newKeys}
+	} else {
+		info.TopologyKeys = newKeys
+	}
+}
+
+// UpdateRequisiteTerms replaces the stored RequisiteTerms for pvcUID, creating the entry if needed.
+func (s *InMemoryStore) UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	info, found := s.data[pvcUID]
+	if !found {
+		s.data[pvcUID] = &TopologyInfo{RequisiteTerms: requisiteTerms}
+	} else {
+		info.RequisiteTerms = requisiteTerms
+	}
+}
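
A usage sketch for the new store, written as if from inside package controller (the UID and labels are illustrative only):

	func exampleStoreUsage() {
		store := NewInMemoryStore()
		uid := types.UID("8f2a9c1e-0000-0000-0000-000000000000") // hypothetical PVC UID

		// Upserts: the first call creates the entry, later calls replace fields.
		store.UpdateNodeLabels(uid, map[string]string{"topology.kubernetes.io/zone": "zone-a"})
		store.UpdateTopologyKeys(uid, []string{"topology.kubernetes.io/zone"})

		info, err := store.GetByPvcUID(uid)
		if err != nil {
			panic(err)
		}
		// info is a deep copy; mutating it does not affect the store.
		info.NodeLabels["topology.kubernetes.io/zone"] = "zone-b"

		// Once provisioning finishes (success or final error), drop the entry.
		_ = store.Delete(uid)
	}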
