@@ -24,6 +24,7 @@ package storage
2424
2525import (
2626 "context"
27+ "crypto/sha1"
2728 "encoding/json"
2829 "fmt"
2930 "math/rand"
@@ -32,6 +33,7 @@ import (
3233 "sort"
3334 "strconv"
3435 "strings"
36+ "time"
3537
3638 "k8s.io/apimachinery/pkg/api/resource"
3739
@@ -41,6 +43,8 @@ import (
4143
4244 api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
4345 "github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
46+ "github.com/arangodb/kube-arangodb/pkg/util/constants"
47+ "github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
4448)
4549
4650const (
@@ -72,7 +76,24 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
7276 clients [i ], clients [j ] = clients [j ], clients [i ]
7377 })
7478
79+ var nodeClientMap map [string ]provisioner.API
7580 for i , claim := range unboundClaims {
81+ // Find deployment name & role in the claim (if any)
82+ deplName , role , enforceAniAffinity := getDeploymentInfo (claim )
83+ allowedClients := clients
84+ if enforceAniAffinity && deplName != "" {
85+ // Select nodes to choose from such that no volume in group lands on the same node
86+ if nodeClientMap == nil {
87+ nodeClientMap = createNodeClientMap (ctx , clients )
88+ }
89+ var err error
90+ allowedClients , err = ls .filterAllowedNodes (nodeClientMap , deplName , role )
91+ if err != nil {
92+ log .Warn ().Err (err ).Msg ("Failed to filter allowed nodes" )
93+ continue // We'll try this claim again later
94+ }
95+ }
96+
7697 // Find size of PVC
7798 volSize := defaultVolumeSize
7899 if reqStorage := claim .Spec .Resources .Requests .StorageEphemeral (); reqStorage != nil {
@@ -81,7 +102,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
81102 }
82103 }
83104 // Create PV
84- if err := ls .createPV (ctx , apiObject , clients , i , volSize ); err != nil {
105+ if err := ls .createPV (ctx , apiObject , allowedClients , i , volSize , claim , deplName , role ); err != nil {
85106 log .Error ().Err (err ).Msg ("Failed to create PersistentVolume" )
86107 }
87108 }
@@ -90,7 +111,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
90111}
91112
92113// createPV creates a PersistentVolume.
93- func (ls * LocalStorage ) createPV (ctx context.Context , apiObject * api.ArangoLocalStorage , clients []provisioner.API , clientsOffset int , volSize int64 ) error {
114+ func (ls * LocalStorage ) createPV (ctx context.Context , apiObject * api.ArangoLocalStorage , clients []provisioner.API , clientsOffset int , volSize int64 , claim v1. PersistentVolumeClaim , deploymentName , role string ) error {
94115 log := ls .deps .Log
95116 // Try clients
96117 for clientIdx := 0 ; clientIdx < len (clients ); clientIdx ++ {
@@ -117,7 +138,7 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
117138 continue
118139 }
119140 // Create a volume
120- pvName := apiObject .GetName () + "-" + name
141+ pvName := strings . ToLower ( apiObject .GetName () + "-" + shortHash ( info . NodeName ) + "-" + name )
121142 volumeMode := v1 .PersistentVolumeFilesystem
122143 nodeAff , err := createNodeAffinity (info .NodeName )
123144 if err != nil {
@@ -131,6 +152,10 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
131152 v1 .AlphaStorageNodeAffinityAnnotation : nodeAff ,
132153 nodeNameAnnotation : info .NodeName ,
133154 },
155+ Labels : map [string ]string {
156+ k8sutil .LabelKeyArangoDeployment : deploymentName ,
157+ k8sutil .LabelKeyRole : role ,
158+ },
134159 },
135160 Spec : v1.PersistentVolumeSpec {
136161 Capacity : v1.ResourceList {
@@ -147,6 +172,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
147172 },
148173 StorageClassName : apiObject .Spec .StorageClass .Name ,
149174 VolumeMode : & volumeMode ,
175+ ClaimRef : & v1.ObjectReference {
176+ Kind : "PersistentVolumeClaim" ,
177+ APIVersion : "" ,
178+ Name : claim .GetName (),
179+ Namespace : claim .GetNamespace (),
180+ UID : claim .GetUID (),
181+ },
150182 },
151183 }
152184 // Attach PV to ArangoLocalStorage
@@ -159,6 +191,16 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
159191 Str ("name" , pvName ).
160192 Str ("node-name" , info .NodeName ).
161193 Msg ("Created PersistentVolume" )
194+
195+ // Bind claim to volume
196+ if err := ls .bindClaimToVolume (claim , pv .GetName ()); err != nil {
197+ // Try to delete the PV now
198+ if err := ls .deps .KubeCli .CoreV1 ().PersistentVolumes ().Delete (pv .GetName (), & metav1.DeleteOptions {}); err != nil {
199+ log .Error ().Err (err ).Msg ("Failed to delete PV after binding PVC failed" )
200+ }
201+ return maskAny (err )
202+ }
203+
162204 return nil
163205 }
164206 }
@@ -204,3 +246,96 @@ func createNodeAffinity(nodeName string) (string, error) {
204246 }
205247 return string (encoded ), nil
206248}
249+
250+ // createNodeClientMap creates a map from node name to API.
251+ // Clients that do not respond properly on a GetNodeInfo request are
252+ // ignored.
253+ func createNodeClientMap (ctx context.Context , clients []provisioner.API ) map [string ]provisioner.API {
254+ result := make (map [string ]provisioner.API )
255+ for _ , c := range clients {
256+ if info , err := c .GetNodeInfo (ctx ); err == nil {
257+ result [info .NodeName ] = c
258+ }
259+ }
260+ return result
261+ }
262+
263+ // getDeploymentInfo returns the name of the deployment that created the given claim,
264+ // the role of the server that the claim is used for and the value for `enforceAntiAffinity`.
265+ // If not found, empty strings are returned.
266+ // Returns deploymentName, role, enforceAntiAffinity.
267+ func getDeploymentInfo (pvc v1.PersistentVolumeClaim ) (string , string , bool ) {
268+ deploymentName := pvc .GetLabels ()[k8sutil .LabelKeyArangoDeployment ]
269+ role := pvc .GetLabels ()[k8sutil .LabelKeyRole ]
270+ enforceAntiAffinity , _ := strconv .ParseBool (pvc .GetAnnotations ()[constants .AnnotationEnforceAntiAffinity ]) // If annotation empty, this will yield false.
271+ return deploymentName , role , enforceAntiAffinity
272+ }
273+
274+ // filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role.
275+ func (ls * LocalStorage ) filterAllowedNodes (clients map [string ]provisioner.API , deploymentName , role string ) ([]provisioner.API , error ) {
276+ // Find all PVs for given deployment & role
277+ list , err := ls .deps .KubeCli .CoreV1 ().PersistentVolumes ().List (metav1.ListOptions {
278+ LabelSelector : fmt .Sprintf ("%s=%s,%s=%s" , k8sutil .LabelKeyArangoDeployment , deploymentName , k8sutil .LabelKeyRole , role ),
279+ })
280+ if err != nil {
281+ return nil , maskAny (err )
282+ }
283+ excludedNodes := make (map [string ]struct {})
284+ for _ , pv := range list .Items {
285+ nodeName := pv .GetAnnotations ()[nodeNameAnnotation ]
286+ excludedNodes [nodeName ] = struct {}{}
287+ }
288+ result := make ([]provisioner.API , 0 , len (clients ))
289+ for nodeName , c := range clients {
290+ if _ , found := excludedNodes [nodeName ]; ! found {
291+ result = append (result , c )
292+ }
293+ }
294+ return result , nil
295+ }
296+
297+ // bindClaimToVolume tries to bind the given claim to the volume with given name.
298+ // If the claim has been updated, the function retries several times.
299+ func (ls * LocalStorage ) bindClaimToVolume (claim v1.PersistentVolumeClaim , volumeName string ) error {
300+ log := ls .deps .Log .With ().Str ("pvc-name" , claim .GetName ()).Str ("volume-name" , volumeName ).Logger ()
301+ pvcs := ls .deps .KubeCli .CoreV1 ().PersistentVolumeClaims (claim .GetNamespace ())
302+
303+ for attempt := 0 ; attempt < 10 ; attempt ++ {
304+ // Backoff if needed
305+ time .Sleep (time .Millisecond * time .Duration (10 * attempt ))
306+
307+ // Fetch latest version of claim
308+ updated , err := pvcs .Get (claim .GetName (), metav1.GetOptions {})
309+ if k8sutil .IsNotFound (err ) {
310+ return maskAny (err )
311+ } else if err != nil {
312+ log .Warn ().Err (err ).Msg ("Failed to load updated PersistentVolumeClaim" )
313+ continue
314+ }
315+
316+ // Check claim. If already bound, bail out
317+ if ! pvcNeedsVolume (* updated ) {
318+ return maskAny (fmt .Errorf ("PersistentVolumeClaim '%s' no longer needs a volume" , claim .GetName ()))
319+ }
320+
321+ // Try to bind
322+ updated .Spec .VolumeName = volumeName
323+ if _ , err := pvcs .Update (updated ); k8sutil .IsConflict (err ) {
324+ // Claim modified already, retry
325+ log .Debug ().Err (err ).Msg ("PersistentVolumeClaim has been modified. Retrying." )
326+ } else if err != nil {
327+ log .Error ().Err (err ).Msg ("Failed to bind PVC to volume" )
328+ return maskAny (err )
329+ }
330+ log .Debug ().Msg ("Bound volume to PersistentVolumeClaim" )
331+ return nil
332+ }
333+ log .Error ().Msg ("All attempts to bind PVC to volume failed" )
334+ return maskAny (fmt .Errorf ("All attempts to bind PVC to volume failed" ))
335+ }
336+
// shortHash returns the first 6 lowercase hexadecimal characters of the
// SHA-1 digest of the given name.
func shortHash(name string) string {
	digest := sha1.Sum([]byte(name))
	// 3 bytes of digest render as exactly 6 hex characters.
	return fmt.Sprintf("%x", digest[:3])
}
341+ }
0 commit comments