Skip to content

Commit a9a8111

Browse files
authored
chore: Make profileEntryIterator more configurable (#4621)
* chore: Make profileEntryIterator more configurable
* Implement profileIDSelector and address review comments
* Disable ID retrieval until it is used in #4615
1 parent d022b78 commit a9a8111

File tree

3 files changed

+216
-20
lines changed

3 files changed

+216
-20
lines changed

pkg/phlaredb/schemas/v1/profiles.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
)
1818

1919
const (
20+
IDColumnName = "ID"
2021
SeriesIndexColumnName = "SeriesIndex"
2122
TimeNanosColumnName = "TimeNanos"
2223
StacktracePartitionColumnName = "StacktracePartition"

pkg/querybackend/query_profile_entry.go

Lines changed: 199 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package querybackend
22

33
import (
4+
"slices"
5+
"strings"
6+
7+
"github.com/google/uuid"
48
"github.com/parquet-go/parquet-go"
59
"github.com/prometheus/common/model"
610
"github.com/prometheus/prometheus/model/labels"
@@ -23,39 +27,198 @@ type ProfileEntry struct {
2327
Fingerprint model.Fingerprint
2428
Labels phlaremodel.Labels
2529
Partition uint64
30+
ID string
2631
}
2732

2833
// RowNumber returns the row number of the profile row backing this entry.
func (e ProfileEntry) RowNumber() int64 { return e.RowNum }
2934

30-
func profileEntryIterator(q *queryContext, groupBy ...string) (iter.Iterator[ProfileEntry], error) {
31-
series, err := getSeries(q.ds.Index(), q.req.matchers, groupBy...)
35+
type profileIteratorOption struct {
36+
iterator func(*iteratorOpts)
37+
series func(*seriesOpts)
38+
}
39+
40+
func withAllLabels() profileIteratorOption {
41+
return profileIteratorOption{
42+
series: func(opts *seriesOpts) {
43+
opts.allLabels = true
44+
},
45+
}
46+
}
47+
48+
func withGroupByLabels(by ...string) profileIteratorOption {
49+
return profileIteratorOption{
50+
series: func(opts *seriesOpts) {
51+
opts.groupBy = by
52+
},
53+
}
54+
}
55+
56+
func withFetchPartition(v bool) profileIteratorOption {
57+
return profileIteratorOption{
58+
iterator: func(opts *iteratorOpts) {
59+
opts.fetchPartition = v
60+
},
61+
}
62+
}
63+
64+
func withFetchProfileIDs(v bool) profileIteratorOption {
65+
return profileIteratorOption{
66+
iterator: func(opts *iteratorOpts) {
67+
opts.fetchProfileIDs = v
68+
},
69+
}
70+
}
71+
72+
func withProfileIDSelector(ids ...string) (profileIteratorOption, error) {
73+
// convert profile ids into uuids
74+
uuids := make([]string, 0, len(ids))
75+
for _, id := range ids {
76+
u, err := uuid.Parse(id)
77+
if err != nil {
78+
return profileIteratorOption{}, err
79+
}
80+
uuids = append(uuids, string(u[:]))
81+
}
82+
83+
return profileIteratorOption{
84+
iterator: func(opts *iteratorOpts) {
85+
opts.profileIDSelector = uuids
86+
},
87+
}, nil
88+
}
89+
90+
// iteratorOpts holds the iterator-scoped settings collected from
// profileIteratorOptions.
type iteratorOpts struct {
	profileIDSelector []string // each entry is the 16-byte binary form of a UUID
	fetchProfileIDs   bool
	fetchPartition    bool
}
95+
96+
func iteratorOptsFromOptions(options []profileIteratorOption) iteratorOpts {
97+
opts := iteratorOpts{
98+
fetchPartition: true,
99+
}
100+
for _, f := range options {
101+
if f.iterator != nil {
102+
f.iterator(&opts)
103+
}
104+
}
105+
return opts
106+
}
107+
108+
type queryColumn struct {
109+
name string
110+
predicate parquetquery.Predicate
111+
priority int
112+
}
113+
114+
// queryColumns is an ordered set of parquet columns to read for a query.
type queryColumns []queryColumn
115+
116+
func (c queryColumns) names() []string {
117+
result := make([]string, len(c))
118+
for idx := range result {
119+
result[idx] = c[idx].name
120+
}
121+
return result
122+
}
123+
124+
func (c queryColumns) join(q *queryContext) parquetquery.Iterator {
125+
var result parquetquery.Iterator
126+
127+
// sort columns by priority, without modifying queryColumn slice
128+
order := make([]int, len(c))
129+
for idx := range order {
130+
order[idx] = idx
131+
}
132+
slices.SortFunc(order, func(a, b int) int {
133+
if r := c[a].priority - c[b].priority; r != 0 {
134+
return r
135+
}
136+
return strings.Compare(c[a].name, c[b].name)
137+
})
138+
139+
for _, idx := range order {
140+
it := q.ds.Profiles().Column(q.ctx, c[idx].name, c[idx].predicate)
141+
if result == nil {
142+
result = it
143+
continue
144+
}
145+
result = parquetquery.NewBinaryJoinIterator(0,
146+
result,
147+
it,
148+
)
149+
}
150+
return result
151+
}
152+
153+
func profileEntryIterator(q *queryContext, options ...profileIteratorOption) (iter.Iterator[ProfileEntry], error) {
154+
opts := iteratorOptsFromOptions(options)
155+
156+
series, err := getSeries(q.ds.Index(), q.req.matchers, options...)
32157
if err != nil {
33158
return nil, err
34159
}
35-
results := parquetquery.NewBinaryJoinIterator(0,
36-
q.ds.Profiles().Column(q.ctx, "SeriesIndex", parquetquery.NewMapPredicate(series)),
37-
q.ds.Profiles().Column(q.ctx, "TimeNanos", parquetquery.NewIntBetweenPredicate(q.req.startTime, q.req.endTime)),
38-
)
39-
results = parquetquery.NewBinaryJoinIterator(0, results,
40-
q.ds.Profiles().Column(q.ctx, "StacktracePartition", nil),
41-
)
42160

43-
buf := make([][]parquet.Value, 3)
161+
columns := queryColumns{
162+
{schemav1.SeriesIndexColumnName, parquetquery.NewMapPredicate(series), 10},
163+
{schemav1.TimeNanosColumnName, parquetquery.NewIntBetweenPredicate(q.req.startTime, q.req.endTime), 15},
164+
}
165+
processor := []func([][]parquet.Value, *ProfileEntry){}
166+
167+
// fetch partition if requested
168+
if opts.fetchPartition {
169+
offset := len(columns)
170+
columns = append(
171+
columns,
172+
queryColumn{schemav1.StacktracePartitionColumnName, nil, 20},
173+
)
174+
processor = append(processor, func(buf [][]parquet.Value, e *ProfileEntry) {
175+
e.Partition = buf[offset][0].Uint64()
176+
})
177+
}
178+
// fetch profile id if requested or part of the predicate
179+
if opts.fetchProfileIDs || len(opts.profileIDSelector) > 0 {
180+
var (
181+
predicate parquetquery.Predicate
182+
priority = 20
183+
)
184+
if len(opts.profileIDSelector) > 0 {
185+
predicate = parquetquery.NewStringInPredicate(opts.profileIDSelector)
186+
priority = 5
187+
}
188+
offset := len(columns)
189+
columns = append(
190+
columns,
191+
queryColumn{schemav1.IDColumnName, predicate, priority},
192+
)
193+
var u uuid.UUID
194+
processor = append(processor, func(buf [][]parquet.Value, e *ProfileEntry) {
195+
b := buf[offset][0].Bytes()
196+
if len(b) != 16 {
197+
return
198+
}
199+
copy(u[:], b)
200+
e.ID = u.String()
201+
})
202+
}
203+
204+
buf := make([][]parquet.Value, len(columns))
205+
columnNames := columns.names()
206+
44207
entries := iter.NewAsyncBatchIterator[*parquetquery.IteratorResult, ProfileEntry](
45-
results, bigBatchSize,
208+
columns.join(q), bigBatchSize,
46209
func(r *parquetquery.IteratorResult) ProfileEntry {
47-
buf = r.Columns(buf,
48-
schemav1.SeriesIndexColumnName,
49-
schemav1.TimeNanosColumnName,
50-
schemav1.StacktracePartitionColumnName)
210+
buf = r.Columns(buf, columnNames...)
51211
x := series[buf[0][0].Uint32()]
52-
return ProfileEntry{
212+
e := ProfileEntry{
53213
RowNum: r.RowNumber[0],
54214
Timestamp: model.TimeFromUnixNano(buf[1][0].Int64()),
55215
Fingerprint: x.fingerprint,
56216
Labels: x.labels,
57-
Partition: buf[2][0].Uint64(),
58217
}
218+
for _, proc := range processor {
219+
proc(buf, &e)
220+
}
221+
return e
59222
},
60223
func([]ProfileEntry) {},
61224
)
@@ -67,7 +230,19 @@ type series struct {
67230
labels phlaremodel.Labels
68231
}
69232

70-
func getSeries(reader phlaredb.IndexReader, matchers []*labels.Matcher, by ...string) (map[uint32]series, error) {
233+
// seriesOpts holds the series-resolution settings collected from
// profileIteratorOptions, consumed by getSeries.
type seriesOpts struct {
	allLabels bool // when true, groupBy is ignored and all labels are resolved
	groupBy   []string
}
237+
238+
func getSeries(reader phlaredb.IndexReader, matchers []*labels.Matcher, options ...profileIteratorOption) (map[uint32]series, error) {
239+
var opts seriesOpts
240+
for _, f := range options {
241+
if f.series != nil {
242+
f.series(&opts)
243+
}
244+
}
245+
71246
postings, err := getPostings(reader, matchers...)
72247
if err != nil {
73248
return nil, err
@@ -76,7 +251,12 @@ func getSeries(reader phlaredb.IndexReader, matchers []*labels.Matcher, by ...st
76251
s := make(map[uint32]series)
77252
l := make(phlaremodel.Labels, 0, 6)
78253
for postings.Next() {
79-
fp, err := reader.SeriesBy(postings.At(), &l, &chunks, by...)
254+
var fp uint64
255+
if opts.allLabels {
256+
fp, err = reader.Series(postings.At(), &l, &chunks)
257+
} else {
258+
fp, err = reader.SeriesBy(postings.At(), &l, &chunks, opts.groupBy...)
259+
}
80260
if err != nil {
81261
return nil, err
82262
}

pkg/querybackend/query_time_series.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,22 @@ func init() {
3131
}
3232

3333
func queryTimeSeries(q *queryContext, query *queryv1.Query) (r *queryv1.Report, err error) {
34-
entries, err := profileEntryIterator(q, query.TimeSeries.GroupBy...)
34+
opts := []profileIteratorOption{
35+
withFetchPartition(false), // Partition data not needed, as we don't access stacktraces at all
36+
}
37+
exemplarsEnabled := false // TODO: This will be enabled as part of #4615
38+
if exemplarsEnabled {
39+
opts = append(opts,
40+
withAllLabels(),
41+
withFetchProfileIDs(true),
42+
)
43+
} else {
44+
opts = append(opts,
45+
withGroupByLabels(query.TimeSeries.GroupBy...),
46+
)
47+
}
48+
49+
entries, err := profileEntryIterator(q, opts...)
3550
if err != nil {
3651
return nil, err
3752
}

0 commit comments

Comments (0)