Skip to content

Commit bd64ca7

Browse files
committed
Speeding up inspection loop
1 parent ef03a72 commit bd64ca7

28 files changed

+463
-215
lines changed

pkg/deployment/access_package.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,15 @@ func (d *Deployment) ensureAccessPackage(apSecretName string) error {
108108

109109
// Fetch client authentication CA
110110
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
111-
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(d.deps.KubeCli.CoreV1(), clientAuthSecretName, ns, nil)
111+
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(secrets, clientAuthSecretName, nil)
112112
if err != nil {
113113
log.Debug().Err(err).Msg("Failed to get client-auth CA secret")
114114
return maskAny(err)
115115
}
116116

117117
// Fetch TLS CA public key
118118
tlsCASecretName := spec.Sync.TLS.GetCASecretName()
119-
tlsCACert, err := k8sutil.GetCACertficateSecret(d.deps.KubeCli.CoreV1(), tlsCASecretName, ns)
119+
tlsCACert, err := k8sutil.GetCACertficateSecret(secrets, tlsCASecretName)
120120
if err != nil {
121121
log.Debug().Err(err).Msg("Failed to get TLS CA secret")
122122
return maskAny(err)

pkg/deployment/context_impl.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,9 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
171171
log := d.deps.Log
172172
kubecli := d.deps.KubeCli
173173
ns := d.apiObject.GetNamespace()
174+
secrets := kubecli.CoreV1().Secrets(ns)
174175
secretName := d.apiObject.Spec.Sync.Monitoring.GetTokenSecretName()
175-
monitoringToken, err := k8sutil.GetTokenSecret(kubecli.CoreV1(), secretName, ns)
176+
monitoringToken, err := k8sutil.GetTokenSecret(secrets, secretName)
176177
if err != nil {
177178
log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get sync monitoring secret")
178179
return nil, maskAny(err)
@@ -331,7 +332,8 @@ func (d *Deployment) GetPvc(pvcName string) (*v1.PersistentVolumeClaim, error) {
331332
func (d *Deployment) GetTLSKeyfile(group api.ServerGroup, member api.MemberStatus) (string, error) {
332333
secretName := k8sutil.CreateTLSKeyfileSecretName(d.apiObject.GetName(), group.AsRole(), member.ID)
333334
ns := d.apiObject.GetNamespace()
334-
result, err := k8sutil.GetTLSKeyfileSecret(d.deps.KubeCli.CoreV1(), secretName, ns)
335+
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
336+
result, err := k8sutil.GetTLSKeyfileSecret(secrets, secretName)
335337
if err != nil {
336338
return "", maskAny(err)
337339
}
@@ -353,8 +355,9 @@ func (d *Deployment) DeleteTLSKeyfile(group api.ServerGroup, member api.MemberSt
353355
// Returns: publicKey, privateKey, ownerByDeployment, error
354356
func (d *Deployment) GetTLSCA(secretName string) (string, string, bool, error) {
355357
ns := d.apiObject.GetNamespace()
358+
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
356359
owner := d.apiObject.AsOwner()
357-
cert, priv, isOwned, err := k8sutil.GetCASecret(d.deps.KubeCli.CoreV1(), secretName, ns, &owner)
360+
cert, priv, isOwned, err := k8sutil.GetCASecret(secrets, secretName, &owner)
358361
if err != nil {
359362
return "", "", false, maskAny(err)
360363
}

pkg/deployment/deployment.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/arangodb/kube-arangodb/pkg/deployment/resilience"
4343
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
4444
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
45+
"github.com/arangodb/kube-arangodb/pkg/util"
4546
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
4647
"github.com/arangodb/kube-arangodb/pkg/util/retry"
4748
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
@@ -78,8 +79,8 @@ type deploymentEvent struct {
7879

7980
const (
8081
deploymentEventQueueSize = 256
81-
minInspectionInterval = time.Second // Ensure we inspect the generated resources no less than with this interval
82-
maxInspectionInterval = time.Minute // Ensure we inspect the generated resources no less than with this interval
82+
minInspectionInterval = util.Interval(time.Second) // Ensure we inspect the generated resources no less than with this interval
83+
maxInspectionInterval = util.Interval(time.Minute) // Ensure we inspect the generated resources no less than with this interval
8384
)
8485

8586
// Deployment is the in process state of an ArangoDeployment.
@@ -247,21 +248,21 @@ func (d *Deployment) run() {
247248
}
248249

249250
case <-d.inspectTrigger.Done():
251+
log.Debug().Msg("Inspect deployment...")
250252
inspectionInterval = d.inspectDeployment(inspectionInterval)
253+
log.Debug().Str("interval", inspectionInterval.String()).Msg("...inspected deployment")
251254

252255
case <-d.updateDeploymentTrigger.Done():
256+
inspectionInterval = minInspectionInterval
253257
if err := d.handleArangoDeploymentUpdatedEvent(); err != nil {
254258
d.CreateEvent(k8sutil.NewErrorEvent("Failed to handle deployment update", err, d.GetAPIObject()))
255259
}
256260

257-
case <-time.After(inspectionInterval):
261+
case <-inspectionInterval.After():
258262
// Trigger inspection
259263
d.inspectTrigger.Trigger()
260264
// Backoff with next interval
261-
inspectionInterval = time.Duration(float64(inspectionInterval) * 1.5)
262-
if inspectionInterval > maxInspectionInterval {
263-
inspectionInterval = maxInspectionInterval
264-
}
265+
inspectionInterval = inspectionInterval.Backoff(1.5, maxInspectionInterval)
265266
}
266267
}
267268
}

pkg/deployment/deployment_inspector.go

Lines changed: 63 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
3030
"github.com/arangodb/kube-arangodb/pkg/util"
3131
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
32+
"github.com/arangodb/kube-arangodb/pkg/util/profiler"
3233
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334
)
3435

@@ -39,7 +40,7 @@ import (
3940
// - any of the underlying resources has changed
4041
// - once in a while
4142
// Returns the delay until this function should be called again.
42-
func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration {
43+
func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval {
4344
log := d.deps.Log
4445

4546
nextInterval := lastInterval
@@ -92,13 +93,13 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
9293
hasError = true
9394
d.CreateEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
9495
} else {
95-
nextInterval = util.MinDuration(nextInterval, x)
96+
nextInterval = nextInterval.ReduceTo(x)
9697
}
9798
if x, err := d.resources.InspectPVCs(ctx); err != nil {
9899
hasError = true
99100
d.CreateEvent(k8sutil.NewErrorEvent("PVC inspection failed", err, d.apiObject))
100101
} else {
101-
nextInterval = util.MinDuration(nextInterval, x)
102+
nextInterval = nextInterval.ReduceTo(x)
102103
}
103104

104105
// Check members for resilience
@@ -108,43 +109,67 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
108109
}
109110

110111
// Create scale/update plan
111-
if err := d.reconciler.CreatePlan(); err != nil {
112-
hasError = true
113-
d.CreateEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
114-
}
115-
116-
// Execute current step of scale/update plan
117-
retrySoon, err := d.reconciler.ExecutePlan(ctx)
118-
if err != nil {
119-
hasError = true
120-
d.CreateEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
121-
}
122-
if retrySoon {
123-
nextInterval = minInspectionInterval
112+
{
113+
ps := profiler.Start()
114+
if err := d.reconciler.CreatePlan(); err != nil {
115+
hasError = true
116+
d.CreateEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
117+
}
118+
119+
// Execute current step of scale/update plan
120+
retrySoon, err := d.reconciler.ExecutePlan(ctx)
121+
if err != nil {
122+
hasError = true
123+
d.CreateEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
124+
}
125+
if retrySoon {
126+
nextInterval = minInspectionInterval
127+
}
128+
ps.Done(log, "plan")
124129
}
125130

126131
// Ensure all resources are created
127-
if err := d.resources.EnsureSecrets(); err != nil {
128-
hasError = true
129-
d.CreateEvent(k8sutil.NewErrorEvent("Secret creation failed", err, d.apiObject))
130-
}
131-
if err := d.resources.EnsureServices(); err != nil {
132-
hasError = true
133-
d.CreateEvent(k8sutil.NewErrorEvent("Service creation failed", err, d.apiObject))
134-
}
135-
if err := d.resources.EnsurePVCs(); err != nil {
136-
hasError = true
137-
d.CreateEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
138-
}
139-
if err := d.resources.EnsurePods(); err != nil {
140-
hasError = true
141-
d.CreateEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
132+
{
133+
ps := profiler.Start()
134+
{
135+
ps := profiler.Start()
136+
if err := d.resources.EnsureSecrets(); err != nil {
137+
hasError = true
138+
d.CreateEvent(k8sutil.NewErrorEvent("Secret creation failed", err, d.apiObject))
139+
}
140+
ps.LogIf(log, time.Millisecond*10, "EnsureSecrets")
141+
}
142+
{
143+
ps := profiler.Start()
144+
if err := d.resources.EnsureServices(); err != nil {
145+
hasError = true
146+
d.CreateEvent(k8sutil.NewErrorEvent("Service creation failed", err, d.apiObject))
147+
}
148+
ps.LogIf(log, time.Millisecond*10, "EnsureServices")
149+
}
150+
if err := d.resources.EnsurePVCs(); err != nil {
151+
hasError = true
152+
d.CreateEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
153+
}
154+
{
155+
ps := profiler.Start()
156+
if err := d.resources.EnsurePods(); err != nil {
157+
hasError = true
158+
d.CreateEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
159+
}
160+
ps.LogIf(log, time.Millisecond*10, "EnsurePods")
161+
}
162+
ps.Done(log, "ensure resources")
142163
}
143164

144165
// Create access packages
145-
if err := d.createAccessPackages(); err != nil {
146-
hasError = true
147-
d.CreateEvent(k8sutil.NewErrorEvent("AccessPackage creation failed", err, d.apiObject))
166+
{
167+
ps := profiler.Start()
168+
if err := d.createAccessPackages(); err != nil {
169+
hasError = true
170+
d.CreateEvent(k8sutil.NewErrorEvent("AccessPackage creation failed", err, d.apiObject))
171+
}
172+
ps.Done(log, "createAccessPackages")
148173
}
149174

150175
// Inspect deployment for obsolete members
@@ -154,9 +179,11 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
154179
}
155180

156181
// At the end of the inspect, we cleanup terminated pods.
157-
if err := d.resources.CleanupTerminatedPods(); err != nil {
182+
if x, err := d.resources.CleanupTerminatedPods(); err != nil {
158183
hasError = true
159184
d.CreateEvent(k8sutil.NewErrorEvent("Pod cleanup failed", err, d.apiObject))
185+
} else {
186+
nextInterval = nextInterval.ReduceTo(x)
160187
}
161188
}
162189

@@ -169,10 +196,7 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
169196
} else {
170197
d.recentInspectionErrors = 0
171198
}
172-
if nextInterval > maxInspectionInterval {
173-
nextInterval = maxInspectionInterval
174-
}
175-
return nextInterval
199+
return nextInterval.ReduceTo(maxInspectionInterval)
176200
}
177201

178202
// triggerInspection ensures that an inspection is run soon.

pkg/deployment/resources/certificates_client_auth.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const (
4242

4343
// createClientAuthCACertificate creates a client authentication CA certificate and stores it in a secret with name
4444
// specified in the given spec.
45-
func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, spec api.SyncAuthenticationSpec, deploymentName, namespace string, ownerRef *metav1.OwnerReference) error {
45+
func createClientAuthCACertificate(log zerolog.Logger, secrets k8sutil.SecretInterface, spec api.SyncAuthenticationSpec, deploymentName string, ownerRef *metav1.OwnerReference) error {
4646
log = log.With().Str("secret", spec.GetClientCASecretName()).Logger()
4747
options := certificates.CreateCertificateOptions{
4848
CommonName: fmt.Sprintf("%s Client Authentication Root Certificate", deploymentName),
@@ -57,7 +57,7 @@ func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, s
5757
log.Debug().Err(err).Msg("Failed to create CA certificate")
5858
return maskAny(err)
5959
}
60-
if err := k8sutil.CreateCASecret(cli, spec.GetClientCASecretName(), namespace, cert, priv, ownerRef); err != nil {
60+
if err := k8sutil.CreateCASecret(secrets, spec.GetClientCASecretName(), cert, priv, ownerRef); err != nil {
6161
if k8sutil.IsAlreadyExists(err) {
6262
log.Debug().Msg("CA Secret already exists")
6363
} else {
@@ -71,10 +71,10 @@ func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, s
7171

7272
// createClientAuthCertificateKeyfile creates a client authentication certificate for a specific user and stores
7373
// it in a secret with the given name.
74-
func createClientAuthCertificateKeyfile(log zerolog.Logger, cli v1.CoreV1Interface, commonName string, ttl time.Duration, spec api.SyncAuthenticationSpec, secretName, namespace string, ownerRef *metav1.OwnerReference) error {
74+
func createClientAuthCertificateKeyfile(log zerolog.Logger, secrets v1.SecretInterface, commonName string, ttl time.Duration, spec api.SyncAuthenticationSpec, secretName string, ownerRef *metav1.OwnerReference) error {
7575
log = log.With().Str("secret", secretName).Logger()
7676
// Load CA certificate
77-
caCert, caKey, _, err := k8sutil.GetCASecret(cli, spec.GetClientCASecretName(), namespace, nil)
77+
caCert, caKey, _, err := k8sutil.GetCASecret(secrets, spec.GetClientCASecretName(), nil)
7878
if err != nil {
7979
log.Debug().Err(err).Msg("Failed to load CA certificate")
8080
return maskAny(err)
@@ -100,7 +100,7 @@ func createClientAuthCertificateKeyfile(log zerolog.Logger, cli v1.CoreV1Interfa
100100
}
101101
keyfile := strings.TrimSpace(cert) + "\n" +
102102
strings.TrimSpace(priv)
103-
if err := k8sutil.CreateTLSKeyfileSecret(cli, secretName, namespace, keyfile, ownerRef); err != nil {
103+
if err := k8sutil.CreateTLSKeyfileSecret(secrets, secretName, keyfile, ownerRef); err != nil {
104104
if k8sutil.IsAlreadyExists(err) {
105105
log.Debug().Msg("Server Secret already exists")
106106
} else {

pkg/deployment/resources/certificates_tls.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const (
4343

4444
// createTLSCACertificate creates a CA certificate and stores it in a secret with name
4545
// specified in the given spec.
46-
func createTLSCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, spec api.TLSSpec, deploymentName, namespace string, ownerRef *metav1.OwnerReference) error {
46+
func createTLSCACertificate(log zerolog.Logger, secrets k8sutil.SecretInterface, spec api.TLSSpec, deploymentName string, ownerRef *metav1.OwnerReference) error {
4747
log = log.With().Str("secret", spec.GetCASecretName()).Logger()
4848

4949
options := certificates.CreateCertificateOptions{
@@ -58,7 +58,7 @@ func createTLSCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, spec api
5858
log.Debug().Err(err).Msg("Failed to create CA certificate")
5959
return maskAny(err)
6060
}
61-
if err := k8sutil.CreateCASecret(cli, spec.GetCASecretName(), namespace, cert, priv, ownerRef); err != nil {
61+
if err := k8sutil.CreateCASecret(secrets, spec.GetCASecretName(), cert, priv, ownerRef); err != nil {
6262
if k8sutil.IsAlreadyExists(err) {
6363
log.Debug().Msg("CA Secret already exists")
6464
} else {
@@ -72,7 +72,7 @@ func createTLSCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, spec api
7272

7373
// createTLSServerCertificate creates a TLS certificate for a specific server and stores
7474
// it in a secret with the given name.
75-
func createTLSServerCertificate(log zerolog.Logger, cli v1.CoreV1Interface, serverNames []string, spec api.TLSSpec, secretName, namespace string, ownerRef *metav1.OwnerReference) error {
75+
func createTLSServerCertificate(log zerolog.Logger, secrets v1.SecretInterface, serverNames []string, spec api.TLSSpec, secretName string, ownerRef *metav1.OwnerReference) error {
7676
log = log.With().Str("secret", secretName).Logger()
7777
// Load alt names
7878
dnsNames, ipAddresses, emailAddress, err := spec.GetParsedAltNames()
@@ -82,7 +82,7 @@ func createTLSServerCertificate(log zerolog.Logger, cli v1.CoreV1Interface, serv
8282
}
8383

8484
// Load CA certificate
85-
caCert, caKey, _, err := k8sutil.GetCASecret(cli, spec.GetCASecretName(), namespace, nil)
85+
caCert, caKey, _, err := k8sutil.GetCASecret(secrets, spec.GetCASecretName(), nil)
8686
if err != nil {
8787
log.Debug().Err(err).Msg("Failed to load CA certificate")
8888
return maskAny(err)
@@ -109,7 +109,7 @@ func createTLSServerCertificate(log zerolog.Logger, cli v1.CoreV1Interface, serv
109109
}
110110
keyfile := strings.TrimSpace(cert) + "\n" +
111111
strings.TrimSpace(priv)
112-
if err := k8sutil.CreateTLSKeyfileSecret(cli, secretName, namespace, keyfile, ownerRef); err != nil {
112+
if err := k8sutil.CreateTLSKeyfileSecret(secrets, secretName, keyfile, ownerRef); err != nil {
113113
if k8sutil.IsAlreadyExists(err) {
114114
log.Debug().Msg("Server Secret already exists")
115115
} else {

pkg/deployment/resources/pod_cleanup.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,27 @@ package resources
2525
import (
2626
"time"
2727

28+
"github.com/arangodb/kube-arangodb/pkg/util"
2829
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
2930

3031
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
3132
)
3233

3334
const (
34-
statelessTerminationPeriod = time.Minute // We wait this long for a stateless server to terminate on it's own. Afterwards we kill it.
35+
statelessTerminationPeriod = time.Minute // We wait this long for a stateless server to terminate on it's own. Afterwards we kill it.
36+
recheckStatefullPodCleanupInterval = util.Interval(time.Second * 2) // Interval used when Pod finalizers need to be rechecked soon
3537
)
3638

3739
// CleanupTerminatedPods removes all pods in Terminated state that belong to a member in Created state.
38-
func (r *Resources) CleanupTerminatedPods() error {
40+
// Returns: Interval_till_next_inspection, error
41+
func (r *Resources) CleanupTerminatedPods() (util.Interval, error) {
3942
log := r.log
43+
nextInterval := maxPodInspectorInterval // Large by default, will be made smaller if needed in the rest of the function
4044

4145
pods, err := r.context.GetOwnedPods()
4246
if err != nil {
4347
log.Debug().Err(err).Msg("Failed to get owned pods")
44-
return maskAny(err)
48+
return 0, maskAny(err)
4549
}
4650

4751
// Update member status from all pods found
@@ -66,12 +70,15 @@ func (r *Resources) CleanupTerminatedPods() error {
6670
if !memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated) {
6771
if !group.IsStateless() {
6872
// For statefull members, we have to wait for confirmed termination
73+
log.Debug().Str("pod", p.GetName()).Msg("Cannot cleanup pod yet, waiting for it to reach terminated state")
74+
nextInterval = nextInterval.ReduceTo(recheckStatefullPodCleanupInterval)
6975
continue
7076
} else {
7177
// If a stateless server does not terminate within a reasonable amount or time, we kill it.
7278
t := p.GetDeletionTimestamp()
7379
if t == nil || t.Add(statelessTerminationPeriod).After(time.Now()) {
7480
// Either delete timestamp is not set, or not yet waiting long enough
81+
nextInterval = nextInterval.ReduceTo(util.Interval(statelessTerminationPeriod))
7582
continue
7683
}
7784
}
@@ -84,5 +91,5 @@ func (r *Resources) CleanupTerminatedPods() error {
8491
log.Warn().Err(err).Str("pod-name", p.GetName()).Msg("Failed to cleanup pod")
8592
}
8693
}
87-
return nil
94+
return nextInterval, nil
8895
}

0 commit comments

Comments
 (0)