Skip to content
This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit 43e6aab

Browse files
committed
Stale marking process disabled by default
1 parent e4d9ee5 commit 43e6aab

File tree

5 files changed

+156
-16
lines changed

5 files changed

+156
-16
lines changed

pkg/remote/client_test.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,22 @@ package remote
33
import (
44
"context"
55
"io"
6+
"math"
67
"net/http"
78
"net/http/httptest"
89
"net/url"
910
"testing"
1011
"time"
1112

13+
"github.com/golang/snappy"
14+
"github.com/grafana/xk6-output-prometheus-remote/pkg/stale"
1215
"github.com/stretchr/testify/assert"
1316
"github.com/stretchr/testify/require"
1417
prompb "go.buf.build/grpc/go/prometheus/prometheus"
18+
"google.golang.org/protobuf/proto"
1519
)
1620

17-
func TestNewWrtiteClient(t *testing.T) {
21+
func TestNewWriteClient(t *testing.T) {
1822
t.Parallel()
1923
t.Run("DefaultConfig", func(t *testing.T) {
2024
t.Parallel()
@@ -172,6 +176,37 @@ func TestNewWriteRequestBody(t *testing.T) {
172176
assert.Contains(t, string(b), `label1`)
173177
}
174178

179+
func TestNewWriteRequestBodyWithStaleMarker(t *testing.T) {
180+
t.Parallel()
181+
182+
timestamp := time.Date(2022, time.December, 15, 11, 41, 18, 123, time.UTC)
183+
184+
ts := []*prompb.TimeSeries{
185+
{
186+
Labels: []*prompb.Label{{Name: "label1", Value: "val1"}},
187+
Samples: []*prompb.Sample{{
188+
Value: stale.Marker,
189+
Timestamp: timestamp.UnixMilli(),
190+
}},
191+
},
192+
}
193+
b, err := newWriteRequestBody(ts)
194+
require.NoError(t, err)
195+
require.NotEmpty(t, b)
196+
197+
sb, err := snappy.Decode(nil, b)
198+
require.NoError(t, err)
199+
200+
var series prompb.WriteRequest
201+
err = proto.Unmarshal(sb, &series)
202+
require.NoError(t, err)
203+
require.NotEmpty(t, series.Timeseries[0])
204+
require.NotEmpty(t, series.Timeseries[0].Samples)
205+
206+
assert.True(t, math.IsNaN(series.Timeseries[0].Samples[0].Value))
207+
assert.Equal(t, timestamp.UnixMilli(), series.Timeseries[0].Samples[0].Timestamp)
208+
}
209+
175210
func TestValidateStatusCode(t *testing.T) {
176211
t.Parallel()
177212
tests := []struct {

pkg/remotewrite/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type Config struct {
5353
//
5454
// TODO: should we support K6_SUMMARY_TREND_STATS?
5555
TrendStats []string `json:"trendStats"`
56+
57+
StaleMarkers null.Bool `json:"staleMarkers"`
5658
}
5759

5860
// NewConfig creates an Output's configuration.
@@ -65,6 +67,7 @@ func NewConfig() Config {
6567
PushInterval: types.NullDurationFrom(defaultPushInterval),
6668
Headers: make(map[string]string),
6769
TrendStats: defaultTrendStats,
70+
StaleMarkers: null.BoolFrom(false),
6871
}
6972
}
7073

@@ -121,6 +124,10 @@ func (conf Config) Apply(applied Config) Config {
121124
conf.TrendAsNativeHistogram = applied.TrendAsNativeHistogram
122125
}
123126

127+
if applied.StaleMarkers.Valid {
128+
conf.StaleMarkers = applied.StaleMarkers
129+
}
130+
124131
if len(applied.Headers) > 0 {
125132
for k, v := range applied.Headers {
126133
conf.Headers[k] = v
@@ -235,6 +242,12 @@ func parseEnvs(env map[string]string) (Config, error) {
235242
c.TrendAsNativeHistogram = b
236243
}
237244

245+
if b, err := getEnvBool(env, "K6_PROMETHEUS_RW_STALE_MARKERS"); err != nil {
246+
return c, err
247+
} else if b.Valid {
248+
c.StaleMarkers = b
249+
}
250+
238251
if trendStats, trendStatsDefined := env["K6_PROMETHEUS_RW_TREND_STATS"]; trendStatsDefined {
239252
c.TrendStats = strings.Split(trendStats, ",")
240253
}

pkg/remotewrite/config_test.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ func TestConfigApply(t *testing.T) {
2828
Headers: map[string]string{
2929
"X-Header": "value",
3030
},
31-
TrendStats: []string{"p(99)"},
31+
TrendStats: []string{"p(99)"},
32+
StaleMarkers: null.BoolFrom(true),
3233
}
3334

3435
// Defaults should be overwritten by valid values
@@ -106,6 +107,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
106107
PushInterval: types.NullDurationFrom(5 * time.Second),
107108
Headers: make(map[string]string),
108109
TrendStats: []string{"p(99)"},
110+
StaleMarkers: null.BoolFrom(false),
109111
},
110112
},
111113
"JSONSuccess": {
@@ -118,6 +120,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
118120
PushInterval: types.NullDurationFrom(defaultPushInterval),
119121
Headers: make(map[string]string),
120122
TrendStats: []string{"p(99)"},
123+
StaleMarkers: null.BoolFrom(false),
121124
},
122125
},
123126
"MixedSuccess": {
@@ -135,6 +138,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
135138
PushInterval: types.NullDurationFrom(defaultPushInterval),
136139
Headers: make(map[string]string),
137140
TrendStats: []string{"p(99)"},
141+
StaleMarkers: null.BoolFrom(false),
138142
},
139143
},
140144
"OrderOfPrecedence": {
@@ -152,6 +156,7 @@ func TestGetConsolidatedConfig(t *testing.T) {
152156
PushInterval: types.NullDurationFrom(defaultPushInterval),
153157
Headers: make(map[string]string),
154158
TrendStats: []string{"p(99)"},
159+
StaleMarkers: null.BoolFrom(false),
155160
},
156161
},
157162
"InvalidJSON": {
@@ -224,6 +229,7 @@ func TestOptionServerURL(t *testing.T) {
224229
PushInterval: types.NullDurationFrom(5 * time.Second),
225230
Headers: make(map[string]string),
226231
TrendStats: []string{"p(99)"},
232+
StaleMarkers: null.BoolFrom(false),
227233
}
228234
for name, tc := range cases {
229235
tc := tc
@@ -259,7 +265,8 @@ func TestOptionHeaders(t *testing.T) {
259265
"X-MY-HEADER1": "hval1",
260266
"X-MY-HEADER2": "hval2",
261267
},
262-
TrendStats: []string{"p(99)"},
268+
TrendStats: []string{"p(99)"},
269+
StaleMarkers: null.BoolFrom(false),
263270
}
264271
for name, tc := range cases {
265272
tc := tc
@@ -293,6 +300,7 @@ func TestOptionInsecureSkipTLSVerify(t *testing.T) {
293300
PushInterval: types.NullDurationFrom(defaultPushInterval),
294301
Headers: make(map[string]string),
295302
TrendStats: []string{"p(99)"},
303+
StaleMarkers: null.BoolFrom(false),
296304
}
297305
for name, tc := range cases {
298306
tc := tc
@@ -328,6 +336,7 @@ func TestOptionBasicAuth(t *testing.T) {
328336
PushInterval: types.NullDurationFrom(5 * time.Second),
329337
Headers: make(map[string]string),
330338
TrendStats: []string{"p(99)"},
339+
StaleMarkers: null.BoolFrom(false),
331340
}
332341

333342
for name, tc := range cases {
@@ -365,6 +374,7 @@ func TestOptionTrendAsNativeHistogram(t *testing.T) {
365374
Headers: make(map[string]string),
366375
TrendAsNativeHistogram: null.BoolFrom(true),
367376
TrendStats: []string{"p(99)"},
377+
StaleMarkers: null.BoolFrom(false),
368378
}
369379

370380
for name, tc := range cases {
@@ -401,6 +411,7 @@ func TestOptionPushInterval(t *testing.T) {
401411
PushInterval: types.NullDurationFrom((1 * time.Minute) + (2 * time.Second)),
402412
Headers: make(map[string]string),
403413
TrendStats: []string{"p(99)"},
414+
StaleMarkers: null.BoolFrom(false),
404415
}
405416

406417
for name, tc := range cases {
@@ -436,6 +447,40 @@ func TestConfigTrendStats(t *testing.T) {
436447
PushInterval: types.NullDurationFrom(5 * time.Second),
437448
Headers: make(map[string]string),
438449
TrendStats: []string{"max", "p(95)"},
450+
StaleMarkers: null.BoolFrom(false),
451+
}
452+
453+
for name, tc := range cases {
454+
tc := tc
455+
t.Run(name, func(t *testing.T) {
456+
t.Parallel()
457+
c, err := GetConsolidatedConfig(
458+
tc.jsonRaw, tc.env, tc.arg)
459+
require.NoError(t, err)
460+
assert.Equal(t, expconfig, c)
461+
})
462+
}
463+
}
464+
465+
func TestOptionStaleMarker(t *testing.T) {
466+
t.Parallel()
467+
468+
cases := map[string]struct {
469+
arg string
470+
env map[string]string
471+
jsonRaw json.RawMessage
472+
}{
473+
"JSON": {jsonRaw: json.RawMessage(`{"staleMarkers":true}`)},
474+
"Env": {env: map[string]string{"K6_PROMETHEUS_RW_STALE_MARKERS": "true"}},
475+
}
476+
477+
expconfig := Config{
478+
ServerURL: null.StringFrom("http://localhost:9090/api/v1/write"),
479+
InsecureSkipTLSVerify: null.BoolFrom(true),
480+
PushInterval: types.NullDurationFrom(5 * time.Second),
481+
Headers: make(map[string]string),
482+
TrendStats: []string{"p(99)"},
483+
StaleMarkers: null.BoolFrom(true),
439484
}
440485

441486
for name, tc := range cases {

pkg/remotewrite/remotewrite.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ package remotewrite
44
import (
55
"context"
66
"fmt"
7-
"math"
87
"strings"
98
"time"
109

1110
"github.com/grafana/xk6-output-prometheus-remote/pkg/remote"
11+
"github.com/grafana/xk6-output-prometheus-remote/pkg/stale"
1212

1313
"go.k6.io/k6/metrics"
1414
"go.k6.io/k6/output"
@@ -17,16 +17,7 @@ import (
1717
prompb "go.buf.build/grpc/go/prometheus/prometheus"
1818
)
1919

20-
var (
21-
_ output.Output = new(Output)
22-
23-
// staleNaN is the Prometheus special value for marking
24-
// a time series as stale.
25-
//
26-
// https://pkg.go.dev/github.com/prometheus/prometheus/pkg/value#pkg-constants
27-
//nolint:gochecknoglobals
28-
staleNaN = math.Float64frombits(0x7ff0000000000002)
29-
)
20+
var _ output.Output = new(Output)
3021

3122
// Output is a k6 output that sends metrics to a Prometheus remote write endpoint.
3223
type Output struct {
@@ -99,8 +90,11 @@ func (o *Output) Stop() error {
9990
defer o.logger.Debug("Output stopped")
10091
o.periodicFlusher.Stop()
10192

102-
staleMarkers := o.staleMarkers(time.Now())
93+
if !o.config.StaleMarkers.Bool {
94+
return nil
95+
}
10396

97+
staleMarkers := o.staleMarkers(time.Now())
10498
if len(staleMarkers) < 1 {
10599
o.logger.Debug("No time series to mark as stale")
106100
return nil
@@ -133,7 +127,7 @@ func (o *Output) staleMarkers(t time.Time) []*prompb.TimeSeries {
133127
s.Samples = append(s.Samples, &prompb.Sample{})
134128
}
135129

136-
s.Samples[0].Value = staleNaN
130+
s.Samples[0].Value = stale.Marker
137131
s.Samples[0].Timestamp = timestamp
138132
}
139133
staleMarkers = append(staleMarkers, series...)

pkg/remotewrite/remotewrite_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@ package remotewrite
22

33
import (
44
"fmt"
5+
"io"
56
"math"
67
"testing"
78
"time"
89

10+
"github.com/sirupsen/logrus"
911
"github.com/stretchr/testify/assert"
1012
"github.com/stretchr/testify/require"
1113
prompb "go.buf.build/grpc/go/prometheus/prometheus"
14+
"go.k6.io/k6/lib/testutils"
15+
"go.k6.io/k6/lib/types"
1216
"go.k6.io/k6/metrics"
1317
"gopkg.in/guregu/null.v3"
1418
)
@@ -341,3 +345,52 @@ func TestOutputStaleMarkers(t *testing.T) {
341345
assert.True(t, math.IsNaN(markers[i].Samples[0].Value), "it isn't a StaleNaN value")
342346
}
343347
}
348+
349+
func TestOutputStopWithStaleMarkers(t *testing.T) {
350+
t.Parallel()
351+
352+
for _, tc := range []bool{true, false} {
353+
logHook := &testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.DebugLevel}}
354+
logger := logrus.New()
355+
logger.SetLevel(logrus.DebugLevel)
356+
logger.AddHook(logHook)
357+
logger.SetOutput(io.Discard)
358+
359+
o := Output{
360+
logger: logger,
361+
config: Config{
362+
// setting a large interval so it does not trigger
363+
PushInterval: types.NullDurationFrom(1 * time.Hour),
364+
StaleMarkers: null.BoolFrom(tc),
365+
},
366+
}
367+
368+
err := o.Start()
369+
require.NoError(t, err)
370+
err = o.Stop()
371+
require.NoError(t, err)
372+
373+
// TODO: it isn't optimal to maintain
374+
// if a new logline is added in Start or flushMetrics
375+
// then this test will break
376+
// A mock of the client and check if Store is invoked
377+
// should be a more stable method.
378+
entries := logHook.Drain()
379+
require.NotEmpty(t, entries)
380+
381+
messages := func() []string {
382+
s := make([]string, 0, len(entries))
383+
for _, e := range entries {
384+
s = append(s, e.Message)
385+
}
386+
return s
387+
}()
388+
389+
msg := "No time series to mark as stale"
390+
assertfn := assert.Contains
391+
if !tc {
392+
assertfn = assert.NotContains
393+
}
394+
assertfn(t, messages, msg)
395+
}
396+
}

0 commit comments

Comments
 (0)