Skip to content

Commit 8624d7a

Browse files
committed
feat: tenant owned resources problem enhancements
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
1 parent c9352fa commit 8624d7a

File tree

3 files changed

+97
-56
lines changed

3 files changed

+97
-56
lines changed

controllers/telemetry/route_controller.go

Lines changed: 62 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"fmt"
2020
"reflect"
2121
"slices"
22-
"sort"
22+
"strings"
2323

2424
"emperror.dev/errors"
2525
apiv1 "k8s.io/api/core/v1"
@@ -98,11 +98,12 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
9898
}
9999
for _, step := range steps {
100100
if err := step.fn(); err != nil {
101-
return r.handleReconcileError(ctx, baseManager, tenant, step.name, err)
101+
return r.handleTenantReconcileError(ctx, &baseManager, tenant, step.name, err)
102102
}
103103
}
104104

105105
tenant.Status.State = state.StateReady
106+
tenant.ClearProblems()
106107
if !reflect.DeepEqual(originalTenantStatus, tenant.Status) {
107108
baseManager.Info("tenant status changed")
108109
if updateErr := r.Status().Update(ctx, tenant); updateErr != nil {
@@ -217,12 +218,11 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
217218
return builder.Complete(r)
218219
}
219220

220-
// handleReconcileError handles errors that occur during reconciliation steps
221-
func (r *RouteReconciler) handleReconcileError(ctx context.Context, baseManager manager.BaseManager, tenant *v1alpha1.Tenant, stepName string, err error) (ctrl.Result, error) {
221+
// handleTenantReconcileError handles errors that occur during reconciliation steps
222+
func (r *RouteReconciler) handleTenantReconcileError(ctx context.Context, baseManager *manager.BaseManager, tenant *v1alpha1.Tenant, stepName string, err error) (ctrl.Result, error) {
222223
wrappedErr := errors.Wrapf(err, "failed to %s for tenant %s", stepName, tenant.Name)
223224

224-
tenant.Status.Problems = append(tenant.Status.Problems, wrappedErr.Error())
225-
tenant.Status.ProblemsCount = len(tenant.Status.Problems)
225+
tenant.AddProblem(wrappedErr.Error())
226226
tenant.Status.State = state.StateFailed
227227

228228
baseManager.Error(errors.WithStack(err), fmt.Sprintf("failed to %s", stepName), "tenant", tenant.Name)
@@ -247,6 +247,10 @@ func handleLogSourceNamespaces(ctx context.Context, tenantResManager *manager.Te
247247
}
248248

249249
func handleOwnedResources(ctx context.Context, tenantResManager *manager.TenantResourceManager, tenant *v1alpha1.Tenant) error {
250+
// Caching all outputs for the tenant to validate subscriptions against
251+
var allOutputsForTenant []model.ResourceOwnedByTenant
252+
253+
// Process outputs first to ensure they have their tenant set before validating subscriptions
250254
tenantOwnedResources := []model.ResourceOwnedByTenant{
251255
&v1alpha1.Output{},
252256
&v1alpha1.Subscription{},
@@ -255,11 +259,7 @@ func handleOwnedResources(ctx context.Context, tenantResManager *manager.TenantR
255259
resourcesForTenant, resourceUpdateList, err := tenantResManager.GetResourceOwnedByTenant(ctx, resource, tenant)
256260
if err != nil {
257261
tenantResManager.Error(errors.WithStack(err), fmt.Sprintf("failed to get %T for tenant", resource), "tenant", tenant.Name)
258-
if updateErr := tenantResManager.Status().Update(ctx, tenant); updateErr != nil {
259-
tenantResManager.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name)
260-
return errors.Append(err, updateErr)
261-
}
262-
return err
262+
return fmt.Errorf("failed to get %T for tenant %s: %w", resource, tenant.Name, err)
263263
}
264264

265265
// Add all newly updated resources here
@@ -271,38 +271,60 @@ func handleOwnedResources(ctx context.Context, tenantResManager *manager.TenantR
271271
}
272272
tenantResManager.DisownResources(ctx, resourcesToDisown)
273273

274+
if _, ok := resource.(*v1alpha1.Output); ok {
275+
allOutputsForTenant = resourcesForTenant
276+
}
277+
274278
if _, ok := resource.(*v1alpha1.Subscription); ok {
275279
subscriptionNames := manager.GetResourceNamesFromResource(resourcesForTenant)
276280
components.SortNamespacedNames(subscriptionNames)
277281
tenant.Status.Subscriptions = subscriptionNames
278282

279-
if err := validateSubscriptionOutputs(ctx, tenantResManager, tenant, resourcesForTenant); err != nil {
283+
if err := validateSubscriptionOutputs(ctx, tenantResManager, tenant, resourcesForTenant, allOutputsForTenant); err != nil {
280284
return err
281285
}
282286
}
287+
288+
for _, res := range resourcesForTenant {
289+
if res.GetState() == state.StateFailed {
290+
tenantResManager.Error(errors.New("resource failed"), "failed resource", "resource", res.GetName())
291+
return fmt.Errorf("resource %s is in a failed state", res.GetName())
292+
}
293+
}
283294
}
284295

285296
return nil
286297
}
287298

288-
func validateSubscriptionOutputs(ctx context.Context, tenantResManager *manager.TenantResourceManager, tenant *v1alpha1.Tenant, subscriptionsForTenant []model.ResourceOwnedByTenant) error {
299+
func validateSubscriptionOutputs(ctx context.Context, tenantResManager *manager.TenantResourceManager, tenant *v1alpha1.Tenant, subscriptionsForTenant []model.ResourceOwnedByTenant, outputsForTenant []model.ResourceOwnedByTenant) error {
289300
realSubscriptionsForTenant, err := utils.GetConcreteTypeFromList[*v1alpha1.Subscription](utils.ToObject(subscriptionsForTenant))
290301
if err != nil {
291302
tenantResManager.Error(errors.WithStack(err), "failed to get concrete type from list", "tenant", tenant.Name)
292303
return err
293304
}
294305

306+
realOutputsForTenant, err := utils.GetConcreteTypeFromList[*v1alpha1.Output](utils.ToObject(outputsForTenant))
307+
if err != nil {
308+
tenantResManager.Error(errors.WithStack(err), "failed to get concrete type from list for outputs", "tenant", tenant.Name)
309+
return err
310+
}
311+
312+
outputMap := make(map[v1alpha1.NamespacedName]*v1alpha1.Output)
313+
for _, output := range realOutputsForTenant {
314+
outputMap[output.NamespacedName()] = output
315+
}
316+
295317
for _, subscription := range realSubscriptionsForTenant {
296318
originalSubscriptionStatus := subscription.Status.DeepCopy()
297-
validOutputs, invalidOutputs := tenantResManager.ValidateSubscriptionOutputs(ctx, subscription)
298-
299-
if len(invalidOutputs) > 0 {
300-
subscription.Status.Problems = append(subscription.Status.Problems, fmt.Sprintf("invalid outputs for subscription %s: %v", subscription.Name, invalidOutputs))
301-
subscription.Status.ProblemsCount = len(subscription.Status.Problems)
319+
validOutputs, invalidOutputs := tenantResManager.ValidateSubscriptionReferencedOutputsWithCache(ctx, subscription, outputMap)
320+
switch len(invalidOutputs) {
321+
case 0:
322+
subscription.Status.State = state.StateReady
323+
subscription.ClearProblems()
324+
default:
302325
subscription.Status.State = state.StateFailed
303-
tenantResManager.UpdateOutputs(ctx, tenant, invalidOutputs)
326+
subscription.AddProblem(fmt.Sprintf("invalid outputs referenced by subscription %s: %s", subscription.Name, strings.Join(invalidOutputs, ", ")))
304327
}
305-
306328
components.SortNamespacedNames(validOutputs)
307329
subscription.Status.Outputs = validOutputs
308330

@@ -320,34 +342,40 @@ func validateSubscriptionOutputs(ctx context.Context, tenantResManager *manager.
320342
func handleBridgeResources(ctx context.Context, bridgeManager *manager.BridgeManager, tenant *v1alpha1.Tenant) error {
321343
bridgesForTenant, err := bridgeManager.GetBridgesForTenant(ctx, tenant.Name)
322344
if err != nil {
323-
tenant.Status.State = state.StateFailed
324345
bridgeManager.Error(errors.WithStack(err), "failed to get bridges for tenant", "tenant", tenant.Name)
325-
if updateErr := bridgeManager.Status().Update(ctx, tenant); updateErr != nil {
326-
bridgeManager.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name)
327-
return errors.Append(err, updateErr)
328-
}
329-
330-
return err
346+
return fmt.Errorf("failed to get bridges for tenant %s: %w", tenant.Name, err)
331347
}
332348

333349
bridgesForTenantNames := manager.GetBridgeNamesFromBridges(bridgesForTenant)
334-
sort.Strings(bridgesForTenantNames)
350+
slices.Sort(bridgesForTenantNames)
335351
tenant.Status.ConnectedBridges = bridgesForTenantNames
336352

337-
for _, bridge := range bridgesForTenant {
338-
if err := bridgeManager.ValidateBridgeConnection(ctx, tenant.Name, &bridge); err != nil {
339-
bridge.Status.Problems = append(bridge.Status.Problems, errors.Wrapf(err, "bridge %s validation failed", bridge.Name).Error())
340-
bridge.Status.ProblemsCount = len(bridge.Status.Problems)
353+
for i := range bridgesForTenant {
354+
bridge := &bridgesForTenant[i]
355+
originalBridgeStatus := bridge.Status.DeepCopy()
356+
357+
if err := bridgeManager.ValidateBridgeConnection(ctx, tenant.Name, bridge); err != nil {
358+
bridge.AddProblem(errors.Wrapf(err, "bridge %s validation failed", bridge.Name).Error())
341359
bridge.Status.State = state.StateFailed
342360

343361
bridgeManager.Error(errors.WithStack(err), "failed to check bridge connection", "bridge", bridge.Name)
344-
if updateErr := bridgeManager.Status().Update(ctx, tenant); updateErr != nil {
345-
bridgeManager.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name)
362+
if updateErr := bridgeManager.Status().Update(ctx, bridge); updateErr != nil {
363+
bridgeManager.Error(errors.WithStack(updateErr), "failed updating bridge status", "bridge", bridge.Name)
346364
return errors.Append(err, updateErr)
347365
}
348366

349367
return err
350368
}
369+
370+
bridge.Status.State = state.StateReady
371+
bridge.ClearProblems()
372+
373+
if !reflect.DeepEqual(originalBridgeStatus, &bridge.Status) {
374+
if updateErr := bridgeManager.Status().Update(ctx, bridge); updateErr != nil {
375+
bridgeManager.Error(errors.WithStack(updateErr), "failed updating bridge status", "bridge", bridge.Name)
376+
return updateErr
377+
}
378+
}
351379
}
352380

353381
return nil

pkg/resources/manager/bridge_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (b *BridgeManager) getTenants(ctx context.Context, listOpts *client.ListOpt
7575
return tenants.Items, nil
7676
}
7777

78-
func (b *BridgeManager) CheckBridgeConnection(ctx context.Context, tenantName string, bridge *v1alpha1.Bridge) error {
78+
func (b *BridgeManager) ValidateBridgeConnection(ctx context.Context, tenantName string, bridge *v1alpha1.Bridge) error {
7979
for _, tenantReference := range []string{bridge.Spec.SourceTenant, bridge.Spec.TargetTenant} {
8080
if tenantReference != tenantName {
8181
listOpts := &client.ListOptions{

pkg/resources/manager/tenant_resource_manager.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
apierrors "k8s.io/apimachinery/pkg/api/errors"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/fields"
29-
"k8s.io/apimachinery/pkg/types"
3029
"sigs.k8s.io/controller-runtime/pkg/client"
3130

3231
"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
@@ -99,12 +98,21 @@ func (t *TenantResourceManager) GetResourceOwnedByTenant(ctx context.Context, re
9998
currentTenant := res.GetTenant()
10099
if currentTenant != "" && currentTenant != tenant.Name {
101100
t.Error(
102-
fmt.Errorf("resource is owned by another tenant"),
101+
fmt.Errorf("resource: %s of kind: %s is owned by another tenant", res.GetName(), res.GetObjectKind().GroupVersionKind().Kind),
103102
"skipping reconciliation",
104103
"current_tenant", currentTenant,
105104
"desired_tenant", tenant.Name,
106105
"action_required", "remove resource from previous tenant before adopting to new tenant",
107106
)
107+
108+
// Clear any previous problems and add the ownership conflict
109+
res.SetProblems([]string{
110+
fmt.Sprintf("resource %s/%s is already owned by tenant %s", res.GetNamespace(), res.GetName(), currentTenant),
111+
})
112+
res.SetState(state.StateFailed)
113+
if err := t.Status().Update(ctx, res); err != nil {
114+
t.Error(err, fmt.Sprintf("failed to update resource (%s/%s) state", res.GetNamespace(), res.GetName()))
115+
}
108116
continue
109117
}
110118

@@ -188,43 +196,52 @@ func (t *TenantResourceManager) DisownResources(ctx context.Context, resourceToD
188196
}
189197
}
190198

191-
// ValidateSubscriptionOutputs validates the output references of a subscription
192-
func (t *TenantResourceManager) ValidateSubscriptionOutputs(ctx context.Context, subscription *v1alpha1.Subscription) []v1alpha1.NamespacedName {
193-
validOutputs := []v1alpha1.NamespacedName{}
194-
invalidOutputs := []v1alpha1.NamespacedName{}
199+
// ValidateSubscriptionReferencedOutputsWithCache validates outputs referenced by the subscription using a provided cache of outputs
200+
func (t *TenantResourceManager) ValidateSubscriptionReferencedOutputsWithCache(ctx context.Context, subscription *v1alpha1.Subscription, outputMap map[v1alpha1.NamespacedName]*v1alpha1.Output) ([]v1alpha1.NamespacedName, []string) {
201+
var validOutputs []v1alpha1.NamespacedName
202+
var invalidOutputs []string
195203
for _, outputRef := range subscription.Spec.Outputs {
196-
checkedOutput := &v1alpha1.Output{}
197-
if err := t.Get(ctx, types.NamespacedName(outputRef), checkedOutput); err != nil {
198-
t.Error(err, "referred output invalid", "output", outputRef.String())
199-
200-
invalidOutputs = append(invalidOutputs, outputRef)
204+
invalid := false
205+
checkedOutput, exists := outputMap[outputRef]
206+
if !exists {
207+
// Output doesn't exist or isn't owned by this tenant
208+
t.Error(errors.New("output not found"), "referred output invalid", "output", outputRef.String())
209+
invalidOutputs = append(invalidOutputs, outputRef.String())
201210
continue
202211
}
203212

204213
// ensure the output belongs to the same tenant
205214
if checkedOutput.Status.Tenant != subscription.Status.Tenant {
215+
invalid = true
206216
t.Error(errors.New("output and subscription tenants mismatch"),
207217
"output and subscription tenants mismatch",
208218
"output", checkedOutput.NamespacedName().String(),
209219
"output's tenant", checkedOutput.Status.Tenant,
210220
"subscription", subscription.NamespacedName().String(),
211221
"subscription's tenant", subscription.Status.Tenant)
212-
213-
invalidOutputs = append(invalidOutputs, outputRef)
214-
continue
222+
checkedOutput.AddProblem(fmt.Sprintf("output %s does not belong to the same tenant as subscription %s", outputRef.String(), subscription.NamespacedName().String()))
215223
}
216224

217225
// validate output secret
218226
if checkedOutput.Spec.Authentication != nil {
219227
if err := components.QueryOutputSecret(ctx, t.Client, checkedOutput); err != nil {
228+
invalid = true
220229
t.Error(err, "failed to query output secret", "output", checkedOutput.NamespacedName().String())
230+
checkedOutput.AddProblem(fmt.Sprintf("output %s secret could not be queried: %v", outputRef.String(), err))
231+
}
232+
}
221233

222-
invalidOutputs = append(invalidOutputs, outputRef)
223-
continue
234+
if invalid {
235+
checkedOutput.Status.State = state.StateFailed
236+
if updateErr := t.Status().Update(ctx, checkedOutput); updateErr != nil {
237+
t.Error(updateErr, fmt.Sprintf("failed to update output (%s/%s) state", checkedOutput.GetNamespace(), checkedOutput.GetName()))
224238
}
239+
invalidOutputs = append(invalidOutputs, checkedOutput.NamespacedName().String())
240+
continue
225241
}
226242

227243
// update the output state if validation was successful
244+
checkedOutput.ClearProblems()
228245
checkedOutput.SetState(state.StateReady)
229246
if updateErr := t.Status().Update(ctx, checkedOutput); updateErr != nil {
230247
checkedOutput.SetState(state.StateFailed)
@@ -234,11 +251,7 @@ func (t *TenantResourceManager) ValidateSubscriptionOutputs(ctx context.Context,
234251
validOutputs = append(validOutputs, outputRef)
235252
}
236253

237-
if len(invalidOutputs) > 0 {
238-
t.Error(errors.New("some outputs are invalid"), "some outputs are invalid", "invalidOutputs", invalidOutputs, "subscription", subscription.NamespacedName().String())
239-
}
240-
241-
return validOutputs
254+
return validOutputs, invalidOutputs
242255
}
243256

244257
// getNamespacesForSelectorSlice returns a list of namespaces that match the given label selectors

0 commit comments

Comments
 (0)