Skip to content

Commit af6b87b

Browse files
committed
Avoid retry on non-recoverable errors.
Only retry when calling seriesCache.get().
1 parent 2708583 commit af6b87b

File tree

2 files changed

+42
-33
lines changed

2 files changed

+42
-33
lines changed

retrieval/manager.go

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,10 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error {
162162
)
163163
go seriesCache.run(ctx)
164164

165-
builder := &sampleBuilder{series: seriesCache}
165+
builder := &sampleBuilder{
166+
logger: r.logger,
167+
series: seriesCache,
168+
}
166169

167170
// NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned
168171
// with performance. The WAL reader will do a lot of tiny reads otherwise.
@@ -217,7 +220,6 @@ Outer:
217220
level.Error(r.logger).Log("error", err)
218221
continue
219222
}
220-
backoff := time.Duration(0)
221223
// Do not increment the metric for produced samples each time but rather
222224
// once at the end.
223225
// Otherwise it will increase CPU usage by ~10%.
@@ -229,19 +231,11 @@ Outer:
229231
break Outer
230232
default:
231233
}
232-
// We intentionally don't use time.After in the select statement above
233-
// since we'd unnecessarily spawn a new goroutine for each sample
234-
// we process even when there are no errors.
235-
if backoff > 0 {
236-
time.Sleep(backoff)
237-
}
238-
239234
var outputSample *monitoring_pb.TimeSeries
240235
var hash uint64
241236
outputSample, hash, samples, err = builder.next(ctx, samples)
242237
if err != nil {
243238
level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err)
244-
backoff = exponential(backoff)
245239
continue
246240
}
247241
if outputSample == nil {
@@ -339,18 +333,3 @@ func hashSeries(s *monitoring_pb.TimeSeries) uint64 {
339333
}
340334
return h
341335
}
342-
343-
func exponential(d time.Duration) time.Duration {
344-
const (
345-
min = 10 * time.Millisecond
346-
max = 2 * time.Second
347-
)
348-
d *= 2
349-
if d < min {
350-
d = min
351-
}
352-
if d > max {
353-
d = max
354-
}
355-
return d
356-
}

retrieval/transform.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"time"
2323

2424
timestamp_pb "github.com/golang/protobuf/ptypes/timestamp"
25+
"github.com/go-kit/kit/log"
26+
"github.com/go-kit/kit/log/level"
2527
"github.com/pkg/errors"
2628
"github.com/prometheus/prometheus/pkg/textparse"
2729
"github.com/prometheus/tsdb"
@@ -31,6 +33,7 @@ import (
3133
)
3234

3335
type sampleBuilder struct {
36+
logger log.Logger
3437
series seriesGetter
3538
}
3639

@@ -39,10 +42,7 @@ type sampleBuilder struct {
3942
func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) {
4043
sample := samples[0]
4144

42-
entry, ok, err := b.series.get(ctx, sample.Ref)
43-
if err != nil {
44-
return nil, 0, samples, errors.Wrap(err, "get series information")
45-
}
45+
entry, ok := b.seriesGetWithRetry(ctx, sample)
4646
if !ok {
4747
return nil, 0, samples[1:], nil
4848
}
@@ -57,6 +57,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
5757
}
5858
ts.Points = append(ts.Points, point)
5959

60+
var err error
6061
var resetTimestamp int64
6162

6263
switch entry.metadata.Type {
@@ -120,6 +121,23 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
120121
return &ts, entry.hash, samples[1:], nil
121122
}
122123

124+
func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefSample) (*seriesCacheEntry, bool) {
125+
backoff := time.Duration(0)
126+
entry, ok, err := b.series.get(ctx, sample.Ref)
127+
for {
128+
if err == nil {
129+
break
130+
}
131+
level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err)
132+
backoff = exponential(backoff)
133+
if backoff > 0 {
134+
time.Sleep(backoff)
135+
}
136+
entry, ok, err = b.series.get(ctx, sample.Ref)
137+
}
138+
return entry, ok
139+
}
140+
123141
const (
124142
metricSuffixBucket = "_bucket"
125143
metricSuffixSum = "_sum"
@@ -198,10 +216,7 @@ func (b *sampleBuilder) buildDistribution(
198216
// until we hit a new metric.
199217
Loop:
200218
for i, s := range samples {
201-
e, ok, err := b.series.get(ctx, s.Ref)
202-
if err != nil {
203-
return nil, 0, samples, err
204-
}
219+
e, ok := b.seriesGetWithRetry(ctx, s)
205220
if !ok {
206221
consumed++
207222
// TODO(fabxc): increment metric.
@@ -337,3 +352,18 @@ func histogramLabelsEqual(a, b tsdbLabels.Labels) bool {
337352
// If one label set still has labels left, they are not equal.
338353
return i == len(a) && j == len(b)
339354
}
355+
356+
func exponential(d time.Duration) time.Duration {
357+
const (
358+
min = 10 * time.Millisecond
359+
max = 2 * time.Second
360+
)
361+
d *= 2
362+
if d < min {
363+
d = min
364+
}
365+
if d > max {
366+
d = max
367+
}
368+
return d
369+
}

0 commit comments

Comments
 (0)