Skip to content

Commit 6de434d

Browse files
knyarStevenYCChou
authored andcommitted
Allow specifying multiple sets of filters. (#113)
This allows specifying multiple sets of filters that time series will be checked against. It also switches to using PromQL native parser for filters. Existing `--filter` flag will keep working, but is now marked as deprecated.
1 parent a264364 commit 6de434d

File tree

38 files changed

+9222
-87
lines changed

38 files changed

+9222
-87
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
# Ignore checkpoint file.
22
stackdriver_sidecar.json
3+
stackdriver-prometheus-sidecar

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,17 @@ stackdriver-prometheus-sidecar --help
4848

4949
#### Filters
5050

51-
The `--filter` flag allows to provide filters which all series have to pass before being sent to Stackdriver. The flag may be repeated to provide several filters. Filters use the same syntax as the well-known PromQL label matchers, e.g.:
51+
The `--include` flag allows to provide filters which all series have to pass before being sent to Stackdriver. Filters use the same syntax as [Prometheus instant vector selectors](https://prometheus.io/docs/prometheus/latest/querying/basics/#instant-vector-selectors), e.g.:
5252

5353
```
54-
stackdriver-prometheus-sidecar --filter=job="k8s" --filter=__name__!~"cadvisor_.+" ...
54+
stackdriver-prometheus-sidecar --include='{__name__!~"cadvisor_.+",job="k8s"}' ...
5555
```
5656

5757
This drops all series which do not have a `job` label `k8s` and all metrics that have a name starting with `cadvisor_`.
5858

59-
Note: On the command-line shell you may need to escape filters as `--filter='job="k8s"'`.
59+
For equality filter on metric name you can use the simpler notation, e.g. `--include='metric_name{label="foo"}'`.
60+
61+
The flag may be repeated to provide several sets of filters, in which case the metric will be forwarded if it matches at least one of them.
6062

6163
#### File
6264

cmd/stackdriver-prometheus-sidecar/main.go

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"os/signal"
2626
"path"
2727
"path/filepath"
28-
"regexp"
2928
"runtime"
3029
"strings"
3130
"syscall"
@@ -52,6 +51,7 @@ import (
5251
"github.com/prometheus/prometheus/config"
5352
"github.com/prometheus/prometheus/pkg/labels"
5453
"github.com/prometheus/prometheus/pkg/textparse"
54+
"github.com/prometheus/prometheus/promql"
5555
"github.com/prometheus/prometheus/scrape"
5656
oc_prometheus "go.opencensus.io/exporter/prometheus"
5757
"go.opencensus.io/plugin/ocgrpc"
@@ -183,6 +183,7 @@ func main() {
183183
prometheusURL *url.URL
184184
listenAddress string
185185
filters []string
186+
filtersets []string
186187
metricRenames map[string]string
187188
staticMetadata []scrape.MetricMetadata
188189
monitoringBackends []string
@@ -236,7 +237,10 @@ func main() {
236237
a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
237238
Default("0.0.0.0:9091").StringVar(&cfg.listenAddress)
238239

239-
a.Flag("filter", "PromQL-style label matcher which must pass for a series to be forwarded to Stackdriver. May be repeated.").
240+
a.Flag("include", "PromQL metric and label matcher which must pass for a series to be forwarded to Stackdriver. If repeated, the series must pass any of the filter sets to be forwarded.").
241+
StringsVar(&cfg.filtersets)
242+
243+
a.Flag("filter", "PromQL-style matcher for a single label which must pass for a series to be forwarded to Stackdriver. If repeated, the series must pass all filters to be forwarded. Deprecated, please use --include instead.").
240244
StringsVar(&cfg.filters)
241245

242246
promlogflag.AddFlags(a, &cfg.logLevel)
@@ -309,9 +313,9 @@ func main() {
309313
}
310314
}
311315

312-
filters, err := parseFilters(cfg.filters...)
316+
filtersets, err := parseFiltersets(logger, cfg.filtersets, cfg.filters)
313317
if err != nil {
314-
level.Error(logger).Log("msg", "Error parsing filters", "err", err)
318+
level.Error(logger).Log("msg", "Error parsing --include (or --filter)", "err", err)
315319
os.Exit(2)
316320
}
317321

@@ -368,7 +372,7 @@ func main() {
368372
log.With(logger, "component", "Prometheus reader"),
369373
cfg.walDirectory,
370374
tailer,
371-
filters,
375+
filtersets,
372376
cfg.metricRenames,
373377
retrieval.TargetsWithDiscoveredLabels(targetCache, labels.FromMap(staticLabels)),
374378
metadataCache,
@@ -550,32 +554,25 @@ func waitForPrometheus(ctx context.Context, logger log.Logger, promURL *url.URL)
550554
}
551555
}
552556

553-
// parseFilters parses a list of strings that contain PromQL-style label matchers and
557+
// parseFiltersets parses two flags that contain PromQL-style metric/label selectors and
554558
// returns a list of the resulting matchers.
555-
func parseFilters(strs ...string) (matchers []*labels.Matcher, err error) {
556-
pattern := regexp.MustCompile(`^([a-zA-Z0-9_]+)(=|!=|=~|!~)"(.+)"$`)
557-
558-
for _, s := range strs {
559-
parts := pattern.FindStringSubmatch(s)
560-
if len(parts) != 4 {
561-
return nil, fmt.Errorf("invalid filter %q", s)
562-
}
563-
var matcherType labels.MatchType
564-
switch parts[2] {
565-
case "=":
566-
matcherType = labels.MatchEqual
567-
case "!=":
568-
matcherType = labels.MatchNotEqual
569-
case "=~":
570-
matcherType = labels.MatchRegexp
571-
case "!~":
572-
matcherType = labels.MatchNotRegexp
559+
func parseFiltersets(logger log.Logger, filtersets, filters []string) ([][]*labels.Matcher, error) {
560+
var matchers [][]*labels.Matcher
561+
if len(filters) > 0 {
562+
level.Warn(logger).Log("msg", "--filter is deprecated; please use --include instead")
563+
f := fmt.Sprintf("{%s}", strings.Join(filters, ","))
564+
m, err := promql.ParseMetricSelector(f)
565+
if err != nil {
566+
return nil, errors.Errorf("cannot parse --filter flag (metric filter '%s'): %q", f, err)
573567
}
574-
matcher, err := labels.NewMatcher(matcherType, parts[1], parts[3])
568+
matchers = append(matchers, m)
569+
}
570+
for _, f := range filtersets {
571+
m, err := promql.ParseMetricSelector(f)
575572
if err != nil {
576-
return nil, fmt.Errorf("invalid filter %q: %s", s, err)
573+
return nil, errors.Errorf("cannot parse --include flag '%s': %q", f, err)
577574
}
578-
matchers = append(matchers, matcher)
575+
matchers = append(matchers, m)
579576
}
580577
return matchers, nil
581578
}

cmd/stackdriver-prometheus-sidecar/main_test.go

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"path/filepath"
2424
"testing"
2525
"time"
26+
27+
"github.com/go-kit/kit/log"
2628
)
2729

2830
var promPath string
@@ -114,40 +116,58 @@ Loop:
114116
}
115117
}
116118

117-
func TestParseFilters(t *testing.T) {
118-
input := []string{
119-
`__name__="test1"`,
120-
`a1=~"test2.+"`,
121-
`a2!="test3"`,
122-
`a3!~"test4.*"`,
123-
}
124-
// Test success cases.
125-
filters, err := parseFilters(input...)
126-
if err != nil {
127-
t.Fatal(err)
128-
}
129-
// We verify by comparing the serializiation produced by the parsed matchers again.
130-
// Deep equal comparison doesn't work since some fields deep down in the produced regexes differ
131-
// even though everything being equal semantically.
132-
if len(filters) != len(input) {
133-
t.Fatalf("unexpected result length %d", len(filters))
134-
}
135-
for i, f := range filters {
136-
if f.String() != input[i] {
137-
t.Fatalf("unexpected parsed filter %v, want %v", f, input[i])
138-
}
119+
func TestParseWhitelists(t *testing.T) {
120+
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
121+
for _, tt := range []struct {
122+
name string
123+
filtersets []string
124+
filters []string
125+
wantMatchers int
126+
}{
127+
{"both filters and filtersets defined",
128+
[]string{
129+
`metric_name`,
130+
`metric_name{label="value"}`,
131+
},
132+
[]string{
133+
`__name__="test1"`,
134+
`a1=~"test2.+"`,
135+
`a2!="test3"`,
136+
`a3!~"test4.*"`,
137+
}, 3},
138+
{"just filtersets", []string{"metric_name"}, []string{}, 1},
139+
{"just filters", []string{}, []string{`__name__="foo"`}, 1},
140+
{"neither filtersets nor filters", []string{}, []string{}, 0},
141+
} {
142+
t.Run(tt.name, func(t *testing.T) {
143+
// Test success cases.
144+
parsed, err := parseFiltersets(logger, tt.filtersets, tt.filters)
145+
if err != nil {
146+
t.Fatal(err)
147+
}
148+
if len(parsed) != tt.wantMatchers {
149+
t.Fatalf("expected %d matchers; got %d", tt.wantMatchers, len(parsed))
150+
}
151+
})
139152
}
153+
140154
// Test failure cases.
141-
cases := []string{
142-
`a-b="1"`, // Invalid character in key.
143-
`a="1`, // Missing trailing quote.
144-
`a=1"`, // Missing leading quote.
145-
`a!=="1"`, // Invalid operator.
146-
}
147-
for _, c := range cases {
148-
if _, err := parseFilters(c); err == nil {
149-
t.Fatalf("expected error for %q but got none", c)
150-
}
155+
for _, tt := range []struct {
156+
name string
157+
filtersets []string
158+
filters []string
159+
}{
160+
{"Invalid character in key", []string{}, []string{`a-b="1"`}},
161+
{"Missing trailing quote", []string{}, []string{`a="1`}},
162+
{"Missing leading quote", []string{}, []string{`a=1"`}},
163+
{"Invalid operator", []string{}, []string{`a!=="1"`}},
164+
{"Invalid operator in filterset", []string{`{a!=="1"}`}, []string{}},
165+
{"Empty filterset", []string{""}, []string{}},
166+
} {
167+
t.Run(tt.name, func(t *testing.T) {
168+
if _, err := parseFiltersets(logger, tt.filtersets, tt.filters); err == nil {
169+
t.Fatalf("expected error, but got none")
170+
}
171+
})
151172
}
152-
153173
}

retrieval/manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func NewPrometheusReader(
8080
logger log.Logger,
8181
walDirectory string,
8282
tailer *tail.Tailer,
83-
filters []*labels.Matcher,
83+
filtersets [][]*labels.Matcher,
8484
metricRenames map[string]string,
8585
targetGetter TargetGetter,
8686
metadataGetter MetadataGetter,
@@ -95,7 +95,7 @@ func NewPrometheusReader(
9595
appender: appender,
9696
logger: logger,
9797
tailer: tailer,
98-
filters: filters,
98+
filtersets: filtersets,
9999
walDirectory: walDirectory,
100100
targetGetter: targetGetter,
101101
metadataGetter: metadataGetter,
@@ -110,7 +110,7 @@ type PrometheusReader struct {
110110
logger log.Logger
111111
walDirectory string
112112
tailer *tail.Tailer
113-
filters []*labels.Matcher
113+
filtersets [][]*labels.Matcher
114114
metricRenames map[string]string
115115
targetGetter TargetGetter
116116
metadataGetter MetadataGetter
@@ -152,7 +152,7 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error {
152152
seriesCache := newSeriesCache(
153153
r.logger,
154154
r.walDirectory,
155-
r.filters,
155+
r.filtersets,
156156
r.metricRenames,
157157
r.targetGetter,
158158
r.metadataGetter,

retrieval/series_cache.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type seriesGetter interface {
7575
type seriesCache struct {
7676
logger log.Logger
7777
dir string
78-
filters []*promlabels.Matcher
78+
filtersets [][]*promlabels.Matcher
7979
targets TargetGetter
8080
metadata MetadataGetter
8181
resourceMaps []ResourceMap
@@ -129,7 +129,7 @@ func (e *seriesCacheEntry) shouldRefresh() bool {
129129
func newSeriesCache(
130130
logger log.Logger,
131131
dir string,
132-
filters []*promlabels.Matcher,
132+
filtersets [][]*promlabels.Matcher,
133133
renames map[string]string,
134134
targets TargetGetter,
135135
metadata MetadataGetter,
@@ -143,7 +143,7 @@ func newSeriesCache(
143143
return &seriesCache{
144144
logger: logger,
145145
dir: dir,
146-
filters: filters,
146+
filtersets: filtersets,
147147
targets: targets,
148148
metadata: metadata,
149149
resourceMaps: resourceMaps,
@@ -303,10 +303,8 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f
303303
// set the label set for the given reference.
304304
// maxSegment indicates the the highest segment at which the series was possibly defined.
305305
func (c *seriesCache) set(ctx context.Context, ref uint64, lset labels.Labels, maxSegment int) error {
306-
for _, f := range c.filters {
307-
if v := lset.Get(f.Name); !f.Matches(v) {
308-
return nil
309-
}
306+
if c.filtersets != nil && !matchFiltersets(lset, c.filtersets) {
307+
return nil
310308
}
311309
c.mtx.Lock()
312310
c.entries[ref] = &seriesCacheEntry{
@@ -470,3 +468,24 @@ func (c *seriesCache) getResource(discovered, final promlabels.Labels) (*monitor
470468
}
471469
return nil, false
472470
}
471+
472+
// matchFiltersets checks whether any of the supplied filtersets passes.
473+
func matchFiltersets(lset labels.Labels, filtersets [][]*promlabels.Matcher) bool {
474+
for _, fs := range filtersets {
475+
if matchFilterset(lset, fs) {
476+
return true
477+
}
478+
}
479+
return false
480+
}
481+
482+
// matchFilterset checks whether labels match a given list of label matchers.
483+
// All matchers need to match for the function to return true.
484+
func matchFilterset(lset labels.Labels, filterset []*promlabels.Matcher) bool {
485+
for _, matcher := range filterset {
486+
if !matcher.Matches(lset.Get(matcher.Name)) {
487+
return false
488+
}
489+
}
490+
return true
491+
}

retrieval/series_cache_test.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -416,30 +416,37 @@ func TestSeriesCache_Filter(t *testing.T) {
416416
}
417417
}()
418418
logger := log.NewLogfmtLogger(logBuffer)
419-
c := newSeriesCache(logger, "", []*promlabels.Matcher{
420-
&promlabels.Matcher{Type: promlabels.MatchEqual, Name: "a", Value: "a1"},
421-
&promlabels.Matcher{Type: promlabels.MatchEqual, Name: "b", Value: "b1"},
419+
c := newSeriesCache(logger, "", [][]*promlabels.Matcher{
420+
{
421+
&promlabels.Matcher{Type: promlabels.MatchEqual, Name: "a", Value: "a1"},
422+
&promlabels.Matcher{Type: promlabels.MatchEqual, Name: "b", Value: "b1"},
423+
},
424+
{&promlabels.Matcher{Type: promlabels.MatchEqual, Name: "c", Value: "c1"}},
422425
}, nil, targetMap, metadataMap, resourceMaps, "", false)
423426

424427
ctx, cancel := context.WithCancel(context.Background())
425428
defer cancel()
426429

427-
// Test base case of metric that passes all filters. This primarily
428-
// ensures that our setup is correct and metrics aren't dropped for reasons
429-
// other than the filter.
430-
err := c.set(ctx, 1, labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b1"), 1)
431-
if err != nil {
432-
t.Fatal(err)
430+
// Test that metrics that pass a single filterset do not get dropped.
431+
lsets := []labels.Labels{
432+
labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b1"),
433+
labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "c", "c1"),
433434
}
434-
if _, ok, err := c.get(ctx, 1); !ok || err != nil {
435-
t.Fatalf("metric not found: %s", err)
435+
for idx, lset := range lsets {
436+
err := c.set(ctx, uint64(idx), lset, 1)
437+
if err != nil {
438+
t.Fatal(err)
439+
}
440+
if _, ok, err := c.get(ctx, uint64(idx)); !ok || err != nil {
441+
t.Fatalf("metric not found: %s", err)
442+
}
436443
}
437444
// Test filtered metric.
438-
err = c.set(ctx, 2, labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b2"), 1)
445+
err := c.set(ctx, 100, labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1", "a", "a1", "b", "b2"), 1)
439446
if err != nil {
440447
t.Fatal(err)
441448
}
442-
if _, ok, err := c.get(ctx, 2); err != nil {
449+
if _, ok, err := c.get(ctx, 100); err != nil {
443450
t.Fatalf("error retrieving metric: %s", err)
444451
} else if ok {
445452
t.Fatalf("metric was not filtered")

0 commit comments

Comments
 (0)