Skip to content

Commit 17adc08

Browse files
author
Yen-Cheng Chou
committed
Rename function; consider error from series cache; consider context.Done().
1 parent ebb40e6 commit 17adc08

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

retrieval/transform.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ type sampleBuilder struct {
4242
func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) {
4343
sample := samples[0]
4444

45-
entry, ok := b.seriesGetWithRetry(ctx, sample)
45+
entry, ok, err := b.getSeriesWithRetry(ctx, sample)
46+
if err != nil {
47+
return nil, 0, samples, err
48+
}
4649
if !ok {
4750
return nil, 0, samples[1:], nil
4851
}
@@ -65,7 +68,6 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
6568
}
6669
ts.Points = append(ts.Points, point)
6770

68-
var err error
6971
var resetTimestamp int64
7072

7173
switch entry.metadata.Type {
@@ -129,10 +131,15 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo
129131
return &ts, entry.hash, samples[1:], nil
130132
}
131133

132-
func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefSample) (*seriesCacheEntry, bool) {
134+
func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefSample) (entry *seriesCacheEntry, ok bool, err error) {
133135
backoff := time.Duration(0)
134-
entry, ok, err := b.series.get(ctx, sample.Ref)
135136
for {
137+
select {
138+
case <-ctx.Done():
139+
return nil, false, ctx.Err()
140+
default:
141+
}
142+
entry, ok, err = b.series.get(ctx, sample.Ref)
136143
if err == nil {
137144
break
138145
}
@@ -141,9 +148,8 @@ func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefS
141148
if backoff > 0 {
142149
time.Sleep(backoff)
143150
}
144-
entry, ok, err = b.series.get(ctx, sample.Ref)
145151
}
146-
return entry, ok
152+
return entry, ok, nil
147153
}
148154

149155
const (
@@ -228,7 +234,10 @@ func (b *sampleBuilder) buildDistribution(
228234
// until we hit a new metric.
229235
Loop:
230236
for i, s := range samples {
231-
e, ok := b.seriesGetWithRetry(ctx, s)
237+
e, ok, err := b.getSeriesWithRetry(ctx, s)
238+
if err != nil {
239+
return nil, 0, samples, err
240+
}
232241
if !ok {
233242
consumed++
234243
// TODO(fabxc): increment metric.

0 commit comments

Comments
 (0)