Skip to content

Commit 3903543

Browse files
authored
[Feature] Unify timeouts and add additional parameters for kubernetes (#858)
1 parent c586d09 commit 3903543

File tree

77 files changed

+619
-333
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+619
-333
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- Add Recovery during PlanBuild operation
88
- Fix Exporter in Deployments without authentication
99
- Allow to disable ClusterScalingIntegration and add proper Scheduled label to pods
10+
- Add additional timeout parameters and kubernetes batch size
1011

1112
## [1.2.5](https://github.com/arangodb/kube-arangodb/tree/1.2.5) (2021-10-25)
1213
- Split & Unify Lifecycle management functionality

Makefile

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -367,10 +367,7 @@ run-unit-tests: $(SOURCES)
367367
$(REPOPATH)/pkg/apis/storage/... \
368368
$(REPOPATH)/pkg/deployment/... \
369369
$(REPOPATH)/pkg/storage \
370-
$(REPOPATH)/pkg/util/k8sutil \
371-
$(REPOPATH)/pkg/util/k8sutil/test \
372-
$(REPOPATH)/pkg/util/probe \
373-
$(REPOPATH)/pkg/util/validation \
370+
$(REPOPATH)/pkg/util/... \
374371
$(REPOPATH)/pkg/backup/...
375372

376373
# Release building

admin.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"strconv"
3434
"syscall"
3535

36+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
37+
3638
"github.com/pkg/errors"
3739
"github.com/spf13/cobra"
3840
v1 "k8s.io/api/core/v1"
@@ -288,7 +290,7 @@ func createClient(endpoints []string, certCA *x509.CertPool, auth connection.Aut
288290

289291
// getJWTTokenFromSecrets returns token from the secret.
290292
func getJWTTokenFromSecrets(ctx context.Context, secrets secret.ReadInterface, name string) (connection.Authentication, error) {
291-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
293+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
292294
defer cancel()
293295

294296
token, err := k8sutil.GetTokenSecret(ctxChild, secrets, name)
@@ -306,7 +308,7 @@ func getJWTTokenFromSecrets(ctx context.Context, secrets secret.ReadInterface, n
306308

307309
// getCACertificate returns CA certificate from the secret.
308310
func getCACertificate(ctx context.Context, secrets secret.ReadInterface, name string) (*x509.CertPool, error) {
309-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
311+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
310312
defer cancel()
311313

312314
s, err := secrets.Get(ctxChild, name, metav1.GetOptions{})
@@ -331,7 +333,7 @@ func getDeployment(ctx context.Context, namespace, deplName string) (v12.ArangoD
331333
return v12.ArangoDeployment{}, errors.WithMessage(err, "failed to create Arango extension client")
332334
}
333335

334-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
336+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
335337
defer cancel()
336338

337339
deployments, err := extCli.DatabaseV1().ArangoDeployments(namespace).List(ctxChild, metav1.ListOptions{})

main.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ import (
3434
"strings"
3535
"time"
3636

37+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
38+
3739
operatorHTTP "github.com/arangodb/kube-arangodb/pkg/util/http"
3840
"github.com/gin-gonic/gin"
3941

4042
"github.com/arangodb/kube-arangodb/pkg/version"
4143

42-
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
43-
4444
"github.com/arangodb/kube-arangodb/pkg/operator/scope"
4545

4646
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
@@ -123,9 +123,13 @@ var (
123123
singleMode bool
124124
scope string
125125
}
126-
timeouts struct {
127-
k8s time.Duration
128-
arangoD time.Duration
126+
operatorKubernetesOptions struct {
127+
maxBatchSize int64
128+
}
129+
operatorTimeouts struct {
130+
k8s time.Duration
131+
arangoD time.Duration
132+
reconciliation time.Duration
129133
}
130134
chaosOptions struct {
131135
allowed bool
@@ -158,9 +162,11 @@ func init() {
158162
f.BoolVar(&chaosOptions.allowed, "chaos.allowed", false, "Set to allow chaos in deployments. Only activated when allowed and enabled in deployment")
159163
f.BoolVar(&operatorOptions.singleMode, "mode.single", false, "Enable single mode in Operator. WARNING: There should be only one replica of Operator, otherwise Operator can take unexpected actions")
160164
f.StringVar(&operatorOptions.scope, "scope", scope.DefaultScope.String(), "Define scope on which Operator works. Legacy - pre 1.1.0 scope with limited cluster access")
161-
f.DurationVar(&timeouts.k8s, "timeout.k8s", time.Second*3, "The request timeout to the kubernetes")
162-
f.DurationVar(&timeouts.arangoD, "timeout.arangod", time.Second*10, "The request timeout to the ArangoDB")
163-
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "scaling-integration", false, "Enable Scaling Integration")
165+
f.DurationVar(&operatorTimeouts.k8s, "timeout.k8s", globals.DefaultKubernetesTimeout, "The request timeout to the kubernetes")
166+
f.DurationVar(&operatorTimeouts.arangoD, "timeout.arangod", globals.DefaultArangoDTimeout, "The request timeout to the ArangoDB")
167+
f.DurationVar(&operatorTimeouts.reconciliation, "timeout.reconciliation", globals.DefaultReconciliationTimeout, "The reconciliation timeout to the ArangoDB CR")
168+
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration")
169+
f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read")
164170
features.Init(&cmdMain)
165171
}
166172

@@ -185,8 +191,11 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
185191
ip := os.Getenv(constants.EnvOperatorPodIP)
186192

187193
deploymentApi.DefaultImage = operatorOptions.arangoImage
188-
k8sutil.SetRequestTimeout(timeouts.k8s)
189-
arangod.SetRequestTimeout(timeouts.arangoD)
194+
195+
globals.GetGlobalTimeouts().Kubernetes().Set(operatorTimeouts.k8s)
196+
globals.GetGlobalTimeouts().ArangoD().Set(operatorTimeouts.arangoD)
197+
globals.GetGlobalTimeouts().Reconciliation().Set(operatorTimeouts.reconciliation)
198+
globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize)
190199

191200
// Prepare log service
192201
var err error

pkg/backup/handlers/arango/backup/arango_client_impl.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"fmt"
3030
"time"
3131

32+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
33+
3234
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3335

3436
"github.com/arangodb/go-driver"
@@ -130,7 +132,7 @@ func (ac *arangoClientBackupImpl) Get(backupID driver.BackupID) (driver.BackupMe
130132
}
131133

132134
func (ac *arangoClientBackupImpl) getCredentialsFromSecret(ctx context.Context, secretName string) (interface{}, error) {
133-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
135+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
134136
defer cancel()
135137
token, err := k8sutil.GetTokenSecret(ctxChild, ac.kubecli.CoreV1().Secrets(ac.backup.Namespace), secretName)
136138
if err != nil {

pkg/deployment/access_package.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"strings"
2929
"time"
3030

31+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
32+
3133
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3234

3335
certificates "github.com/arangodb-helper/go-certificates"
@@ -67,7 +69,7 @@ func (d *Deployment) createAccessPackages(ctx context.Context) error {
6769
}
6870

6971
// Remove all access packages that we did build, but are no longer needed
70-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
72+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
7173
defer cancel()
7274
secretList, err := secrets.List(ctxChild, metav1.ListOptions{})
7375
if err != nil {
@@ -80,7 +82,7 @@ func (d *Deployment) createAccessPackages(ctx context.Context) error {
8082
// Secret is an access package
8183
if _, wanted := apNameMap[secret.GetName()]; !wanted {
8284
// We found an obsolete access package secret. Remove it.
83-
err = k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
85+
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
8486
return secrets.Delete(ctxChild, secret.GetName(), metav1.DeleteOptions{
8587
Preconditions: &metav1.Preconditions{UID: &secret.UID},
8688
})
@@ -110,7 +112,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
110112
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
111113
spec := d.apiObject.Spec
112114

113-
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
115+
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
114116
_, err := secrets.Get(ctxChild, apSecretName, metav1.GetOptions{})
115117
return err
116118
})
@@ -124,7 +126,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
124126

125127
// Fetch client authentication CA
126128
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
127-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
129+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
128130
defer cancel()
129131
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(ctxChild, secrets, clientAuthSecretName, nil)
130132
if err != nil {
@@ -220,7 +222,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
220222
}
221223
// Attach secret to owner
222224
secret.SetOwnerReferences(append(secret.GetOwnerReferences(), d.apiObject.AsOwner()))
223-
err = k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
225+
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
224226
_, err := secrets.Create(ctxChild, secret, metav1.CreateOptions{})
225227
return err
226228
})

pkg/deployment/cleanup.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ package deployment
2626
import (
2727
"context"
2828

29+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
30+
2931
core "k8s.io/api/core/v1"
3032
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
3133

@@ -45,7 +47,7 @@ func (d *Deployment) removePodFinalizers(ctx context.Context, cachedStatus inspe
4547
return err
4648
}
4749

48-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
50+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
4951
defer cancel()
5052

5153
if err := d.PodsModInterface().Delete(ctxChild, pod.GetName(), meta.DeleteOptions{

pkg/deployment/cluster_scaling_integration.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"sync"
2929
"time"
3030

31+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
32+
3133
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3234

3335
"github.com/rs/zerolog"
@@ -152,14 +154,14 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
152154
func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectSuccess bool) error {
153155
log := ci.log
154156

155-
ctxChild, cancel := context.WithTimeout(ctx, arangod.GetRequestTimeout())
157+
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
156158
defer cancel()
157159
c, err := ci.depl.clientCache.GetDatabase(ctxChild)
158160
if err != nil {
159161
return errors.WithStack(err)
160162
}
161163

162-
ctxChild, cancel = context.WithTimeout(ctx, arangod.GetRequestTimeout())
164+
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
163165
defer cancel()
164166
req, err := arangod.GetNumberOfServers(ctxChild, c.Connection())
165167
if err != nil {
@@ -204,7 +206,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
204206
}
205207
// Let's update the spec
206208
apiObject := ci.depl.apiObject
207-
ctxChild, cancel = context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
209+
ctxChild, cancel = globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
208210
defer cancel()
209211
current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(apiObject.Namespace).Get(ctxChild, apiObject.Name, metav1.GetOptions{})
210212
if err != nil {

pkg/deployment/context_impl.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"strconv"
3232
"time"
3333

34+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
35+
3436
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
3537
"k8s.io/apimachinery/pkg/types"
3638

@@ -86,7 +88,7 @@ var _ resources.Context = &Deployment{}
8688

8789
// GetBackup receives information about a backup resource
8890
func (d *Deployment) GetBackup(ctx context.Context, backup string) (*backupApi.ArangoBackup, error) {
89-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
91+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
9092
defer cancel()
9193

9294
return d.deps.DatabaseCRCli.BackupV1().ArangoBackups(d.Namespace()).Get(ctxChild, backup, meta.GetOptions{})
@@ -392,7 +394,7 @@ func (d *Deployment) GetPod(ctx context.Context, podName string) (*core.Pod, err
392394
// of the deployment. If the pod does not exist, the error is ignored.
393395
func (d *Deployment) DeletePod(ctx context.Context, podName string) error {
394396
log := d.deps.Log
395-
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
397+
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
396398
return d.PodsModInterface().Delete(ctxChild, podName, meta.DeleteOptions{})
397399
})
398400
if err != nil && !k8sutil.IsNotFound(err) {
@@ -409,7 +411,7 @@ func (d *Deployment) CleanupPod(ctx context.Context, p *core.Pod) error {
409411
podName := p.GetName()
410412
options := meta.NewDeleteOptions(0)
411413
options.Preconditions = meta.NewUIDPreconditions(string(p.GetUID()))
412-
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
414+
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
413415
return d.PodsModInterface().Delete(ctxChild, podName, *options)
414416
})
415417
if err != nil && !k8sutil.IsNotFound(err) {
@@ -424,7 +426,7 @@ func (d *Deployment) CleanupPod(ctx context.Context, p *core.Pod) error {
424426
func (d *Deployment) RemovePodFinalizers(ctx context.Context, podName string) error {
425427
log := d.deps.Log
426428

427-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
429+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
428430
defer cancel()
429431
p, err := d.GetCachedStatus().PodReadInterface().Get(ctxChild, podName, meta.GetOptions{})
430432
if err != nil {
@@ -445,7 +447,7 @@ func (d *Deployment) RemovePodFinalizers(ctx context.Context, podName string) er
445447
// of the deployment. If the pvc does not exist, the error is ignored.
446448
func (d *Deployment) DeletePvc(ctx context.Context, pvcName string) error {
447449
log := d.deps.Log
448-
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
450+
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
449451
return d.PersistentVolumeClaimsModInterface().Delete(ctxChild, pvcName, meta.DeleteOptions{})
450452
})
451453
if err != nil && !k8sutil.IsNotFound(err) {
@@ -458,7 +460,7 @@ func (d *Deployment) DeletePvc(ctx context.Context, pvcName string) error {
458460
// UpdatePvc updated a persistent volume claim in the namespace
459461
// of the deployment. If the pvc does not exist, the error is ignored.
460462
func (d *Deployment) UpdatePvc(ctx context.Context, pvc *core.PersistentVolumeClaim) error {
461-
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
463+
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
462464
_, err := d.PersistentVolumeClaimsModInterface().Update(ctxChild, pvc, meta.UpdateOptions{})
463465
return err
464466
})
@@ -488,7 +490,7 @@ func (d *Deployment) GetOwnedPVCs() ([]core.PersistentVolumeClaim, error) {
488490

489491
// GetPvc gets a PVC by the given name, in the samespace of the deployment.
490492
func (d *Deployment) GetPvc(ctx context.Context, pvcName string) (*core.PersistentVolumeClaim, error) {
491-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
493+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
492494
defer cancel()
493495

494496
pvc, err := d.GetCachedStatus().PersistentVolumeClaimReadInterface().Get(ctxChild, pvcName, meta.GetOptions{})
@@ -514,7 +516,7 @@ func (d *Deployment) GetTLSKeyfile(group api.ServerGroup, member api.MemberStatu
514516
// If the secret does not exist, the error is ignored.
515517
func (d *Deployment) DeleteTLSKeyfile(ctx context.Context, group api.ServerGroup, member api.MemberStatus) error {
516518
secretName := k8sutil.CreateTLSKeyfileSecretName(d.GetName(), group.AsRole(), member.ID)
517-
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
519+
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
518520
return d.SecretsModInterface().Delete(ctxChild, secretName, meta.DeleteOptions{})
519521
})
520522
if err != nil && !k8sutil.IsNotFound(err) {
@@ -724,7 +726,7 @@ func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...pa
724726

725727
c := d.deps.KubeCli.CoreV1().Pods(pod.GetNamespace())
726728

727-
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
729+
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
728730
defer cancel()
729731
_, err = c.Patch(ctxChild, pod.GetName(), types.JSONPatchType, data, meta.PatchOptions{})
730732
if err != nil {

0 commit comments

Comments
 (0)