Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 05b4713

Browse files
committed
Add support for parca profilestore API
1 parent b1583e4 commit 05b4713

File tree

6 files changed

+56
-3
lines changed

6 files changed

+56
-3
lines changed

pkg/distributor/distributor.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ import (
1212

1313
"github.com/bufbuild/connect-go"
1414
"github.com/go-kit/log"
15+
"github.com/go-kit/log/level"
1516
"github.com/google/uuid"
1617
"github.com/grafana/dskit/ring"
1718
ring_client "github.com/grafana/dskit/ring/client"
1819
"github.com/grafana/dskit/services"
1920
"github.com/opentracing/opentracing-go"
21+
parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
2022
"github.com/parca-dev/parca/pkg/scrape"
2123
"github.com/pkg/errors"
2224
"github.com/prometheus/client_golang/prometheus"
@@ -147,6 +149,8 @@ func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.Push
147149

148150
p.Normalize()
149151

152+
level.Warn(d.logger).Log("msg", "received sample", "labels", firemodel.LabelPairsString(series.Labels), "type", p.StringTable[p.SampleType[0].Type])
153+
150154
// reuse the data buffer if possible
151155
size := p.SizeVT()
152156
if cap(data) < size {
@@ -309,3 +313,44 @@ func TokenFor(tenantID, labels string) uint32 {
309313
_, _ = h.Write([]byte(labels))
310314
return h.Sum32()
311315
}
316+
317+
func (d *Distributor) ParcaProfileStore() parcastorev1.ProfileStoreServiceServer {
318+
return &ParcaProfileStore{
319+
distributor: d,
320+
}
321+
}
322+
323+
type ParcaProfileStore struct {
324+
parcastorev1.UnimplementedProfileStoreServiceServer
325+
distributor *Distributor
326+
}
327+
328+
func (s *ParcaProfileStore) WriteRaw(ctx context.Context, req *parcastorev1.WriteRawRequest) (*parcastorev1.WriteRawResponse, error) {
329+
nReq := &pushv1.PushRequest{
330+
Series: make([]*pushv1.RawProfileSeries, len(req.Series)),
331+
}
332+
for idxSeries, series := range req.Series {
333+
nReq.Series[idxSeries] = &pushv1.RawProfileSeries{
334+
Samples: make([]*pushv1.RawSample, len(series.Samples)),
335+
Labels: make([]*commonv1.LabelPair, len(series.Labels.Labels)),
336+
}
337+
for idx, l := range series.Labels.Labels {
338+
nReq.Series[idxSeries].Labels[idx] = &commonv1.LabelPair{
339+
Name: l.Name,
340+
Value: l.Value,
341+
}
342+
}
343+
for idx, s := range series.Samples {
344+
nReq.Series[idxSeries].Samples[idx] = &pushv1.RawSample{
345+
RawProfile: s.RawProfile,
346+
}
347+
}
348+
level.Warn(s.distributor.logger).Log("msg", "converted parca sample", "labels", firemodel.LabelPairsString(nReq.Series[idxSeries].Labels))
349+
}
350+
351+
if _, err := s.distributor.Push(ctx, connect.NewRequest(nReq)); err != nil {
352+
return nil, err
353+
}
354+
355+
return &parcastorev1.WriteRawResponse{}, nil
356+
}

pkg/fire/modules.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"github.com/grafana/dskit/ring"
1414
"github.com/grafana/dskit/services"
1515
grpcgw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
16+
parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
1617
"github.com/pkg/errors"
17-
1818
"github.com/prometheus/client_golang/prometheus"
1919
"github.com/prometheus/common/version"
2020
"github.com/thanos-io/thanos/pkg/discovery/dns"
@@ -107,7 +107,12 @@ func (f *Fire) initDistributor() (services.Service, error) {
107107
// initialise direct pusher, this overwrites the default HTTP client
108108
f.pusherClient = d
109109

110+
// register pusher
110111
pushv1connect.RegisterPusherServiceHandler(f.Server.HTTP, d)
112+
113+
// register parca compatible profile store
114+
parcastorev1.RegisterProfileStoreServiceServer(f.Server.GRPC, d.ParcaProfileStore())
115+
111116
return d, nil
112117
}
113118

pkg/firedb/head_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ func TestHeadIngestRealProfiles(t *testing.T) {
301301
profilePaths := []string{
302302
"testdata/heap",
303303
"testdata/profile",
304+
"testdata/parca-agent",
304305
}
305306

306307
head, err := NewHead(t.TempDir())

pkg/firedb/locations.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ func (*locationsHelper) addToRewriter(r *rewriter, elemRewriter idConversionTabl
5353
}
5454

5555
func (*locationsHelper) rewrite(r *rewriter, l *profilev1.Location) error {
56-
r.mappings.rewriteUint64(&l.MappingId)
56+
// ignore mappingIDs of 0, as they indicate that it has already been symbolized.
57+
if l.MappingId != 0 {
58+
r.mappings.rewriteUint64(&l.MappingId)
59+
}
5760

5861
for pos := range l.Line {
5962
r.functions.rewriteUint64(&l.Line[pos].FunctionId)

pkg/firedb/profiles.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,6 @@ func (*profilesHelper) key(s *schemav1.Profile) noKey {
280280
}
281281

282282
func (*profilesHelper) addToRewriter(r *rewriter, elemRewriter idConversionTable) {
283-
r.locations = elemRewriter
284283
}
285284

286285
func (*profilesHelper) rewrite(r *rewriter, s *schemav1.Profile) error {

pkg/firedb/testdata/parca-agent

10.6 KB
Binary file not shown.

0 commit comments

Comments
 (0)