From 08ca57a5830397acf341959c3006474b03c47076 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Thu, 19 Jun 2025 09:26:11 +0100 Subject: [PATCH] feat: Add metric aggregation configuration for hydrating system_labels --- README.md | 1 + collectors/monitoring_collector.go | 39 ++++ collectors/monitoring_collector_test.go | 257 +++++++++++++++++++++++- stackdriver_exporter.go | 67 ++++-- stackdriver_exporter_test.go | 148 +++++++++++++- 5 files changed, 493 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index e10c88cb..e4a4e554 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,7 @@ If you are still using the legacy [Access scopes][access-scopes], the `https://w | `monitoring.metrics-interval` | No | `5m` | Metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API. Only the most recent data point is used | | `monitoring.metrics-offset` | No | `0s` | Offset (into the past) for the metric's timestamp interval to request from the Google Stackdriver Monitoring Metrics API, to handle latency in published metrics | | `monitoring.filters` | No | | Additonal filters to be sent on the Monitoring API call. Add multiple filters by providing this parameter multiple times. See [monitoring.filters](#using-filters) for more info. | +| `monitoring.metrics-with-aggregations` | No | | Specify metrics with aggregation options in the format: metric_name:alignment_period:cross_series_reducer:group_by_fields:per_series_aligner. Example: custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN | | `monitoring.aggregate-deltas` | No | | If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge. Be sure to read [what to know about aggregating DELTA metrics](#what-to-know-about-aggregating-delta-metrics) | | `monitoring.aggregate-deltas-ttl` | No | `30m` | How long should a delta metric continue to be exported and stored after GCP stops producing it. Read [slow moving metrics](#slow-moving-metrics) to understand the problem this attempts to solve | | `monitoring.descriptor-cache-ttl` | No | `0s` | How long should the metric descriptors for a prefixed be cached for | diff --git a/collectors/monitoring_collector.go b/collectors/monitoring_collector.go index a2c1543a..659a3a0c 100644 --- a/collectors/monitoring_collector.go +++ b/collectors/monitoring_collector.go @@ -14,6 +14,7 @@ package collectors import ( + "encoding/json" "errors" "fmt" "log/slog" @@ -36,10 +37,19 @@ type MetricFilter struct { FilterQuery string } +type MetricAggregationConfig struct { + TargetedMetricPrefix string + AlignmentPeriod string + CrossSeriesReducer string + GroupByFields []string + PerSeriesAligner string +} + type MonitoringCollector struct { projectID string metricsTypePrefixes []string metricsFilters []MetricFilter + metricsAggregationConfigs []MetricAggregationConfig metricsInterval time.Duration metricsOffset time.Duration metricsIngestDelay bool @@ -66,6 +76,8 @@ type MonitoringCollectorOptions struct { // ExtraFilters is a list of criteria to apply to each corresponding metric prefix query. If one or more are // applicable to a given metric type prefix, they will be 'AND' concatenated. ExtraFilters []MetricFilter + // MetricAggregationConfigs is a list of metrics with aggregation options in the format: metric_name:alignment_period:cross_series_reducer:group_by_fields:per_series_aligner. 
Example: custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN + MetricAggregationConfigs []MetricAggregationConfig + // RequestInterval is the time interval used in each request to get metrics. If there are many data points returned // during this interval, only the latest will be reported. RequestInterval time.Duration @@ -198,6 +210,7 @@ func NewMonitoringCollector(projectID string, monitoringService *monitoring.Serv projectID: projectID, metricsTypePrefixes: opts.MetricTypePrefixes, metricsFilters: opts.ExtraFilters, + metricsAggregationConfigs: opts.MetricAggregationConfigs, metricsInterval: opts.RequestInterval, metricsOffset: opts.RequestOffset, metricsIngestDelay: opts.IngestDelay, @@ -319,6 +332,16 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri IntervalStartTime(startTime.Format(time.RFC3339Nano)). IntervalEndTime(endTime.Format(time.RFC3339Nano)) + for _, ac := range c.metricsAggregationConfigs { + if strings.HasPrefix(metricDescriptor.Type, ac.TargetedMetricPrefix) { + timeSeriesListCall.AggregationAlignmentPeriod(ac.AlignmentPeriod). + AggregationCrossSeriesReducer(ac.CrossSeriesReducer). + AggregationGroupByFields(ac.GroupByFields...). + AggregationPerSeriesAligner(ac.PerSeriesAligner) + break + } + } + for { c.apiCallsTotalMetric.Inc() page, err := timeSeriesListCall.Do() @@ -452,6 +475,22 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics( } } + // Add the monitored system labels + var systemLabels map[string]string + if timeSeries.Metadata != nil && timeSeries.Metadata.SystemLabels != nil { + err := json.Unmarshal(timeSeries.Metadata.SystemLabels, &systemLabels) + if err != nil { + c.logger.Error("failed to decode SystemLabels", "err", err) + } else { + for key, value := range systemLabels { + if !c.keyExists(labelKeys, key) { + labelKeys = append(labelKeys, key) + labelValues = append(labelValues, value) + } + } + } + } + if c.monitoringDropDelegatedProjects { dropDelegatedProject := false diff --git a/collectors/monitoring_collector_test.go b/collectors/monitoring_collector_test.go index e80fdfbb..db239f3b 100644 --- a/collectors/monitoring_collector_test.go +++ b/collectors/monitoring_collector_test.go @@ -13,16 +13,28 @@ package collectors -import "testing" +import ( + "log/slog" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/api/monitoring/v3" +) func TestIsGoogleMetric(t *testing.T) { good := []string{ "pubsub.googleapis.com/some/metric", + "compute.googleapis.com/instance/cpu/utilization", + "cloudsql.googleapis.com/database/cpu/utilization", + "storage.googleapis.com/api/request_count", + "custom.googleapis.com/my_metric", } bad := []string{ "my.metric/a/b", "my.metrics/pubsub.googleapis.com/a", + "mycompany.com/metrics/requests", } for _, e := range good { @@ -37,3 +49,246 @@ func TestIsGoogleMetric(t *testing.T) { } } } + +func TestGoogleDescriptorCache(t *testing.T) { + ttl := 1 * time.Second + innerCache := newDescriptorCache(ttl) + cache := &googleDescriptorCache{inner: innerCache} + + googleMetric := "pubsub.googleapis.com/topic/num_undelivered_messages" + customMetric := "custom.googleapis.com/my_metric" + + googleDescriptors := []*monitoring.MetricDescriptor{ + {Type: googleMetric, DisplayName: "Google Metric"}, + } + customDescriptors := []*monitoring.MetricDescriptor{ + {Type: customMetric, DisplayName: "Custom Metric"}, + } + + // Test that Google metrics are cached + cache.Store(googleMetric, googleDescriptors) + result 
:= cache.Lookup(googleMetric) + if result == nil { + t.Error("Google metric should be cached") + } + if len(result) != 1 || result[0].Type != googleMetric { + t.Error("Cached Google metric should match stored value") + } + + // Test that custom.googleapis.com metrics are also cached (they are Google metrics) + cache.Store(customMetric, customDescriptors) + result = cache.Lookup(customMetric) + if result == nil { + t.Error("Custom Google metric should be cached") + } + if len(result) != 1 || result[0].Type != customMetric { + t.Error("Cached custom Google metric should match stored value") + } + + // Test expiration + time.Sleep(2 * ttl) + result = cache.Lookup(googleMetric) + if result != nil { + t.Error("Cached Google metric should have expired") + } +} + +func TestNoopDescriptorCache(t *testing.T) { + cache := &noopDescriptorCache{} + + descriptors := []*monitoring.MetricDescriptor{ + {Type: "test.metric", DisplayName: "Test Metric"}, + } + + // Test that Lookup always returns nil + result := cache.Lookup("any.prefix") + if result != nil { + t.Error("Noop cache should always return nil on lookup") + } + + // Test that Store does nothing (no panic) + cache.Store("any.prefix", descriptors) + result = cache.Lookup("any.prefix") + if result != nil { + t.Error("Noop cache should still return nil after store") + } +} + +func TestNewMonitoringCollector(t *testing.T) { + logger := slog.Default() + monitoringService := &monitoring.Service{} + + tests := []struct { + name string + projectID string + opts MonitoringCollectorOptions + expectError bool + }{ + { + name: "basic collector creation", + projectID: "test-project", + opts: MonitoringCollectorOptions{ + MetricTypePrefixes: []string{"pubsub.googleapis.com"}, + RequestInterval: 5 * time.Minute, + }, + expectError: false, + }, + { + name: "collector with descriptor cache", + projectID: "test-project", + opts: MonitoringCollectorOptions{ + MetricTypePrefixes: []string{"pubsub.googleapis.com"}, + RequestInterval: 5 * time.Minute, + DescriptorCacheTTL: 10 * time.Minute, + DescriptorCacheOnlyGoogle: true, + }, + expectError: false, + }, + { + name: "collector with delta aggregation", + projectID: "test-project", + opts: MonitoringCollectorOptions{ + MetricTypePrefixes: []string{"pubsub.googleapis.com"}, + RequestInterval: 5 * time.Minute, + AggregateDeltas: true, + }, + expectError: false, + }, + { + name: "collector with all options", + projectID: "test-project", + opts: MonitoringCollectorOptions{ + MetricTypePrefixes: []string{"pubsub.googleapis.com"}, + ExtraFilters: []MetricFilter{{TargetedMetricPrefix: "pubsub.googleapis.com", FilterQuery: "resource.labels.topic_id=\"test\""}}, + MetricAggregationConfigs: []MetricAggregationConfig{{TargetedMetricPrefix: "pubsub.googleapis.com", CrossSeriesReducer: "REDUCE_SUM"}}, + RequestInterval: 5 * time.Minute, + RequestOffset: 1 * time.Minute, + IngestDelay: true, + FillMissingLabels: true, + DropDelegatedProjects: true, + AggregateDeltas: true, + DescriptorCacheTTL: 10 * time.Minute, + DescriptorCacheOnlyGoogle: false, + }, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + collector, err := NewMonitoringCollector(tt.projectID, monitoringService, tt.opts, logger, nil, nil) + + if tt.expectError && err == nil { + t.Error("Expected error but got none") + } + if !tt.expectError && err != nil { + t.Errorf("Unexpected error: %v", err) + } + if err != nil { + return + } + + // Verify basic fields + if collector.projectID != tt.projectID { + t.Errorf("Expected 
projectID %s, got %s", tt.projectID, collector.projectID) + } + + if len(collector.metricsTypePrefixes) != len(tt.opts.MetricTypePrefixes) { + t.Errorf("Expected %d metric prefixes, got %d", len(tt.opts.MetricTypePrefixes), len(collector.metricsTypePrefixes)) + } + + if collector.metricsInterval != tt.opts.RequestInterval { + t.Errorf("Expected interval %v, got %v", tt.opts.RequestInterval, collector.metricsInterval) + } + + if collector.metricsOffset != tt.opts.RequestOffset { + t.Errorf("Expected offset %v, got %v", tt.opts.RequestOffset, collector.metricsOffset) + } + + if collector.metricsIngestDelay != tt.opts.IngestDelay { + t.Errorf("Expected ingest delay %v, got %v", tt.opts.IngestDelay, collector.metricsIngestDelay) + } + + if collector.collectorFillMissingLabels != tt.opts.FillMissingLabels { + t.Errorf("Expected fill missing labels %v, got %v", tt.opts.FillMissingLabels, collector.collectorFillMissingLabels) + } + + if collector.monitoringDropDelegatedProjects != tt.opts.DropDelegatedProjects { + t.Errorf("Expected drop delegated projects %v, got %v", tt.opts.DropDelegatedProjects, collector.monitoringDropDelegatedProjects) + } + + if collector.aggregateDeltas != tt.opts.AggregateDeltas { + t.Errorf("Expected aggregate deltas %v, got %v", tt.opts.AggregateDeltas, collector.aggregateDeltas) + } + + // Verify descriptor cache type + if tt.opts.DescriptorCacheTTL == 0 { + if _, ok := collector.descriptorCache.(*noopDescriptorCache); !ok { + t.Error("Expected noop descriptor cache when TTL is 0") + } + } else if tt.opts.DescriptorCacheOnlyGoogle { + if _, ok := collector.descriptorCache.(*googleDescriptorCache); !ok { + t.Error("Expected google descriptor cache when only Google is enabled") + } + } else { + if _, ok := collector.descriptorCache.(*descriptorCache); !ok { + t.Error("Expected regular descriptor cache when TTL > 0 and not Google-only") + } + } + + // Verify metrics are created + if collector.apiCallsTotalMetric == nil { + t.Error("Expected apiCallsTotalMetric to be created") + } + if collector.scrapesTotalMetric == nil { + t.Error("Expected scrapesTotalMetric to be created") + } + if collector.scrapeErrorsTotalMetric == nil { + t.Error("Expected scrapeErrorsTotalMetric to be created") + } + if collector.lastScrapeErrorMetric == nil { + t.Error("Expected lastScrapeErrorMetric to be created") + } + if collector.lastScrapeTimestampMetric == nil { + t.Error("Expected lastScrapeTimestampMetric to be created") + } + if collector.lastScrapeDurationSecondsMetric == nil { + t.Error("Expected lastScrapeDurationSecondsMetric to be created") + } + }) + } +} + +func TestMonitoringCollectorDescribe(t *testing.T) { + logger := slog.Default() + monitoringService := &monitoring.Service{} + opts := MonitoringCollectorOptions{ + MetricTypePrefixes: []string{"pubsub.googleapis.com"}, + RequestInterval: 5 * time.Minute, + } + + collector, err := NewMonitoringCollector("test-project", monitoringService, opts, logger, nil, nil) + if err != nil { + t.Fatalf("Failed to create collector: %v", err) + } + + // Create a channel to collect descriptions + ch := make(chan *prometheus.Desc, 10) + + // Call Describe + collector.Describe(ch) + close(ch) + + // Count the descriptions + count := 0 + for range ch { + count++ + } + + // Should have 6 metrics: api_calls_total, scrapes_total, scrape_errors_total, + // last_scrape_error, last_scrape_timestamp, last_scrape_duration_seconds + expectedCount := 6 + if count != expectedCount { + t.Errorf("Expected %d metric descriptions, got %d", expectedCount, 
count) + } +} diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 90c33b86..e798f676 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -126,6 +126,11 @@ var ( "Filters. i.e: pubsub.googleapis.com/subscription:resource.labels.subscription_id=monitoring.regex.full_match(\"my-subs-prefix.*\")", ).Strings() + monitoringMetricsWithAggregations = kingpin.Flag( + "monitoring.metrics-with-aggregations", + "Specify metrics with aggregation options in the format: metric_name:alignment_period:cross_series_reducer:group_by_fields:per_series_aligner. Example: custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN", + ).Strings() + monitoringMetricsAggregateDeltas = kingpin.Flag( "monitoring.aggregate-deltas", "If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge", ).Default("false").Bool() @@ -185,12 +190,13 @@ type handler struct { handler http.Handler logger *slog.Logger - projectIDs []string - metricsPrefixes []string - metricsExtraFilters []collectors.MetricFilter - additionalGatherer prometheus.Gatherer - m *monitoring.Service - collectors *collectors.CollectorCache + projectIDs []string + metricsPrefixes []string + metricsExtraFilters []collectors.MetricFilter + metricsWithAggregationConfigs []collectors.MetricAggregationConfig + additionalGatherer prometheus.Gatherer + m *monitoring.Service + collectors *collectors.CollectorCache } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -208,7 +214,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.handler.ServeHTTP(w, r) } -func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler { +func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, metricsWithAggregationConfigs []collectors.MetricAggregationConfig, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler { var ttl time.Duration // Add collector caching TTL as max of deltas aggregation or descriptor caching if *monitoringMetricsAggregateDeltas || *monitoringDescriptorCacheTTL > 0 { @@ -223,13 +229,14 @@ func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters logger.Info("Creating collector cache", "ttl", ttl) h := &handler{ - logger: logger, - projectIDs: projectIDs, - metricsPrefixes: metricPrefixes, - metricsExtraFilters: metricExtraFilters, - additionalGatherer: additionalGatherer, - m: m, - collectors: collectors.NewCollectorCache(ttl), + logger: logger, + projectIDs: projectIDs, + metricsPrefixes: metricPrefixes, + metricsExtraFilters: metricExtraFilters, + metricsWithAggregationConfigs: metricsWithAggregationConfigs, + additionalGatherer: additionalGatherer, + m: m, + collectors: collectors.NewCollectorCache(ttl), } h.handler = h.innerHandler(nil) @@ -247,6 +254,7 @@ func (h *handler) getCollector(project string, filters map[string]bool) (*collec collector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{ MetricTypePrefixes: filterdPrefixes, ExtraFilters: h.metricsExtraFilters, + MetricAggregationConfigs: h.metricsWithAggregationConfigs, RequestInterval: *monitoringMetricsInterval, RequestOffset: *monitoringMetricsOffset, IngestDelay: *monitoringMetricsIngestDelay, @@ -373,24 +381,26 @@ func main() { "build_context", 
version.BuildContext(), "metric_prefixes", fmt.Sprintf("%v", metricsPrefixes), "extra_filters", strings.Join(*monitoringMetricsExtraFilter, ","), + "aggregations", strings.Join(*monitoringMetricsWithAggregations, ","), "projectIDs", fmt.Sprintf("%v", discoveredProjectIDs), "projectsFilter", *projectsFilter, ) parsedMetricsPrefixes := parseMetricTypePrefixes(metricsPrefixes) metricExtraFilters := parseMetricExtraFilters() + metricsWithAggregations := parseMetricsWithAggregations(logger, *monitoringMetricsWithAggregations) // drop duplicate projects slices.Sort(discoveredProjectIDs) uniqueProjectIds := slices.Compact(discoveredProjectIDs) if *metricsPath == *stackdriverMetricsPath { handler := newHandler( - uniqueProjectIds, parsedMetricsPrefixes, metricExtraFilters, monitoringService, logger, prometheus.DefaultGatherer) + uniqueProjectIds, parsedMetricsPrefixes, metricExtraFilters, metricsWithAggregations, monitoringService, logger, prometheus.DefaultGatherer) http.Handle(*metricsPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handler)) } else { logger.Info("Serving Stackdriver metrics at separate path", "path", *stackdriverMetricsPath) handler := newHandler( - uniqueProjectIds, parsedMetricsPrefixes, metricExtraFilters, monitoringService, logger, nil) + uniqueProjectIds, parsedMetricsPrefixes, metricExtraFilters, metricsWithAggregations, monitoringService, logger, nil) http.Handle(*stackdriverMetricsPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handler)) http.Handle(*metricsPath, promhttp.Handler()) } @@ -468,3 +478,28 @@ func parseMetricExtraFilters() []collectors.MetricFilter { } return extraFilters } + +func parseMetricsWithAggregations(logger *slog.Logger, input []string) []collectors.MetricAggregationConfig { + var configs []collectors.MetricAggregationConfig + + for _, item := range input { + parts := strings.Split(item, ":") + if len(parts) != 5 { + logger.Error("Invalid format for metrics-with-aggregations", "aggregations", item) + continue + } + + groupByFields := strings.Split(parts[3], ",") + + config := collectors.MetricAggregationConfig{ + TargetedMetricPrefix: parts[0], + AlignmentPeriod: parts[1], + CrossSeriesReducer: parts[2], + GroupByFields: groupByFields, + PerSeriesAligner: parts[4], + } + configs = append(configs, config) + } + + return configs +} diff --git a/stackdriver_exporter_test.go b/stackdriver_exporter_test.go index b0e65f0d..69d17eae 100644 --- a/stackdriver_exporter_test.go +++ b/stackdriver_exporter_test.go @@ -13,8 +13,13 @@ package main -import "testing" -import "reflect" +import ( + "log/slog" + "reflect" + "testing" + + "github.com/prometheus-community/stackdriver_exporter/collectors" +) func TestParseMetricTypePrefixes(t *testing.T) { inputPrefixes := []string{ @@ -61,3 +66,142 @@ func TestFilterMetricTypePrefixes(t *testing.T) { t.Errorf("filterMetricTypePrefixes did not produce expected output. 
Expected:\n%s\nGot:\n%s", expectedOutputPrefixes, outputPrefixes) } } + +func TestParseMetricsWithAggregations(t *testing.T) { + logger := slog.Default() + + tests := []struct { + name string + input []string + expected []collectors.MetricAggregationConfig + }{ + { + name: "valid single aggregation config", + input: []string{ + "custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN", + }, + expected: []collectors.MetricAggregationConfig{ + { + TargetedMetricPrefix: "custom.googleapis.com/my_metric", + AlignmentPeriod: "60s", + CrossSeriesReducer: "REDUCE_SUM", + GroupByFields: []string{"metric.labels.instance_id", "resource.labels.zone"}, + PerSeriesAligner: "ALIGN_MEAN", + }, + }, + }, + { + name: "valid multiple aggregation configs", + input: []string{ + "custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN", + "pubsub.googleapis.com/subscription:300s:REDUCE_MEAN:resource.labels.subscription_id:ALIGN_RATE", + }, + expected: []collectors.MetricAggregationConfig{ + { + TargetedMetricPrefix: "custom.googleapis.com/my_metric", + AlignmentPeriod: "60s", + CrossSeriesReducer: "REDUCE_SUM", + GroupByFields: []string{"metric.labels.instance_id", "resource.labels.zone"}, + PerSeriesAligner: "ALIGN_MEAN", + }, + { + TargetedMetricPrefix: "pubsub.googleapis.com/subscription", + AlignmentPeriod: "300s", + CrossSeriesReducer: "REDUCE_MEAN", + GroupByFields: []string{"resource.labels.subscription_id"}, + PerSeriesAligner: "ALIGN_RATE", + }, + }, + }, + { + name: "valid config with single group by field", + input: []string{ + "compute.googleapis.com/instance:60s:REDUCE_SUM:resource.labels.instance_name:ALIGN_MEAN", + }, + expected: []collectors.MetricAggregationConfig{ + { + TargetedMetricPrefix: "compute.googleapis.com/instance", + AlignmentPeriod: "60s", + CrossSeriesReducer: "REDUCE_SUM", + GroupByFields: []string{"resource.labels.instance_name"}, + PerSeriesAligner: "ALIGN_MEAN", + }, + }, + }, + { + name: "valid config with multiple group by fields", + input: []string{ + "compute.googleapis.com/instance:60s:REDUCE_SUM:resource.labels.instance_name,resource.labels.zone,metric.labels.instance_id:ALIGN_MEAN", + }, + expected: []collectors.MetricAggregationConfig{ + { + TargetedMetricPrefix: "compute.googleapis.com/instance", + AlignmentPeriod: "60s", + CrossSeriesReducer: "REDUCE_SUM", + GroupByFields: []string{"resource.labels.instance_name", "resource.labels.zone", "metric.labels.instance_id"}, + PerSeriesAligner: "ALIGN_MEAN", + }, + }, + }, + { + name: "empty input", + input: []string{}, + expected: []collectors.MetricAggregationConfig{}, + }, + { + name: "invalid format - too few parts", + input: []string{ + "custom.googleapis.com/my_metric:60s:REDUCE_SUM", + }, + expected: []collectors.MetricAggregationConfig{}, + }, + { + name: "invalid format - too many parts", + input: []string{ + "custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id:ALIGN_MEAN:extra_part", + }, + expected: []collectors.MetricAggregationConfig{}, + }, + { + name: "mixed valid and invalid configs", + input: []string{ + "custom.googleapis.com/my_metric:60s:REDUCE_SUM:metric.labels.instance_id,resource.labels.zone:ALIGN_MEAN", + "invalid_format", + "pubsub.googleapis.com/subscription:300s:REDUCE_MEAN:resource.labels.subscription_id:ALIGN_RATE", + }, + expected: []collectors.MetricAggregationConfig{ + { + TargetedMetricPrefix: "custom.googleapis.com/my_metric", + AlignmentPeriod: "60s", + CrossSeriesReducer: 
"REDUCE_SUM", + GroupByFields: []string{"metric.labels.instance_id", "resource.labels.zone"}, + PerSeriesAligner: "ALIGN_MEAN", + }, + { + TargetedMetricPrefix: "pubsub.googleapis.com/subscription", + AlignmentPeriod: "300s", + CrossSeriesReducer: "REDUCE_MEAN", + GroupByFields: []string{"resource.labels.subscription_id"}, + PerSeriesAligner: "ALIGN_RATE", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := parseMetricsWithAggregations(logger, tt.input) + + // For empty expected results, check length instead of using reflect.DeepEqual + if len(tt.expected) == 0 { + if len(result) != 0 { + t.Errorf("parseMetricsWithAggregations() returned %d items, want 0", len(result)) + } + } else { + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("parseMetricsWithAggregations() = %v, want %v", result, tt.expected) + } + } + }) + } +}