Skip to content

Commit 3095ed1

Browse files
committed
feat: add dry-run mode
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
1 parent e1b4892 commit 3095ed1

File tree

7 files changed

+77
-37
lines changed

7 files changed

+77
-37
lines changed

api/telemetry/v1alpha1/collector_types.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type MemoryLimiter struct {
5050
MemorySpikePercentage uint32 `json:"spike_limit_percentage"`
5151
}
5252

53+
// +kubebuilder:validation:XValidation:rule="!has(self.dryRunMode) || !self.dryRunMode || (has(self.debug) && self.debug)",message="dryRunMode can only be set to true when debug is explicitly set to true"
54+
5355
// CollectorSpec defines the desired state of Collector
5456
type CollectorSpec struct {
5557
// +kubebuilder:validation:Required
@@ -64,7 +66,11 @@ type CollectorSpec struct {
6466
ControlNamespace string `json:"controlNamespace"`
6567

6668
// Enables debug logging for the collector.
67-
Debug bool `json:"debug,omitempty"`
69+
Debug *bool `json:"debug,omitempty"`
70+
71+
// DryRunMode disables all exporters except for the debug exporter, as well as persistence options configured for the collector.
72+
// This can be useful for testing and debugging purposes.
73+
DryRunMode *bool `json:"dryRunMode,omitempty"`
6874

6975
// Setting memory limits for the Collector using the memory limiter processor.
7076
MemoryLimiter *MemoryLimiter `json:"memoryLimiter,omitempty"`

pkg/resources/manager/collector_manager.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func (c *CollectorManager) BuildConfigInputForCollector(ctx context.Context, col
132132
TenantSubscriptionMap: tenantSubscriptionMap,
133133
SubscriptionOutputMap: subscriptionOutputMap,
134134
},
135-
Debug: collector.Spec.Debug,
135+
Debug: utils.DerefOrZero(collector.Spec.Debug),
136+
DryRunMode: utils.DerefOrZero(collector.Spec.DryRunMode),
136137
MemoryLimiter: *collector.Spec.MemoryLimiter,
137138
}, nil
138139
}
@@ -181,7 +182,9 @@ func (c *CollectorManager) OtelCollector(collector *v1alpha1.Collector, otelConf
181182
OpenTelemetryCommonFields: *collector.Spec.OtelCommonFields,
182183
},
183184
}
184-
handleVolumes(&otelCollector.Spec.OpenTelemetryCommonFields, tenants, outputs)
185+
if !utils.DerefOrZero(collector.Spec.DryRunMode) {
186+
handleVolumes(&otelCollector.Spec.OpenTelemetryCommonFields, tenants, outputs)
187+
}
185188
setOtelCommonFieldsDefaults(&otelCollector.Spec.OpenTelemetryCommonFields, additionalArgs, saName)
186189

187190
if memoryLimit := collector.Spec.GetMemoryLimit(); memoryLimit != nil {

pkg/resources/otel_conf_gen/otel_conf_gen.go

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type OtelColConfigInput struct {
4040
components.ResourceRelations
4141
MemoryLimiter v1alpha1.MemoryLimiter
4242
Debug bool
43+
DryRunMode bool
4344
}
4445

4546
func (cfgInput *OtelColConfigInput) IsEmpty() bool {
@@ -61,15 +62,22 @@ func (cfgInput *OtelColConfigInput) IsEmpty() bool {
6162
}
6263

6364
func (cfgInput *OtelColConfigInput) generateExporters(ctx context.Context) map[string]any {
64-
exporters := map[string]any{}
65+
// If in dry-run mode, only generate debug exporters
66+
if cfgInput.DryRunMode {
67+
return exporter.GenerateDebugExporters()
68+
}
69+
70+
exporters := make(map[string]any)
71+
72+
if cfgInput.Debug {
73+
maps.Copy(exporters, exporter.GenerateDebugExporters())
74+
}
75+
6576
maps.Copy(exporters, exporter.GenerateMetricsExporters())
6677
maps.Copy(exporters, exporter.GenerateOTLPGRPCExporters(ctx, cfgInput.ResourceRelations))
6778
maps.Copy(exporters, exporter.GenerateOTLPHTTPExporters(ctx, cfgInput.ResourceRelations))
6879
maps.Copy(exporters, exporter.GenerateFluentforwardExporters(ctx, cfgInput.ResourceRelations))
6980
maps.Copy(exporters, exporter.GenerateFileExporter(ctx, cfgInput.ResourceRelations))
70-
if cfgInput.Debug {
71-
maps.Copy(exporters, exporter.GenerateDebugExporters())
72-
}
7381

7482
return exporters
7583
}
@@ -122,7 +130,7 @@ func (cfgInput *OtelColConfigInput) generateExtensions() (map[string]any, []stri
122130
}
123131

124132
for _, tenant := range cfgInput.Tenants {
125-
if tenant.Spec.PersistenceConfig.EnableFileStorage {
133+
if !cfgInput.DryRunMode && tenant.Spec.PersistenceConfig.EnableFileStorage {
126134
extensions[fmt.Sprintf("file_storage/%s", tenant.Name)] = storage.GenerateFileStorageExtensionForTenant(tenant.Spec.PersistenceConfig.Directory, tenant.Name)
127135
}
128136
}
@@ -149,7 +157,7 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any {
149157
}); tenantIdx != -1 {
150158
namespaces := cfgInput.Tenants[tenantIdx].Status.LogSourceNamespaces
151159
if len(namespaces) > 0 || cfgInput.Tenants[tenantIdx].Spec.SelectFromAllNamespaces {
152-
receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces, cfgInput.Tenants[tenantIdx])
160+
receivers[fmt.Sprintf("filelog/%s", tenantName)] = receiver.GenerateDefaultKubernetesReceiver(namespaces, cfgInput.DryRunMode, cfgInput.Tenants[tenantIdx])
153161
}
154162
}
155163
}
@@ -159,8 +167,11 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any {
159167

160168
func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any {
161169
connectors := make(map[string]any)
162-
maps.Copy(connectors, connector.GenerateCountConnectors())
163-
maps.Copy(connectors, connector.GenerateBytesConnectors())
170+
171+
if !cfgInput.DryRunMode {
172+
maps.Copy(connectors, connector.GenerateCountConnectors())
173+
maps.Copy(connectors, connector.GenerateBytesConnectors())
174+
}
164175

165176
for _, tenant := range cfgInput.Tenants {
166177
// Generate routing connector for the tenant's subscription if it has any
@@ -193,16 +204,18 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
193204
namedPipelines := make(map[string]*otelv1beta1.Pipeline)
194205
tenants := []string{}
195206
for tenant := range cfgInput.TenantSubscriptionMap {
196-
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)
207+
namedPipelines[fmt.Sprintf("logs/tenant_%s", tenant)] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant, cfgInput.DryRunMode)
197208
tenants = append(tenants, tenant)
198209
}
199210

200-
maps.Copy(namedPipelines, pipeline.GenerateMetricsPipelines())
211+
if !cfgInput.DryRunMode {
212+
maps.Copy(namedPipelines, pipeline.GenerateMetricsPipelines())
213+
}
201214

202215
for _, tenant := range tenants {
203216
// Generate a pipeline for the tenant
204217
tenantRootPipeline := fmt.Sprintf("logs/tenant_%s", tenant)
205-
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant)
218+
namedPipelines[tenantRootPipeline] = pipeline.GenerateRootPipeline(cfgInput.Tenants, tenant, cfgInput.DryRunMode)
206219

207220
connector.GenerateRoutingConnectorForBridgesTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Bridges)
208221
processor.GenerateTransformProcessorForTenantPipeline(tenant, namedPipelines[tenantRootPipeline], cfgInput.Tenants)
@@ -234,24 +247,25 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b
234247

235248
var exporters []string
236249

237-
if output.Output.Spec.OTLPGRPC != nil {
238-
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
239-
}
240-
241-
if output.Output.Spec.OTLPHTTP != nil {
242-
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
243-
}
244-
245-
if output.Output.Spec.Fluentforward != nil {
246-
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
247-
}
248-
249-
if output.Output.Spec.File != nil {
250-
exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName}
251-
}
252-
253-
if cfgInput.Debug {
254-
exporters = append(exporters, "debug")
250+
// If in dry-run mode, only generate debug exporters
251+
if cfgInput.DryRunMode {
252+
exporters = []string{exporter.DebugExporterID}
253+
} else {
254+
if cfgInput.Debug {
255+
exporters = append(exporters, exporter.DebugExporterID)
256+
}
257+
if output.Output.Spec.OTLPGRPC != nil {
258+
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
259+
}
260+
if output.Output.Spec.OTLPHTTP != nil {
261+
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
262+
}
263+
if output.Output.Spec.Fluentforward != nil {
264+
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
265+
}
266+
if output.Output.Spec.File != nil {
267+
exporters = append(exporters, components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName)
268+
}
255269
}
256270

257271
namedPipelines[outputPipelineName] = pipeline.GeneratePipeline(receivers, processors, exporters)

pkg/resources/otel_conf_gen/otel_conf_gen_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) {
466466
{
467467
name: "Single tenant with no subscriptions",
468468
cfgInput: OtelColConfigInput{
469+
DryRunMode: false,
469470
ResourceRelations: components.ResourceRelations{
470471
Bridges: nil,
471472
OutputsWithSecretData: nil,
@@ -486,7 +487,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) {
486487
},
487488
},
488489
expectedPipelines: map[string]*otelv1beta1.Pipeline{
489-
"logs/tenant_tenant1": pipeline.GenerateRootPipeline([]v1alpha1.Tenant{}, "tenant1"),
490+
"logs/tenant_tenant1": pipeline.GenerateRootPipeline([]v1alpha1.Tenant{}, "tenant1", false),
490491
"logs/tenant_tenant1_subscription_ns1_sub1": pipeline.GeneratePipeline(
491492
[]string{"routing/tenant_tenant1_subscriptions"},
492493
[]string{"attributes/subscription_sub1"},

pkg/resources/otel_conf_gen/pipeline/components/receiver/filelog_receiver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1"
2121
)
2222

23-
func GenerateDefaultKubernetesReceiver(namespaces []string, tenant v1alpha1.Tenant) map[string]any {
23+
func GenerateDefaultKubernetesReceiver(namespaces []string, dryRunMode bool, tenant v1alpha1.Tenant) map[string]any {
2424
// TODO: fix parser-crio
2525
operators := []map[string]any{
2626
{
@@ -114,7 +114,7 @@ func GenerateDefaultKubernetesReceiver(namespaces []string, tenant v1alpha1.Tena
114114
"max_elapsed_time": 0,
115115
},
116116
}
117-
if tenant.Spec.PersistenceConfig.EnableFileStorage {
117+
if !dryRunMode && tenant.Spec.PersistenceConfig.EnableFileStorage {
118118
k8sReceiver["storage"] = fmt.Sprintf("file_storage/%s", tenant.Name)
119119
}
120120

pkg/resources/otel_conf_gen/pipeline/pipeline.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func GeneratePipeline(receivers, processors, exporters []string) *otelv1beta1.Pi
3232
}
3333
}
3434

35-
func GenerateRootPipeline(tenants []v1alpha1.Tenant, tenantName string) *otelv1beta1.Pipeline {
36-
tenantCountConnectorName := "count/tenant_metrics"
35+
func GenerateRootPipeline(tenants []v1alpha1.Tenant, tenantName string, dryRunMode bool) *otelv1beta1.Pipeline {
36+
const tenantCountConnectorName = "count/tenant_metrics"
3737
var receiverName string
3838
var exporterName string
3939
for _, tenant := range tenants {
@@ -50,6 +50,10 @@ func GenerateRootPipeline(tenants []v1alpha1.Tenant, tenantName string) *otelv1b
5050
}
5151
}
5252

53+
if dryRunMode {
54+
return GeneratePipeline([]string{receiverName}, []string{"k8sattributes", fmt.Sprintf("attributes/tenant_%s", tenantName), "filter/exclude"}, []string{exporterName})
55+
}
56+
5357
return GeneratePipeline([]string{receiverName}, []string{"k8sattributes", fmt.Sprintf("attributes/tenant_%s", tenantName), "filter/exclude"}, []string{exporterName, tenantCountConnectorName})
5458
}
5559

pkg/sdk/utils/utils.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,18 @@ func ToObject[T client.Object](items []T) []client.Object {
7575
return objects
7676
}
7777

78+
// DerefOrZero returns the value referenced by p, or the zero-value of the type
79+
func DerefOrZero[T any](p *T) T {
80+
return DerefOr(p, *new(T))
81+
}
82+
83+
func DerefOr[T any](p *T, defVal T) T {
84+
if p == nil {
85+
return defVal
86+
}
87+
return *p
88+
}
89+
7890
// NormalizeStringSlice takes a slice of strings, removes duplicates, sorts it, and returns the unique sorted slice.
7991
func NormalizeStringSlice(inputList []string) []string {
8092
allKeys := make(map[string]bool)

0 commit comments

Comments
 (0)