
Commit 88e48bd

cyriltovena, simonswine, and kolesnikovae authored

Compact Blocks (#2134)

Add the first compact blocks implementation. This is still a work in progress.

Co-authored-by: Christian Simon <simon@swine.de>
Co-authored-by: Anton Kolesnikov <anton.e.kolesnikov@gmail.com>
1 parent 351696b · commit 88e48bd

23 files changed: 1,991 additions, 232 deletions

pkg/iter/iter.go

Lines changed: 14 additions & 0 deletions

@@ -103,6 +103,20 @@ func NewSliceSeekIterator[A constraints.Ordered](s []A) SeekIterator[A, A] {
 	}
 }
 
+type slicePositionIterator[T constraints.Integer, M any] struct {
+	i Iterator[T]
+	s []M
+}
+
+func NewSliceIndexIterator[T constraints.Integer, M any](s []M, i Iterator[T]) Iterator[M] {
+	return slicePositionIterator[T, M]{s: s, i: i}
+}
+
+func (i slicePositionIterator[T, M]) Next() bool   { return i.i.Next() }
+func (i slicePositionIterator[T, M]) At() M        { return i.s[i.i.At()] }
+func (i slicePositionIterator[T, M]) Err() error   { return i.i.Err() }
+func (i slicePositionIterator[T, M]) Close() error { return i.i.Close() }
+
 type sliceSeekIterator[A constraints.Ordered] struct {
 	*sliceIterator[A]
 }
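
For context, NewSliceIndexIterator adapts an iterator of integer indices into an iterator over the elements of a backing slice; the symbol resolvers added below use it to turn ID iterators into record iterators. A minimal usage sketch (both constructors are from this package; the data is made up):

package main

import (
	"fmt"

	"github.com/grafana/pyroscope/pkg/iter"
)

func main() {
	// Backing slice; the index iterator selects elements from it by position.
	symbols := []string{"a", "b", "c", "d"}
	indices := iter.NewSliceIterator([]uint32{3, 1, 0})

	// Yields symbols[3], symbols[1], symbols[0].
	it := iter.NewSliceIndexIterator(symbols, indices)
	for it.Next() {
		fmt.Println(it.At()) // d, b, a
	}
}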

pkg/phlaredb/block_querier.go

Lines changed: 66 additions & 12 deletions

@@ -305,7 +305,13 @@ type singleBlockQuerier struct {
 type StacktraceDB interface {
 	Open(ctx context.Context) error
 	Close() error
-	Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error
+
+	// Load the database into memory entirely.
+	// This method is used at compaction.
+	Load(context.Context) error
+	WriteStats(partition uint64, s *symdb.Stats)
+
+	Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error
 }
 
 type stacktraceResolverV1 struct {

@@ -321,18 +327,33 @@ func (r *stacktraceResolverV1) Close() error {
 	return r.stacktraces.Close()
 }
 
-func (r *stacktraceResolverV1) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
+func (r *stacktraceResolverV1) Resolve(ctx context.Context, _ uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
 	stacktraces := repeatedColumnIter(ctx, r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraceIDs))
 	defer stacktraces.Close()
-
+	t := make([]int32, 0, 64)
 	for stacktraces.Next() {
 		s := stacktraces.At()
-		locs.addFromParquet(int64(s.Row), s.Values)
-
+		t = grow(t, len(s.Values))
+		for i, v := range s.Values {
+			t[i] = v.Int32()
+		}
+		locs.InsertStacktrace(s.Row, t)
 	}
 	return stacktraces.Err()
 }
 
+func (r *stacktraceResolverV1) WriteStats(_ uint64, s *symdb.Stats) {
+	s.StacktracesTotal = int(r.stacktraces.file.NumRows())
+	s.MaxStacktraceID = s.StacktracesTotal
+}
+
+func (r *stacktraceResolverV1) Load(context.Context) error {
+	// FIXME(kolesnikovae): Loading all stacktraces from parquet file
+	// into memory is likely a bad choice. Instead we could convert
+	// it to symdb first.
+	return nil
+}
+
 type stacktraceResolverV2 struct {
 	reader       *symdb.Reader
 	bucketReader phlareobj.Bucket

@@ -351,19 +372,25 @@ func (r *stacktraceResolverV2) Close() error {
 	return nil
 }
 
-func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
-	mr, ok := r.reader.MappingReader(mapping)
+func (r *stacktraceResolverV2) Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
+	mr, ok := r.reader.SymbolsResolver(partition)
 	if !ok {
 		return nil
 	}
 	resolver := mr.StacktraceResolver()
 	defer resolver.Release()
+	return resolver.ResolveStacktraces(ctx, locs, stacktraceIDs)
+}
 
-	return resolver.ResolveStacktraces(ctx, symdb.StacktraceInserterFn(
-		func(stacktraceID uint32, locations []int32) {
-			locs.add(int64(stacktraceID), locations)
-		},
-	), stacktraceIDs)
+func (r *stacktraceResolverV2) Load(ctx context.Context) error {
+	return r.reader.Load(ctx)
+}
+
+func (r *stacktraceResolverV2) WriteStats(partition uint64, s *symdb.Stats) {
+	mr, ok := r.reader.SymbolsResolver(partition)
+	if ok {
+		mr.WriteStats(s)
+	}
 }
 
 func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier {

@@ -427,6 +454,33 @@ func newStacktraceResolverV2(bucketReader phlareobj.Bucket) StacktraceDB {
 	}
 }
 
+func (b *singleBlockQuerier) Profiles() []parquet.RowGroup {
+	return b.profiles.file.RowGroups()
+}
+
+func (b *singleBlockQuerier) Index() IndexReader {
+	return b.index
+}
+
+func (b *singleBlockQuerier) Symbols() SymbolsReader {
+	return &inMemorySymbolsReader{
+		partitions: make(map[uint64]*inMemorySymbolsResolver),
+
+		strings:     b.strings,
+		functions:   b.functions,
+		locations:   b.locations,
+		mappings:    b.mappings,
+		stacktraces: b.stacktraces,
+	}
+}
+
+func (b *singleBlockQuerier) Meta() block.Meta {
+	if b.meta == nil {
+		return block.Meta{}
+	}
+	return *b.meta
+}
+
 func (b *singleBlockQuerier) Close() error {
 	b.openLock.Lock()
 	defer func() {
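
Resolve now takes a symdb.StacktraceInserter instead of the concrete locationsIdsByStacktraceID map, so callers decide how resolved stacktraces are collected. A call-side sketch, written as if inside package phlaredb, and assuming the symdb.StacktraceInserterFn adapter from the removed V2 code is still available:

// Sketch: collect resolved location IDs per stacktrace ID.
func collectLocations(ctx context.Context, db StacktraceDB, partition uint64, ids []uint32) (map[uint32][]int32, error) {
	out := make(map[uint32][]int32, len(ids))
	inserter := symdb.StacktraceInserterFn(func(stacktraceID uint32, locs []int32) {
		// Copy: the V1 resolver reuses its scratch buffer between calls.
		out[stacktraceID] = append([]int32(nil), locs...)
	})
	err := db.Resolve(ctx, partition, inserter, ids)
	return out, err
}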
Lines changed: 82 additions & 0 deletions (new file)

@@ -0,0 +1,82 @@
+package phlaredb
+
+import (
+	"context"
+
+	"github.com/grafana/pyroscope/pkg/iter"
+	schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
+	"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
+)
+
+// TODO(kolesnikovae): Refactor to symdb.
+
+type SymbolsReader interface {
+	SymbolsResolver(partition uint64) (SymbolsResolver, error)
+}
+
+type SymbolsResolver interface {
+	ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error
+
+	Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation]
+	Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping]
+	Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction]
+	Strings(iter.Iterator[uint32]) iter.Iterator[string]
+
+	WriteStats(*symdb.Stats)
+}
+
+type inMemorySymbolsReader struct {
+	partitions map[uint64]*inMemorySymbolsResolver
+
+	// TODO(kolesnikovae): Split into partitions.
+	strings     inMemoryparquetReader[string, *schemav1.StringPersister]
+	functions   inMemoryparquetReader[*schemav1.InMemoryFunction, *schemav1.FunctionPersister]
+	locations   inMemoryparquetReader[*schemav1.InMemoryLocation, *schemav1.LocationPersister]
+	mappings    inMemoryparquetReader[*schemav1.InMemoryMapping, *schemav1.MappingPersister]
+	stacktraces StacktraceDB
+}
+
+func (r *inMemorySymbolsReader) SymbolsResolver(partition uint64) (SymbolsResolver, error) {
+	p, ok := r.partitions[partition]
+	if !ok {
+		p = &inMemorySymbolsResolver{
+			partition: partition,
+			reader:    r,
+		}
+		r.partitions[partition] = p
+	}
+	return p, nil
+}
+
+type inMemorySymbolsResolver struct {
+	partition uint64
+	reader    *inMemorySymbolsReader
+}
+
+func (s inMemorySymbolsResolver) ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error {
+	return s.reader.stacktraces.Resolve(ctx, s.partition, dst, stacktraces)
+}
+
+func (s inMemorySymbolsResolver) Locations(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] {
+	return iter.NewSliceIndexIterator(s.reader.locations.cache, i)
+}
+
+func (s inMemorySymbolsResolver) Mappings(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] {
+	return iter.NewSliceIndexIterator(s.reader.mappings.cache, i)
+}
+
+func (s inMemorySymbolsResolver) Functions(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] {
+	return iter.NewSliceIndexIterator(s.reader.functions.cache, i)
+}
+
+func (s inMemorySymbolsResolver) Strings(i iter.Iterator[uint32]) iter.Iterator[string] {
+	return iter.NewSliceIndexIterator(s.reader.strings.cache, i)
+}
+
+func (s inMemorySymbolsResolver) WriteStats(stats *symdb.Stats) {
+	s.reader.stacktraces.WriteStats(s.partition, stats)
+	stats.LocationsTotal = len(s.reader.locations.cache)
+	stats.MappingsTotal = len(s.reader.mappings.cache)
+	stats.FunctionsTotal = len(s.reader.functions.cache)
+	stats.StringsTotal = len(s.reader.strings.cache)
+}
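
A read-path sketch for these interfaces, written as if inside package phlaredb: the resolver's iterator methods take an iterator of indices and yield the corresponding in-memory records. The helper name and the fixed partition key are hypothetical:

// Sketch: stream location records for one partition of a block,
// where sr could be the value returned by (*singleBlockQuerier).Symbols().
func resolveLocations(sr SymbolsReader, locIDs []uint32) error {
	resolver, err := sr.SymbolsResolver(0) // partition 0 is an arbitrary example
	if err != nil {
		return err
	}
	locs := resolver.Locations(iter.NewSliceIterator(locIDs))
	defer locs.Close()
	for locs.Next() {
		loc := locs.At() // *schemav1.InMemoryLocation, straight from the cache
		_ = loc
	}
	return locs.Err()
}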
Lines changed: 111 additions & 0 deletions (new file)

@@ -0,0 +1,111 @@
+package phlaredb
+
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+
+	schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
+	"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
+)
+
+// TODO(kolesnikovae): Refactor to symdb.
+
+type SymbolsWriter interface {
+	SymbolsAppender(partition uint64) (SymbolsAppender, error)
+}
+
+type SymbolsAppender interface {
+	AppendStacktraces([]uint32, []*schemav1.Stacktrace)
+	AppendLocations([]uint32, []*schemav1.InMemoryLocation)
+	AppendMappings([]uint32, []*schemav1.InMemoryMapping)
+	AppendFunctions([]uint32, []*schemav1.InMemoryFunction)
+	AppendStrings([]uint32, []string)
+}
+
+type symbolsWriter struct {
+	partitions map[uint64]*symbolsAppender
+
+	locations deduplicatingSlice[*schemav1.InMemoryLocation, locationsKey, *locationsHelper, *schemav1.LocationPersister]
+	mappings  deduplicatingSlice[*schemav1.InMemoryMapping, mappingsKey, *mappingsHelper, *schemav1.MappingPersister]
+	functions deduplicatingSlice[*schemav1.InMemoryFunction, functionsKey, *functionsHelper, *schemav1.FunctionPersister]
+	strings   deduplicatingSlice[string, string, *stringsHelper, *schemav1.StringPersister]
+	tables    []Table
+
+	symdb *symdb.SymDB
+}
+
+func newSymbolsWriter(dst string, cfg *ParquetConfig) (*symbolsWriter, error) {
+	w := symbolsWriter{
+		partitions: make(map[uint64]*symbolsAppender),
+	}
+	dir := filepath.Join(dst, symdb.DefaultDirName)
+	w.symdb = symdb.NewSymDB(symdb.DefaultConfig().WithDirectory(dir))
+	w.tables = []Table{
+		&w.locations,
+		&w.mappings,
+		&w.functions,
+		&w.strings,
+	}
+	for _, t := range w.tables {
+		if err := t.Init(dst, cfg, contextHeadMetrics(context.Background())); err != nil {
+			return nil, err
+		}
+	}
+	return &w, nil
+}
+
+func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error) {
+	p, ok := w.partitions[partition]
+	if !ok {
+		appender := w.symdb.SymbolsAppender(partition)
+		x := &symbolsAppender{
+			stacktraces: appender.StacktraceAppender(),
+			writer:      w,
+		}
+		w.partitions[partition] = x
+		p = x
+	}
+	return p, nil
+}
+
+func (w *symbolsWriter) Close() error {
+	for _, t := range w.tables {
+		_, _, err := t.Flush(context.Background())
+		if err != nil {
+			return fmt.Errorf("flushing table %s: %w", t.Name(), err)
+		}
+		if err = t.Close(); err != nil {
+			return fmt.Errorf("closing table %s: %w", t.Name(), err)
+		}
+	}
+	if err := w.symdb.Flush(); err != nil {
+		return fmt.Errorf("flushing symbol database: %w", err)
+	}
+	return nil
+}
+
+type symbolsAppender struct {
+	stacktraces symdb.StacktraceAppender
+	writer      *symbolsWriter
+}
+
+func (s symbolsAppender) AppendStacktraces(dst []uint32, stacktraces []*schemav1.Stacktrace) {
+	s.stacktraces.AppendStacktrace(dst, stacktraces)
+}
+
+func (s symbolsAppender) AppendLocations(dst []uint32, locations []*schemav1.InMemoryLocation) {
+	s.writer.locations.append(dst, locations)
+}
+
+func (s symbolsAppender) AppendMappings(dst []uint32, mappings []*schemav1.InMemoryMapping) {
+	s.writer.mappings.append(dst, mappings)
+}
+
+func (s symbolsAppender) AppendFunctions(dst []uint32, functions []*schemav1.InMemoryFunction) {
+	s.writer.functions.append(dst, functions)
+}
+
+func (s symbolsAppender) AppendStrings(dst []uint32, strings []string) {
+	s.writer.strings.append(dst, strings)
+}
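
And a write-path sketch, also as if inside package phlaredb: a compactor obtains a per-partition appender, appends deduplicated symbols, and flushes everything on Close. The helper name and partition key are hypothetical, and it is an assumption here that append writes the rewritten IDs into the dst slice:

// Sketch: write one partition's strings during compaction.
func writeSymbols(dstDir string, cfg *ParquetConfig, strs []string) error {
	w, err := newSymbolsWriter(dstDir, cfg)
	if err != nil {
		return err
	}
	app, err := w.SymbolsAppender(0) // an example partition key
	if err != nil {
		return err
	}
	// One output slot per input; the deduplicated IDs are assumed to
	// land in ids, based on the call shape above.
	ids := make([]uint32, len(strs))
	app.AppendStrings(ids, strs)
	// ...locations, mappings, functions, and stacktraces follow the same shape...
	return w.Close() // flush the parquet tables and the symbol database
}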
