Skip to content

Commit 3905d55

Browse files
author
Yen-Cheng Chou
committed
Rename function; consider error from series cache; consider context.Done().
1 parent af6b87b commit 3905d55

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

retrieval/transform.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ type sampleBuilder struct {
4242
func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) {
4343
sample := samples[0]
4444

45-
entry, ok := b.seriesGetWithRetry(ctx, sample)
45+
entry, ok, err := b.getSeriesWithRetry(ctx, sample)
46+
if err != nil {
47+
return nil, 0, samples, err
48+
}
4649
if !ok {
4750
return nil, 0, samples[1:], nil
4851
}
@@ -57,7 +60,6 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
5760
}
5861
ts.Points = append(ts.Points, point)
5962

60-
var err error
6163
var resetTimestamp int64
6264

6365
switch entry.metadata.Type {
@@ -121,10 +123,15 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
121123
return &ts, entry.hash, samples[1:], nil
122124
}
123125

124-
func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefSample) (*seriesCacheEntry, bool) {
126+
func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefSample) (entry *seriesCacheEntry, ok bool, err error) {
125127
backoff := time.Duration(0)
126-
entry, ok, err := b.series.get(ctx, sample.Ref)
127128
for {
129+
select {
130+
case <-ctx.Done():
131+
return nil, false, ctx.Err()
132+
default:
133+
}
134+
entry, ok, err = b.series.get(ctx, sample.Ref)
128135
if err == nil {
129136
break
130137
}
@@ -133,9 +140,8 @@ func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefS
133140
if backoff > 0 {
134141
time.Sleep(backoff)
135142
}
136-
entry, ok, err = b.series.get(ctx, sample.Ref)
137143
}
138-
return entry, ok
144+
return entry, ok, nil
139145
}
140146

141147
const (
@@ -216,7 +222,10 @@ func (b *sampleBuilder) buildDistribution(
216222
// until we hit a new metric.
217223
Loop:
218224
for i, s := range samples {
219-
e, ok := b.seriesGetWithRetry(ctx, s)
225+
e, ok, err := b.getSeriesWithRetry(ctx, s)
226+
if err != nil {
227+
return nil, 0, samples, err
228+
}
220229
if !ok {
221230
consumed++
222231
// TODO(fabxc): increment metric.

0 commit comments

Comments
 (0)