Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit 7de5db5

Browse files
committed
Configurable Trend stats mapping
1 parent bcf82f6 commit 7de5db5

File tree

7 files changed

+240
-31
lines changed

7 files changed

+240
-31
lines changed

pkg/remotewrite/config.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ const (
2121
defaultMetricPrefix = "k6_"
2222
)
2323

24+
var defaultTrendStats = []string{"p(99)"}
25+
2426
type Config struct {
2527
// URL contains the absolute URL for the Write endpoint where to flush the time series.
2628
URL null.String `json:"url" envconfig:"K6_PROMETHEUS_REMOTE_URL"`
@@ -44,6 +46,11 @@ type Config struct {
4446
// TrendAsNativeHistogram defines if the mapping for metrics defined as Trend type
4547
// should map to a Prometheus' Native Histogram.
4648
TrendAsNativeHistogram null.Bool `json:"trendAsNativeHistogram" envconfig:"K6_PROMETHEUS_TREND_AS_NATIVE_HISTOGRAM"`
49+
50+
// TrendStats defines the stats to flush for Trend metrics.
51+
//
52+
// TODO: should we support K6_SUMMARY_TREND_STATS?
53+
TrendStats []string `json:"trendStats" envconfig:"K6_PROMETHEUS_TREND_STATS"`
4754
}
4855

4956
// NewConfig creates an Output's configuration.
@@ -55,6 +62,7 @@ func NewConfig() Config {
5562
Password: null.NewString("", false),
5663
PushInterval: types.NullDurationFrom(defaultPushInterval),
5764
Headers: make(map[string]string),
65+
TrendStats: defaultTrendStats,
5866
}
5967
}
6068

@@ -117,6 +125,11 @@ func (base Config) Apply(applied Config) Config {
117125
}
118126
}
119127

128+
if len(applied.TrendStats) > 0 {
129+
base.TrendStats = make([]string, len(applied.TrendStats))
130+
copy(base.TrendStats, applied.TrendStats)
131+
}
132+
120133
return base
121134
}
122135

@@ -222,6 +235,11 @@ func parseEnvs(env map[string]string) (Config, error) {
222235
c.TrendAsNativeHistogram = b
223236
}
224237
}
238+
239+
if trendStats, trendStatsDefined := env["K6_PROMETHEUS_TREND_STATS"]; trendStatsDefined {
240+
c.TrendStats = strings.Split(trendStats, ",")
241+
}
242+
225243
return c, nil
226244
}
227245

@@ -262,6 +280,16 @@ func parseArg(text string) (Config, error) {
262280
if err := c.TrendAsNativeHistogram.UnmarshalText([]byte(v)); err != nil {
263281
return c, fmt.Errorf("trendAsNativeHistogram value must be true or false, not %q", v)
264282
}
283+
284+
// TODO: add the support for trendStats
285+
// strvals doesn't support the same format used by --summary-trend-stats
286+
// using the comma as the separator, because it is already used for
287+
// dividing the keys.
288+
//
289+
//if v, ok := params["trendStats"].(string); ok && len(v) > 0 {
290+
//c.TrendStats = strings.Split(v, ",")
291+
//}
292+
265293
default:
266294
if !strings.HasPrefix(key, "headers.") {
267295
return c, fmt.Errorf("%q is an unknown option's key", r[0])

pkg/remotewrite/config_test.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"gopkg.in/guregu/null.v3"
1717
)
1818

19-
func TestConifgApply(t *testing.T) {
19+
func TestConfigApply(t *testing.T) {
2020
t.Parallel()
2121

2222
fullConfig := Config{
@@ -28,6 +28,7 @@ func TestConifgApply(t *testing.T) {
2828
Headers: map[string]string{
2929
"X-Header": "value",
3030
},
31+
TrendStats: []string{"p(99)"},
3132
}
3233

3334
// Defaults should be overwritten by valid values
@@ -103,6 +104,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
103104
Password: null.NewString("", false),
104105
PushInterval: types.NullDurationFrom(5 * time.Second),
105106
Headers: make(map[string]string),
107+
TrendStats: []string{"p(99)"},
106108
},
107109
},
108110
"JSONSuccess": {
@@ -114,6 +116,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
114116
Password: null.NewString("", false),
115117
PushInterval: types.NullDurationFrom(defaultPushInterval),
116118
Headers: make(map[string]string),
119+
TrendStats: []string{"p(99)"},
117120
},
118121
},
119122
"MixedSuccess": {
@@ -130,6 +133,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
130133
Password: null.NewString("", false),
131134
PushInterval: types.NullDurationFrom(defaultPushInterval),
132135
Headers: make(map[string]string),
136+
TrendStats: []string{"p(99)"},
133137
},
134138
},
135139
"OrderOfPrecedence": {
@@ -146,6 +150,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
146150
Password: null.StringFrom("env"),
147151
PushInterval: types.NullDurationFrom(defaultPushInterval),
148152
Headers: make(map[string]string),
153+
TrendStats: []string{"p(99)"},
149154
},
150155
},
151156
"InvalidJSON": {
@@ -214,8 +219,8 @@ func TestOptionURL(t *testing.T) {
214219
Password: null.NewString("", false),
215220
PushInterval: types.NullDurationFrom(5 * time.Second),
216221
Headers: make(map[string]string),
222+
TrendStats: []string{"p(99)"},
217223
}
218-
219224
for name, tc := range cases {
220225
tc := tc
221226
t.Run(name, func(t *testing.T) {
@@ -248,6 +253,7 @@ func TestOptionHeaders(t *testing.T) {
248253
"X-MY-HEADER1": "hval1",
249254
"X-MY-HEADER2": "hval2",
250255
},
256+
TrendStats: []string{"p(99)"},
251257
}
252258
for name, tc := range cases {
253259
tc := tc
@@ -278,6 +284,7 @@ func TestOptionInsecureSkipTLSVerify(t *testing.T) {
278284
InsecureSkipTLSVerify: null.BoolFrom(false),
279285
PushInterval: types.NullDurationFrom(defaultPushInterval),
280286
Headers: make(map[string]string),
287+
TrendStats: []string{"p(99)"},
281288
}
282289
for name, tc := range cases {
283290
tc := tc
@@ -310,6 +317,7 @@ func TestOptionBasicAuth(t *testing.T) {
310317
Password: null.StringFrom("pass1"),
311318
PushInterval: types.NullDurationFrom(5 * time.Second),
312319
Headers: make(map[string]string),
320+
TrendStats: []string{"p(99)"},
313321
}
314322

315323
for name, tc := range cases {
@@ -344,6 +352,7 @@ func TestOptionTrendAsNativeHistogram(t *testing.T) {
344352
PushInterval: types.NullDurationFrom(5 * time.Second),
345353
Headers: make(map[string]string),
346354
TrendAsNativeHistogram: null.BoolFrom(true),
355+
TrendStats: []string{"p(99)"},
347356
}
348357

349358
for name, tc := range cases {
@@ -377,6 +386,40 @@ func TestOptionPushInterval(t *testing.T) {
377386
Password: null.NewString("", false),
378387
PushInterval: types.NullDurationFrom((1 * time.Minute) + (2 * time.Second)),
379388
Headers: make(map[string]string),
389+
TrendStats: []string{"p(99)"},
390+
}
391+
392+
for name, tc := range cases {
393+
tc := tc
394+
t.Run(name, func(t *testing.T) {
395+
c, err := GetConsolidatedConfig(
396+
tc.jsonRaw, tc.env, tc.arg)
397+
require.NoError(t, err)
398+
assert.Equal(t, expconfig, c)
399+
})
400+
}
401+
}
402+
403+
func TestConfigTrendStats(t *testing.T) {
404+
t.Parallel()
405+
406+
cases := map[string]struct {
407+
arg string
408+
env map[string]string
409+
jsonRaw json.RawMessage
410+
}{
411+
"JSON": {jsonRaw: json.RawMessage(`{"trendStats":["max","p(95)"]}`)},
412+
"Env": {env: map[string]string{"K6_PROMETHEUS_TREND_STATS": "max,p(95)"}},
413+
// TODO: support arg, check the comment in the code
414+
//"Arg": {arg: "trendStats=max,p(95)"},
415+
}
416+
417+
expconfig := Config{
418+
URL: null.StringFrom("http://localhost:9090/api/v1/write"),
419+
InsecureSkipTLSVerify: null.BoolFrom(true),
420+
PushInterval: types.NullDurationFrom(5 * time.Second),
421+
Headers: make(map[string]string),
422+
TrendStats: []string{"max", "p(95)"},
380423
}
381424

382425
for name, tc := range cases {

pkg/remotewrite/prometheus_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package remotewrite
22

33
import (
4+
"sort"
45
"testing"
56
"time"
67

@@ -62,3 +63,12 @@ func assertTimeSeriesEqual(t *testing.T, expected []*prompb.TimeSeries, actual [
6263
assert.Equal(t, expected[i], actual[i])
6364
}
6465
}
66+
67+
// sortByLabelName sorts a slice of time series by Name label.
68+
//
69+
// TODO: remove the assumption that Name label is the first.
70+
func sortByNameLabel(s []*prompb.TimeSeries) {
71+
sort.Slice(s, func(i, j int) bool {
72+
return s[i].Labels[0].Value <= s[j].Labels[0].Value
73+
})
74+
}

pkg/remotewrite/remotewrite.go

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package remotewrite
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"time"
78

89
"github.com/grafana/xk6-output-prometheus-remote/pkg/remote"
@@ -19,10 +20,11 @@ var _ output.Output = new(Output)
1920
type Output struct {
2021
output.SampleBuffer
2122

22-
config Config
23-
logger logrus.FieldLogger
24-
periodicFlusher *output.PeriodicFlusher
25-
tsdb map[metrics.TimeSeries]*seriesWithMeasure
23+
config Config
24+
logger logrus.FieldLogger
25+
periodicFlusher *output.PeriodicFlusher
26+
tsdb map[metrics.TimeSeries]*seriesWithMeasure
27+
trendStatsResolver map[string]func(*metrics.TrendSink) float64
2628

2729
// TODO: copy the prometheus/remote.WriteClient interface and depend on it
2830
client *remote.WriteClient
@@ -46,12 +48,19 @@ func New(params output.Params) (*Output, error) {
4648
return nil, fmt.Errorf("failed to initialize the Prometheus remote write client: %w", err)
4749
}
4850

49-
return &Output{
51+
o := &Output{
5052
client: wc,
5153
config: config,
5254
logger: logger,
5355
tsdb: make(map[metrics.TimeSeries]*seriesWithMeasure),
54-
}, nil
56+
}
57+
58+
if len(config.TrendStats) > 0 {
59+
if err := o.setTrendStatsResolver(config.TrendStats); err != nil {
60+
return nil, err
61+
}
62+
}
63+
return o, nil
5564
}
5665

5766
func (o *Output) Description() string {
@@ -76,6 +85,50 @@ func (o *Output) Stop() error {
7685
return nil
7786
}
7887

88+
// setTrendStatsResolver sets the resolver for the Trend stats.
89+
//
90+
// TODO: refactor, the code can be improved
91+
func (o *Output) setTrendStatsResolver(trendStats []string) error {
92+
var trendStatsCopy []string
93+
hasSum := false
94+
// copy excluding sum
95+
for _, stat := range trendStats {
96+
if stat == "sum" {
97+
hasSum = true
98+
continue
99+
}
100+
trendStatsCopy = append(trendStatsCopy, stat)
101+
}
102+
resolvers, err := metrics.GetResolversForTrendColumns(trendStatsCopy)
103+
if err != nil {
104+
return err
105+
}
106+
// sum is not supported from GetResolversForTrendColumns
107+
// so if it has been requested
108+
// it adds it specifically
109+
if hasSum {
110+
resolvers["sum"] = func(t *metrics.TrendSink) float64 {
111+
return t.Sum
112+
}
113+
}
114+
o.trendStatsResolver = make(TrendStatsResolver, len(resolvers))
115+
for stat, fn := range resolvers {
116+
statKey := stat
117+
118+
// the config passes percentiles with p(x) form, for example p(95),
119+
// but the mapping generates series name in the form p95.
120+
//
121+
// TODO: maybe decoupling mapping from the stat resolver keys?
122+
if strings.HasPrefix(statKey, "p(") {
123+
statKey = stat[2 : len(statKey)-1] // trim the parenthesis
124+
statKey = strings.ReplaceAll(statKey, ".", "") // remove dots, p(0.95) => p095
125+
statKey = "p" + statKey
126+
}
127+
o.trendStatsResolver[statKey] = fn
128+
}
129+
return nil
130+
}
131+
79132
func (o *Output) flush() {
80133
var (
81134
start = time.Now()
@@ -138,7 +191,8 @@ func (o *Output) convertToPbSeries(samplesContainers []metrics.SampleContainer)
138191
truncTime := sample.Time.Truncate(time.Millisecond)
139192
swm, ok := o.tsdb[sample.TimeSeries]
140193
if !ok {
141-
swm = newSeriesWithMeasure(sample.TimeSeries, o.config.TrendAsNativeHistogram.Bool)
194+
// TODO: encapsulate the trend arguments into a Trend Mapping factory
195+
swm = newSeriesWithMeasure(sample.TimeSeries, o.config.TrendAsNativeHistogram.Bool, o.trendStatsResolver)
142196
swm.Latest = truncTime
143197
o.tsdb[sample.TimeSeries] = swm
144198
seen[sample.TimeSeries] = struct{}{}
@@ -248,18 +302,24 @@ type prompbMapper interface {
248302
MapPrompb(series metrics.TimeSeries, t time.Time) []*prompb.TimeSeries
249303
}
250304

251-
func newSeriesWithMeasure(series metrics.TimeSeries, trendAsNativeHistogram bool) *seriesWithMeasure {
305+
func newSeriesWithMeasure(series metrics.TimeSeries, trendAsNativeHistogram bool, tsr TrendStatsResolver) *seriesWithMeasure {
252306
var sink metrics.Sink
253307
switch series.Metric.Type {
254308
case metrics.Counter:
255309
sink = &metrics.CounterSink{}
256310
case metrics.Gauge:
257311
sink = &metrics.GaugeSink{}
258312
case metrics.Trend:
313+
// TODO: refactor encapsulating in a factory method
259314
if trendAsNativeHistogram {
260315
sink = newNativeHistogramSink(series.Metric)
261316
} else {
262-
sink = newExtendedTrendSink()
317+
var err error
318+
sink, err = newExtendedTrendSink(tsr)
319+
if err != nil {
320+
// the resolver must be already validated
321+
panic(err)
322+
}
263323
}
264324
case metrics.Rate:
265325
sink = &metrics.RateSink{}

0 commit comments

Comments
 (0)