Skip to content

Commit ebb40e6

Browse files
StevenYCChouYen-Cheng Chou
authored andcommitted
Avoid retry on non-recoverable errors.
Only retry when calling seriesCache.get().
1 parent c4b3b4e commit ebb40e6

File tree

2 files changed

+42
-33
lines changed

2 files changed

+42
-33
lines changed

retrieval/manager.go

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,10 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error {
166166
)
167167
go seriesCache.run(ctx)
168168

169-
builder := &sampleBuilder{series: seriesCache}
169+
builder := &sampleBuilder{
170+
logger: r.logger,
171+
series: seriesCache,
172+
}
170173

171174
// NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned
172175
// with performance. The WAL reader will do a lot of tiny reads otherwise.
@@ -221,7 +224,6 @@ Outer:
221224
level.Error(r.logger).Log("error", err)
222225
continue
223226
}
224-
backoff := time.Duration(0)
225227
// Do not increment the metric for produced samples each time but rather
226228
// once at the end.
227229
// Otherwise it will increase CPU usage by ~10%.
@@ -233,19 +235,11 @@ Outer:
233235
break Outer
234236
default:
235237
}
236-
// We intentionally don't use time.After in the select statement above
237-
// since we'd unnecessarily spawn a new goroutine for each sample
238-
// we process even when there are no errors.
239-
if backoff > 0 {
240-
time.Sleep(backoff)
241-
}
242-
243238
var outputSample *monitoring_pb.TimeSeries
244239
var hash uint64
245240
outputSample, hash, samples, err = builder.next(ctx, samples)
246241
if err != nil {
247242
level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err)
248-
backoff = exponential(backoff)
249243
continue
250244
}
251245
if outputSample == nil {
@@ -343,18 +337,3 @@ func hashSeries(s *monitoring_pb.TimeSeries) uint64 {
343337
}
344338
return h
345339
}
346-
347-
func exponential(d time.Duration) time.Duration {
348-
const (
349-
min = 10 * time.Millisecond
350-
max = 2 * time.Second
351-
)
352-
d *= 2
353-
if d < min {
354-
d = min
355-
}
356-
if d > max {
357-
d = max
358-
}
359-
return d
360-
}

retrieval/transform.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"time"
2323

2424
timestamp_pb "github.com/golang/protobuf/ptypes/timestamp"
25+
"github.com/go-kit/kit/log"
26+
"github.com/go-kit/kit/log/level"
2527
"github.com/pkg/errors"
2628
"github.com/prometheus/prometheus/pkg/textparse"
2729
"github.com/prometheus/tsdb"
@@ -31,6 +33,7 @@ import (
3133
)
3234

3335
type sampleBuilder struct {
36+
logger log.Logger
3437
series seriesGetter
3538
}
3639

@@ -39,10 +42,7 @@ type sampleBuilder struct {
3942
func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) {
4043
sample := samples[0]
4144

42-
entry, ok, err := b.series.get(ctx, sample.Ref)
43-
if err != nil {
44-
return nil, 0, samples, errors.Wrap(err, "get series information")
45-
}
45+
entry, ok := b.seriesGetWithRetry(ctx, sample)
4646
if !ok {
4747
return nil, 0, samples[1:], nil
4848
}
@@ -65,6 +65,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
6565
}
6666
ts.Points = append(ts.Points, point)
6767

68+
var err error
6869
var resetTimestamp int64
6970

7071
switch entry.metadata.Type {
@@ -128,6 +129,23 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
128129
return &ts, entry.hash, samples[1:], nil
129130
}
130131

132+
func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefSample) (*seriesCacheEntry, bool) {
133+
backoff := time.Duration(0)
134+
entry, ok, err := b.series.get(ctx, sample.Ref)
135+
for {
136+
if err == nil {
137+
break
138+
}
139+
level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err)
140+
backoff = exponential(backoff)
141+
if backoff > 0 {
142+
time.Sleep(backoff)
143+
}
144+
entry, ok, err = b.series.get(ctx, sample.Ref)
145+
}
146+
return entry, ok
147+
}
148+
131149
const (
132150
metricSuffixBucket = "_bucket"
133151
metricSuffixSum = "_sum"
@@ -210,10 +228,7 @@ func (b *sampleBuilder) buildDistribution(
210228
// until we hit a new metric.
211229
Loop:
212230
for i, s := range samples {
213-
e, ok, err := b.series.get(ctx, s.Ref)
214-
if err != nil {
215-
return nil, 0, samples, err
216-
}
231+
e, ok := b.seriesGetWithRetry(ctx, s)
217232
if !ok {
218233
consumed++
219234
// TODO(fabxc): increment metric.
@@ -349,3 +364,18 @@ func histogramLabelsEqual(a, b tsdbLabels.Labels) bool {
349364
// If one label set still has labels left, they are not equal.
350365
return i == len(a) && j == len(b)
351366
}
367+
368+
func exponential(d time.Duration) time.Duration {
369+
const (
370+
min = 10 * time.Millisecond
371+
max = 2 * time.Second
372+
)
373+
d *= 2
374+
if d < min {
375+
d = min
376+
}
377+
if d > max {
378+
d = max
379+
}
380+
return d
381+
}

0 commit comments

Comments
 (0)