Skip to content

Commit df72d57

Browse files
authored
Add a basic metric for metastore DLQ recovery (#4563)
* Add a basic metric for metastore DLQ recovery * Add debug log for blocks successfully added to the DLQ * Cover new metric in existing test
1 parent 8db0333 commit df72d57

File tree

6 files changed

+61
-8
lines changed

6 files changed

+61
-8
lines changed

pkg/metastore/index/dlq/metrics.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package dlq
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
)
6+
7+
type metrics struct {
8+
recoveryAttempts *prometheus.CounterVec
9+
}
10+
11+
func newMetrics(reg prometheus.Registerer) *metrics {
12+
m := &metrics{
13+
recoveryAttempts: prometheus.NewCounterVec(
14+
prometheus.CounterOpts{
15+
Namespace: "pyroscope",
16+
Subsystem: "metastore",
17+
Name: "dlq_recovery_attempts_total",
18+
Help: "Total number of DLQ block recovery attempts by status.",
19+
},
20+
[]string{"status"},
21+
),
22+
}
23+
24+
if reg != nil {
25+
reg.MustRegister(m.recoveryAttempts)
26+
}
27+
28+
return m
29+
}

pkg/metastore/index/dlq/recovery.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/go-kit/log"
1212
"github.com/go-kit/log/level"
13+
"github.com/prometheus/client_golang/prometheus"
1314
"github.com/thanos-io/objstore"
1415
"google.golang.org/grpc/codes"
1516
"google.golang.org/grpc/status"
@@ -36,18 +37,20 @@ type Recovery struct {
3637
logger log.Logger
3738
metastore Metastore
3839
bucket objstore.Bucket
40+
metrics *metrics
3941

4042
started bool
4143
cancel func()
4244
m sync.Mutex
4345
}
4446

45-
func NewRecovery(logger log.Logger, config Config, metastore Metastore, bucket objstore.Bucket) *Recovery {
47+
func NewRecovery(logger log.Logger, config Config, metastore Metastore, bucket objstore.Bucket, reg prometheus.Registerer) *Recovery {
4648
return &Recovery{
4749
config: config,
4850
logger: logger,
4951
metastore: metastore,
5052
bucket: bucket,
53+
metrics: newMetrics(reg),
5154
}
5255
}
5356

@@ -121,37 +124,46 @@ func (r *Recovery) recover(ctx context.Context, path string) (err error) {
121124
switch {
122125
case err == nil:
123126
case errors.Is(err, context.Canceled):
127+
r.metrics.recoveryAttempts.WithLabelValues("canceled").Inc()
124128
return err
125129
case r.bucket.IsObjNotFoundErr(err):
126130
// This is somewhat opportunistic: the error is likely caused by a competing recovery
127131
// process that has already recovered the block, before we've discovered that the
128132
// leadership has changed.
133+
r.metrics.recoveryAttempts.WithLabelValues("not_found").Inc()
129134
level.Warn(r.logger).Log("msg", "block metadata not found; skipping", "path", path)
130135
return nil
131136
default:
132137
// This is somewhat opportunistic, as we don't know if the error is transient or not.
133138
// we should consider an explicit retry mechanism with backoff and a limit on the
134139
// number of attempts.
140+
r.metrics.recoveryAttempts.WithLabelValues("read_error").Inc()
135141
level.Warn(r.logger).Log("msg", "failed to read block metadata; to be retried", "err", err, "path", path)
136142
return err
137143
}
138144

139145
var meta metastorev1.BlockMeta
140146
if err = meta.UnmarshalVT(b); err != nil {
141-
level.Error(r.logger).Log("msg", "invalid block metadata; skipping", "err", err, "path", path)
147+
r.metrics.recoveryAttempts.WithLabelValues("unmarshal_error").Inc()
148+
level.Error(r.logger).Log("msg", "failed to unmarshal block metadata; skipping", "err", err, "path", path)
142149
return nil
143150
}
144151

145152
switch _, err = r.metastore.AddRecoveredBlock(ctx, &metastorev1.AddBlockRequest{Block: &meta}); {
146153
case err == nil:
154+
r.metrics.recoveryAttempts.WithLabelValues("success").Inc()
155+
level.Debug(r.logger).Log("msg", "successfully recovered block from DLQ", "block_id", meta.Id, "path", path)
147156
return nil
148157
case status.Code(err) == codes.InvalidArgument:
149-
level.Error(r.logger).Log("msg", "invalid block metadata", "err", err, "path", path)
158+
r.metrics.recoveryAttempts.WithLabelValues("invalid_metadata").Inc()
159+
level.Error(r.logger).Log("msg", "block metadata rejected by metastore; skipping", "err", err, "block_id", meta.Id, "path", path)
150160
return nil
151161
case raftnode.IsRaftLeadershipError(err):
162+
r.metrics.recoveryAttempts.WithLabelValues("leadership_change").Inc()
152163
level.Warn(r.logger).Log("msg", "leadership change; recovery interrupted", "err", err, "path", path)
153164
return err
154165
default:
166+
r.metrics.recoveryAttempts.WithLabelValues("metastore_error").Inc()
155167
level.Error(r.logger).Log("msg", "failed to add block metadata; to be retried", "err", err, "path", path)
156168
return err
157169
}

pkg/metastore/index/dlq/recovery_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/testutil"
911
"github.com/stretchr/testify/assert"
1012
"github.com/stretchr/testify/mock"
1113
"github.com/stretchr/testify/require"
@@ -51,7 +53,7 @@ func TestRecoverTick(t *testing.T) {
5153
addMeta(bucket, meta)
5254
}
5355

54-
r := NewRecovery(test.NewTestingLogger(t), Config{}, srv, bucket)
56+
r := NewRecovery(test.NewTestingLogger(t), Config{}, srv, bucket, prometheus.NewRegistry())
5557
r.recoverTick(context.Background())
5658

5759
expected := []*metastorev1.BlockMeta{
@@ -65,6 +67,10 @@ func TestRecoverTick(t *testing.T) {
6567
require.Equal(t, actual[i].Id, expected[i].Id)
6668
require.Equal(t, actual[i].Shard, expected[i].Shard)
6769
}
70+
71+
assert.Equal(t, 3.0, testutil.ToFloat64(r.metrics.recoveryAttempts.WithLabelValues("success")))
72+
assert.Equal(t, 0.0, testutil.ToFloat64(r.metrics.recoveryAttempts.WithLabelValues("unmarshal_error")))
73+
assert.Equal(t, 0.0, testutil.ToFloat64(r.metrics.recoveryAttempts.WithLabelValues("invalid_metadata")))
6874
}
6975

7076
func TestNotRaftLeader(t *testing.T) {
@@ -89,10 +95,13 @@ func TestNotRaftLeader(t *testing.T) {
8995
addMeta(bucket, meta)
9096
}
9197

92-
r := NewRecovery(test.NewTestingLogger(t), Config{}, srv, bucket)
98+
r := NewRecovery(test.NewTestingLogger(t), Config{}, srv, bucket, prometheus.NewRegistry())
9399
r.recoverTick(context.Background())
94100

95101
assert.Equal(t, 1, len(bucket.Objects()))
102+
103+
assert.Equal(t, 1.0, testutil.ToFloat64(r.metrics.recoveryAttempts.WithLabelValues("metastore_error")))
104+
assert.Equal(t, 0.0, testutil.ToFloat64(r.metrics.recoveryAttempts.WithLabelValues("success")))
96105
}
97106

98107
func TestStartStop(t *testing.T) {
@@ -129,7 +138,7 @@ func TestStartStop(t *testing.T) {
129138
addMeta(bucket, meta)
130139
}
131140

132-
r := NewRecovery(test.NewTestingLogger(t), Config{CheckInterval: time.Millisecond * 10}, srv, bucket)
141+
r := NewRecovery(test.NewTestingLogger(t), Config{CheckInterval: time.Millisecond * 10}, srv, bucket, prometheus.NewRegistry())
133142
r.Start()
134143
defer r.Stop()
135144

@@ -150,6 +159,8 @@ func TestStartStop(t *testing.T) {
150159
require.Equal(t, actual[i].Id, expected[i].Id)
151160
require.Equal(t, actual[i].Shard, expected[i].Shard)
152161
}
162+
163+
assert.Equal(t, 3.0, testutil.ToFloat64(r.metrics.recoveryAttempts.WithLabelValues("success")))
153164
}
154165

155166
func addMeta(bucket *memory.InMemBucket, meta *metastorev1.BlockMeta) {

pkg/metastore/metastore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func New(
175175
m.indexService = NewIndexService(m.logger, m.raft, m.leaderRead, m.index, m.placement)
176176
m.tenantService = NewTenantService(m.logger, m.followerRead, m.index)
177177
m.queryService = NewQueryService(m.logger, m.followerRead, m.index)
178-
m.recovery = dlq.NewRecovery(logger, config.Index.Recovery, m.indexService, bucket)
178+
m.recovery = dlq.NewRecovery(logger, config.Index.Recovery, m.indexService, bucket, m.reg)
179179
m.cleaner = cleaner.NewCleaner(m.logger, m.overrides, config.Index.Cleaner, m.indexService)
180180

181181
// These are the services that only run on the raft leader.

pkg/segmentwriter/segment.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,7 @@ func (sw *segmentsWriter) storeMetadata(ctx context.Context, meta *metastorev1.B
676676
}()
677677

678678
if err = s.sw.storeMetadataDLQ(ctx, meta); err == nil {
679+
level.Debug(s.logger).Log("msg", "successfully wrote block metadata to DLQ", "block_id", meta.Id)
679680
return nil
680681
}
681682

pkg/segmentwriter/segment_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ func TestDLQRecoveryMock(t *testing.T) {
392392
Return(&metastorev1.AddBlockResponse{}, nil)
393393
recovery := dlq.NewRecovery(test.NewTestingLogger(t), dlq.Config{
394394
CheckInterval: 100 * time.Millisecond,
395-
}, srv, sw.bucket)
395+
}, srv, sw.bucket, nil)
396396
recovery.Start()
397397
defer recovery.Stop()
398398

0 commit comments

Comments
 (0)