Skip to content

Commit 16dfa35

Browse files
knyarStevenYCChou
authored andcommitted
Add Counter Aggregator (#119)
1 parent f6e3560 commit 16dfa35

File tree

10 files changed

+467
-38
lines changed

10 files changed

+467
-38
lines changed

README.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ This drops all series which do not have a `job` label `k8s` and all metrics that
5858

5959
For equality filter on metric name you can use the simpler notation, e.g. `--include='metric_name{label="foo"}'`.
6060

61-
The flag may be repeated to provide several sets of filters, in which case the metric will be forwarded if it matches at least one of them.
61+
The flag may be repeated to provide several sets of filters, in which case the metric will be forwarded if it matches at least one of them. Please note that inclusion filters only apply to Prometheus metrics proxied directly, and do not apply to [aggregated counters](#counter-aggregator).
6262

6363
#### File
6464

@@ -77,6 +77,33 @@ static_metadata:
7777
# - ...
7878
```
7979

80+
#### Counter Aggregator
81+
82+
Counter Aggregator is an advanced feature of the sidecar that can be used to export a sum of multiple Prometheus counters to Stackdriver as a single CUMULATIVE metric.
83+
84+
You might find this useful if you have counter metrics in Prometheus with high cardinality labels (or perhaps just counters exported by a large number of targets) which makes exporting all of them to Stackdriver directly too expensive, however you would like to have a cumulative metric that has the sum of those counters.
85+
86+
Aggregated counters are configured in the `aggregated_counters` block of the configuration file. For example:
87+
88+
```yaml
89+
aggregated_counters:
90+
- metric: network_transmit_bytes
91+
help: total number of bytes sent over eth0
92+
filters:
93+
- 'node_network_transmit_bytes_total{device="eth0"}'
94+
- 'node_network_transmit_bytes{device="eth0"}'
95+
```
96+
97+
In this example, the sidecar will export a new counter `network_transmit_bytes`, which will correspond to the total number of bytes transmitted over the 'eth0' interface across all machines monitored by Prometheus. Counter Aggregator keeps track of all counters matching the filters and correctly handles counter resets. Like all internal metrics exported by the sidecar, the aggregated counter is exported using OpenCensus and will be available in Stackdriver as a custom metric (`custom.googleapis.com/opencensus/network_transmit_bytes`).
98+
99+
A list of [Prometheus instant vector selectors](https://prometheus.io/docs/prometheus/latest/querying/basics/#instant-vector-selectors) is expected in the `filters` field. A time series needs to match any of the specified selectors to be included in the aggregated counter.
100+
101+
##### Counter aggregator and inclusion filters
102+
103+
Please note that by default metrics that match one of aggregated counter filters will still be exported to Stackdriver unless you have inclusion filters configured that prevent those metrics from being exported (see `--include`). Using `--include` to prevent a metric from being exported to Stackdriver does not prevent the metric from being covered by aggregated counters.
104+
105+
When using Counter Aggregator you would usually want to configure a restrictive inclusion filter to prevent raw metrics from being exported to Stackdriver.
106+
80107
## Compatibility
81108

82109
The matrix below lists the versions of Prometheus Server and other dependencies that have been qualified to work with releases of `stackdriver-prometheus-sidecar`.

cmd/stackdriver-prometheus-sidecar/main.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,12 @@ type fileConfig struct {
163163
Type string `json:"type"`
164164
Help string `json:"help"`
165165
} `json:"static_metadata"`
166+
167+
AggregatedCounters []struct {
168+
Metric string `json:"metric"`
169+
Filters []string `json:"filters"`
170+
Help string `json:"help"`
171+
} `json:"aggregated_counters"`
166172
}
167173

168174
func main() {
@@ -184,6 +190,7 @@ func main() {
184190
listenAddress string
185191
filters []string
186192
filtersets []string
193+
aggregations retrieval.CounterAggregatorConfig
187194
metricRenames map[string]string
188195
staticMetadata []scrape.MetricMetadata
189196
monitoringBackends []string
@@ -254,12 +261,25 @@ func main() {
254261

255262
logger := promlog.New(cfg.logLevel)
256263
if cfg.configFilename != "" {
257-
cfg.metricRenames, cfg.staticMetadata, err = parseConfigFile(cfg.configFilename)
264+
cfg.metricRenames, cfg.staticMetadata, cfg.aggregations, err = parseConfigFile(cfg.configFilename)
258265
if err != nil {
259266
msg := fmt.Sprintf("Parse config file %s", cfg.configFilename)
260267
level.Error(logger).Log("msg", msg, "err", err)
261268
os.Exit(2)
262269
}
270+
271+
// Enable Stackdriver monitoring backend if counter aggregator configuration is present.
272+
if len(cfg.aggregations) > 0 {
273+
sdEnabled := false
274+
for _, backend := range cfg.monitoringBackends {
275+
if backend == "stackdriver" {
276+
sdEnabled = true
277+
}
278+
}
279+
if !sdEnabled {
280+
cfg.monitoringBackends = append(cfg.monitoringBackends, "stackdriver")
281+
}
282+
}
263283
}
264284

265285
level.Info(logger).Log("msg", "Starting Stackdriver Prometheus sidecar", "version", version.Info())
@@ -368,6 +388,16 @@ func main() {
368388
level.Error(logger).Log("msg", "Creating queue manager failed", "err", err)
369389
os.Exit(1)
370390
}
391+
392+
counterAggregator, err := retrieval.NewCounterAggregator(
393+
log.With(logger, "component", "counter_aggregator"),
394+
&cfg.aggregations)
395+
if err != nil {
396+
level.Error(logger).Log("msg", "Creating counter aggregator failed", "err", err)
397+
os.Exit(1)
398+
}
399+
defer counterAggregator.Close()
400+
371401
prometheusReader := retrieval.NewPrometheusReader(
372402
log.With(logger, "component", "Prometheus reader"),
373403
cfg.walDirectory,
@@ -379,6 +409,7 @@ func main() {
379409
queueManager,
380410
cfg.metricsPrefix,
381411
cfg.useGkeResource,
412+
counterAggregator,
382413
)
383414

384415
// Exclude kingpin default flags to expose only Prometheus ones.
@@ -611,14 +642,14 @@ func fillMetadata(staticConfig *map[string]string) {
611642
}
612643
}
613644

614-
func parseConfigFile(filename string) (map[string]string, []scrape.MetricMetadata, error) {
645+
func parseConfigFile(filename string) (map[string]string, []scrape.MetricMetadata, retrieval.CounterAggregatorConfig, error) {
615646
b, err := ioutil.ReadFile(filename)
616647
if err != nil {
617-
return nil, nil, errors.Wrap(err, "reading file")
648+
return nil, nil, nil, errors.Wrap(err, "reading file")
618649
}
619650
var fc fileConfig
620651
if err := yaml.Unmarshal(b, &fc); err != nil {
621-
return nil, nil, errors.Wrap(err, "invalid YAML")
652+
return nil, nil, nil, errors.Wrap(err, "invalid YAML")
622653
}
623654
renameMapping := map[string]string{}
624655
for _, r := range fc.MetricRenames {
@@ -633,13 +664,29 @@ func parseConfigFile(filename string) (map[string]string, []scrape.MetricMetadat
633664
case textparse.MetricTypeCounter, textparse.MetricTypeGauge, textparse.MetricTypeHistogram,
634665
textparse.MetricTypeSummary, textparse.MetricTypeUnknown:
635666
default:
636-
return nil, nil, errors.Errorf("invalid metric type %q", sm.Type)
667+
return nil, nil, nil, errors.Errorf("invalid metric type %q", sm.Type)
637668
}
638669
staticMetadata = append(staticMetadata, scrape.MetricMetadata{
639670
Metric: sm.Metric,
640671
Type: textparse.MetricType(sm.Type),
641672
Help: sm.Help,
642673
})
643674
}
644-
return renameMapping, staticMetadata, nil
675+
676+
aggregations := make(retrieval.CounterAggregatorConfig)
677+
for _, c := range fc.AggregatedCounters {
678+
if _, ok := aggregations[c.Metric]; ok {
679+
return nil, nil, nil, errors.Errorf("duplicate counter aggregator metric %s", c.Metric)
680+
}
681+
a := &retrieval.CounterAggregatorMetricConfig{Help: c.Help}
682+
for _, f := range c.Filters {
683+
matcher, err := promql.ParseMetricSelector(f)
684+
if err != nil {
685+
return nil, nil, nil, errors.Errorf("cannot parse metric selector '%s': %q", f, err)
686+
}
687+
a.Matchers = append(a.Matchers, matcher)
688+
}
689+
aggregations[c.Metric] = a
690+
}
691+
return renameMapping, staticMetadata, aggregations, nil
645692
}

retrieval/aggregator.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package retrieval
16+
17+
import (
18+
"context"
19+
"math"
20+
21+
"github.com/go-kit/kit/log"
22+
"github.com/go-kit/kit/log/level"
23+
promlabels "github.com/prometheus/prometheus/pkg/labels"
24+
"github.com/prometheus/tsdb/labels"
25+
"go.opencensus.io/stats"
26+
"go.opencensus.io/stats/view"
27+
)
28+
29+
// CounterAggregator provides the 'aggregated counters' feature of the sidecar.
// It can be used to export a sum of multiple counters from Prometheus to
// Stackdriver as a single cumulative metric.
// Each aggregated counter is associated with a single OpenCensus counter that
// can then be exported to Stackdriver (as a CUMULATIVE metric) or exposed to
// Prometheus via the standard `/metrics` endpoint. Regular flushing of counter
// values is implemented by OpenCensus.
type CounterAggregator struct {
	logger   log.Logger
	counters []*aggregatedCounter
	// statsRecord is an indirection over stats.Record so tests can capture
	// recorded measurements without a real OpenCensus pipeline.
	statsRecord func(context.Context, ...stats.Measurement) // used in testing.
}
41+
42+
// aggregatedCounter is where CounterAggregator keeps internal state about each
// exported metric: OpenCensus measure and view as well as a list of Matchers that
// define which Prometheus metrics will get aggregated.
type aggregatedCounter struct {
	measure *stats.Float64Measure
	view    *view.View
	// matchers is a list of filter sets; a series feeds this counter if it
	// matches at least one set (see getTracker).
	matchers [][]*promlabels.Matcher
}
50+
51+
// CounterAggregatorConfig contains configuration for CounterAggregator. Keys of the map
// are metric names that will be exported by counter aggregator.
type CounterAggregatorConfig map[string]*CounterAggregatorMetricConfig
54+
55+
// CounterAggregatorMetricConfig provides configuration of a single aggregated counter.
// Matchers specify what Prometheus metrics (which are expected to be counter metrics) will
// be re-aggregated. Help provides a description for the exported metric.
type CounterAggregatorMetricConfig struct {
	Matchers [][]*promlabels.Matcher
	Help     string
}
62+
63+
// counterTracker keeps track of a single time series that has at least one aggregated
// counter associated with it (i.e. there is at least one aggregated counter that has
// Matchers covering this time series). Last timestamp and value are tracked
// to detect counter resets.
type counterTracker struct {
	// lastTimestamp == 0 doubles as "no sample seen yet" (see newPoint).
	lastTimestamp int64
	lastValue     float64
	// measures are the OpenCensus measures of every aggregated counter this
	// series contributes to.
	measures []*stats.Float64Measure
	ca       *CounterAggregator
}
73+
74+
// NewCounterAggregator creates a counter aggregator.
75+
func NewCounterAggregator(logger log.Logger, config *CounterAggregatorConfig) (*CounterAggregator, error) {
76+
aggregator := &CounterAggregator{logger: logger, statsRecord: stats.Record}
77+
for metric, cfg := range *config {
78+
measure := stats.Float64(metric, cfg.Help, stats.UnitDimensionless)
79+
v := &view.View{
80+
Name: metric,
81+
Description: cfg.Help,
82+
Measure: measure,
83+
Aggregation: view.Sum(),
84+
}
85+
if err := view.Register(v); err != nil {
86+
return nil, err
87+
}
88+
aggregator.counters = append(aggregator.counters, &aggregatedCounter{
89+
measure: measure,
90+
view: v,
91+
matchers: cfg.Matchers,
92+
})
93+
}
94+
return aggregator, nil
95+
}
96+
97+
// Close must be called when CounterAggregator is no longer needed.
98+
func (c *CounterAggregator) Close() {
99+
for _, counter := range c.counters {
100+
view.Unregister(counter.view)
101+
}
102+
}
103+
104+
// getTracker returns a counterTracker for a specific time series defined by labelset.
105+
// If `nil` is returned, it means that there are no aggregated counters that need to
106+
// be incremented for this time series.
107+
func (c *CounterAggregator) getTracker(lset labels.Labels) *counterTracker {
108+
var measures []*stats.Float64Measure
109+
for _, counter := range c.counters {
110+
if matchFiltersets(lset, counter.matchers) {
111+
measures = append(measures, counter.measure)
112+
}
113+
}
114+
if len(measures) == 0 {
115+
return nil
116+
}
117+
return &counterTracker{measures: measures, ca: c}
118+
}
119+
120+
// newPoint gets called on each new sample (timestamp, value) for time series that need to feed
121+
// values into aggregated counters.
122+
func (t *counterTracker) newPoint(ctx context.Context, lset labels.Labels, ts int64, v float64) {
123+
if math.IsNaN(v) {
124+
level.Debug(t.ca.logger).Log("msg", "got NaN value", "labels", lset, "last ts", t.lastTimestamp, "ts", t, "lastValue", t.lastValue)
125+
return
126+
}
127+
// Ignore measurements that are earlier than last seen timestamp, since they are already covered by
128+
// later values. Samples are coming from TSDB in order, so this is unlikely to happen.
129+
if ts < t.lastTimestamp {
130+
level.Debug(t.ca.logger).Log("msg", "out of order timestamp", "labels", lset, "last ts", t.lastTimestamp, "ts", ts)
131+
return
132+
}
133+
// Use the first value we see as the starting point for the counter.
134+
if t.lastTimestamp == 0 {
135+
level.Debug(t.ca.logger).Log("msg", "first point", "labels", lset)
136+
t.lastTimestamp = ts
137+
t.lastValue = v
138+
return
139+
}
140+
var delta float64
141+
if v < t.lastValue {
142+
// Counter was reset.
143+
delta = v
144+
level.Debug(t.ca.logger).Log("msg", "counter reset", "labels", lset, "value", v, "lastValue", t.lastValue, "delta", delta)
145+
} else {
146+
delta = v - t.lastValue
147+
level.Debug(t.ca.logger).Log("msg", "got delta", "labels", lset, "value", v, "lastValue", t.lastValue, "delta", delta)
148+
}
149+
t.lastTimestamp = ts
150+
t.lastValue = v
151+
if delta == 0 {
152+
return
153+
}
154+
ms := make([]stats.Measurement, len(t.measures))
155+
for i, measure := range t.measures {
156+
ms[i] = measure.M(delta)
157+
}
158+
t.ca.statsRecord(ctx, ms...)
159+
}

0 commit comments

Comments
 (0)