Skip to content

Commit 58c6cf2

Browse files
authored
feat: Pull cost attribution labels for tenant (#1544)
Following a similar path as limits to pull cost attribution labels for tenants. Adds a new internal library `cals/cals.go` that provides an interface that wraps the `TenantProvider` client. Updates all of the wiring from Updated => Scraper to pass along the new cal client. Starts the process of using the library in `scraper.go`, which right now finds the cost attribution labels for a specific tenant. Returns an empty slice if nothing is found. This will be dependent upon the API changes being released to get cost attribution labels defined at the tenant level. My goal is to have a follow up PR that updates the telemeter bits to track executions by cost attribution labels. - relies on grafana/synthetic-monitoring-api#1750
1 parent 5eec220 commit 58c6cf2

File tree

7 files changed

+187
-44
lines changed

7 files changed

+187
-44
lines changed

cmd/synthetic-monitoring-agent/main.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"google.golang.org/grpc/grpclog"
2525

2626
"github.com/grafana/synthetic-monitoring-agent/internal/adhoc"
27+
"github.com/grafana/synthetic-monitoring-agent/internal/cals"
2728
"github.com/grafana/synthetic-monitoring-agent/internal/checks"
2829
"github.com/grafana/synthetic-monitoring-agent/internal/feature"
2930
"github.com/grafana/synthetic-monitoring-agent/internal/http"
@@ -315,6 +316,7 @@ func run(args []string, stdout io.Writer) error {
315316
publisher := publisherFactory(ctx, tm, zl.With().Str("subsystem", "publisher").Str("version", config.SelectedPublisher).Logger(), promRegisterer)
316317
limits := limits.NewTenantLimits(tm)
317318
secretProvider := secrets.NewSecretProvider(tm, 60*time.Second, zl.With().Str("subsystem", "secretstore").Logger())
319+
cals := cals.NewCostAttributionLabels(tm)
318320

319321
telemetry := telemetry.NewTelemeter(
320322
ctx, uuid.New().String(), time.Duration(config.TelemetryTimeSpan)*time.Minute,
@@ -324,21 +326,23 @@ func run(args []string, stdout io.Writer) error {
324326
)
325327

326328
checksUpdater, err := checks.NewUpdater(checks.UpdaterOptions{
327-
Conn: conn,
328-
Logger: zl.With().Str("subsystem", "updater").Logger(),
329-
Backoff: newConnectionBackoff(),
330-
Publisher: publisher,
331-
TenantCh: tenantCh,
332-
IsConnected: readynessHandler.Set,
333-
PromRegisterer: promRegisterer,
334-
Features: features,
335-
K6Runner: k6Runner,
336-
ScraperFactory: scraper.New,
337-
TenantLimits: limits,
338-
SecretProvider: secretProvider,
339-
Telemeter: telemetry,
340-
UsageReporter: usageReporter,
329+
Conn: conn,
330+
Logger: zl.With().Str("subsystem", "updater").Logger(),
331+
Backoff: newConnectionBackoff(),
332+
Publisher: publisher,
333+
TenantCh: tenantCh,
334+
IsConnected: readynessHandler.Set,
335+
PromRegisterer: promRegisterer,
336+
Features: features,
337+
K6Runner: k6Runner,
338+
ScraperFactory: scraper.New,
339+
TenantLimits: limits,
340+
SecretProvider: secretProvider,
341+
Telemeter: telemetry,
342+
UsageReporter: usageReporter,
343+
CostAttributionLabels: cals,
341344
})
345+
342346
if err != nil {
343347
return fmt.Errorf("cannot create checks updater: %w", err)
344348
}

internal/cals/cals.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package cals
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/grafana/synthetic-monitoring-agent/internal/model"
9+
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
10+
)
11+
12+
var (
13+
ErrTenantProvider = errors.New("fetching tenant data")
14+
)
15+
16+
type TenantProvider interface {
17+
GetTenant(context.Context, *sm.TenantInfo) (*sm.Tenant, error)
18+
}
19+
20+
// CostAttributionLabels has a TenantProvider that pulls data about a specific tenant
21+
type CostAttributionLabels struct {
22+
provider TenantProvider
23+
}
24+
25+
// NewCostAttributionLabels is a helper method to create a NewCostAttributionLabels provider
26+
func NewCostAttributionLabels(provider TenantProvider) *CostAttributionLabels {
27+
return &CostAttributionLabels{
28+
provider: provider,
29+
}
30+
}
31+
32+
// CostAttributionLabels will call TenantProvider.GetTenant to search for a specific tenant and returns Tenant.CostAttributionLabels
33+
func (tcal CostAttributionLabels) CostAttributionLabels(ctx context.Context, tenantID model.GlobalID) ([]string, error) {
34+
tenant, err := tcal.provider.GetTenant(ctx, &sm.TenantInfo{
35+
Id: int64(tenantID),
36+
})
37+
38+
if err != nil {
39+
return nil, fmt.Errorf("%w: %v", ErrTenantProvider, err)
40+
}
41+
42+
return tenant.CostAttributionLabels, nil
43+
}

internal/cals/cals_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package cals
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
type mockTenantProvider struct {
13+
tenant *sm.Tenant
14+
err error
15+
}
16+
17+
func (m mockTenantProvider) GetTenant(_ context.Context, _ *sm.TenantInfo) (*sm.Tenant, error) {
18+
return m.tenant, m.err
19+
}
20+
21+
func TestTenantCostAttributionLabels_GetCostAttributionLabels(t *testing.T) {
22+
testcases := map[string]struct {
23+
tenantProvider *mockTenantProvider
24+
expectedCostAttributionLabels []string
25+
expectError bool
26+
}{
27+
"cals should match up": {
28+
tenantProvider: &mockTenantProvider{
29+
tenant: &sm.Tenant{
30+
CostAttributionLabels: []string{"this", "is", "a", "test"},
31+
},
32+
},
33+
expectedCostAttributionLabels: []string{"this", "is", "a", "test"},
34+
expectError: false,
35+
},
36+
"Handle returning an error": {
37+
tenantProvider: &mockTenantProvider{
38+
tenant: &sm.Tenant{},
39+
err: fmt.Errorf("error getting tenant info"),
40+
},
41+
expectedCostAttributionLabels: []string{},
42+
expectError: true,
43+
},
44+
}
45+
for name, testcase := range testcases {
46+
t.Run(name, func(t *testing.T) {
47+
tcal := NewCostAttributionLabels(testcase.tenantProvider)
48+
cals, err := tcal.CostAttributionLabels(context.Background(), 1)
49+
if testcase.expectError {
50+
require.ErrorIs(t, err, ErrTenantProvider)
51+
}
52+
require.ElementsMatch(t, cals, testcase.expectedCostAttributionLabels)
53+
})
54+
}
55+
}

internal/checks/checks.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
logproto "github.com/grafana/loki/pkg/push"
2525

26+
"github.com/grafana/synthetic-monitoring-agent/internal/cals"
2627
"github.com/grafana/synthetic-monitoring-agent/internal/error_types"
2728
"github.com/grafana/synthetic-monitoring-agent/internal/feature"
2829
"github.com/grafana/synthetic-monitoring-agent/internal/k6runner"
@@ -84,6 +85,7 @@ type Updater struct {
8485
tenantSecrets secrets.SecretProvider
8586
telemeter *telemetry.Telemeter
8687
usageReporter usage.Reporter
88+
tenantCals *cals.CostAttributionLabels
8789
}
8890

8991
type apiInfo struct {
@@ -106,20 +108,21 @@ type (
106108
)
107109

108110
type UpdaterOptions struct {
109-
Conn *grpc.ClientConn
110-
Logger zerolog.Logger
111-
Backoff Backoffer
112-
Publisher pusher.Publisher
113-
TenantCh chan<- sm.Tenant
114-
IsConnected func(bool)
115-
PromRegisterer prometheus.Registerer
116-
Features feature.Collection
117-
K6Runner k6runner.Runner
118-
ScraperFactory scraper.Factory
119-
TenantLimits *limits.TenantLimits
120-
SecretProvider secrets.SecretProvider
121-
Telemeter *telemetry.Telemeter
122-
UsageReporter usage.Reporter
111+
Conn *grpc.ClientConn
112+
Logger zerolog.Logger
113+
Backoff Backoffer
114+
Publisher pusher.Publisher
115+
TenantCh chan<- sm.Tenant
116+
IsConnected func(bool)
117+
PromRegisterer prometheus.Registerer
118+
Features feature.Collection
119+
K6Runner k6runner.Runner
120+
ScraperFactory scraper.Factory
121+
TenantLimits *limits.TenantLimits
122+
SecretProvider secrets.SecretProvider
123+
Telemeter *telemetry.Telemeter
124+
UsageReporter usage.Reporter
125+
CostAttributionLabels *cals.CostAttributionLabels
123126
}
124127

125128
func NewUpdater(opts UpdaterOptions) (*Updater, error) {
@@ -253,6 +256,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
253256
scrapesCounter: scrapesCounter,
254257
},
255258
usageReporter: opts.UsageReporter,
259+
tenantCals: opts.CostAttributionLabels,
256260
}, nil
257261
}
258262

@@ -935,8 +939,9 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Ch
935939
c.logger,
936940
metrics,
937941
c.k6Runner,
938-
c.tenantLimits, c.telemeter, c.tenantSecrets,
942+
c.tenantLimits, c.telemeter, c.tenantSecrets, c.tenantCals,
939943
)
944+
940945
if err != nil {
941946
return fmt.Errorf("cannot create new scraper: %w", err)
942947
}

internal/checks/checks_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ func testScraperFactory(ctx context.Context, check model.Check, publisher pusher
474474
labelsLimiter scraper.LabelsLimiter,
475475
telemeter *telemetry.Telemeter,
476476
secretStore secrets.SecretProvider,
477+
cals scraper.TenantCals,
477478
) (*scraper.Scraper, error) {
478479
return scraper.NewWithOpts(
479480
ctx,

internal/scraper/scraper.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ type LabelsLimiter interface {
5858
LogLabels(ctx context.Context, tenantID model.GlobalID) (int, error)
5959
}
6060

61+
type TenantCals interface {
62+
CostAttributionLabels(ctx context.Context, tenantID model.GlobalID) ([]string, error)
63+
}
64+
6165
type Telemeter interface {
6266
AddExecution(e telemetry.Execution)
6367
}
@@ -77,6 +81,7 @@ type Scraper struct {
7781
summaries map[uint64]prometheus.Summary
7882
histograms map[uint64]prometheus.Histogram
7983
telemeter Telemeter
84+
cals TenantCals
8085
}
8186

8287
type Factory func(
@@ -88,6 +93,7 @@ type Factory func(
8893
labelsLimiter LabelsLimiter,
8994
telemeter *telemetry.Telemeter,
9095
secretStore secrets.SecretProvider,
96+
cals TenantCals,
9197
) (*Scraper, error)
9298

9399
type (
@@ -122,28 +128,31 @@ func New(
122128
labelsLimiter LabelsLimiter,
123129
telemeter *telemetry.Telemeter,
124130
secretStore secrets.SecretProvider,
131+
cals TenantCals,
125132
) (*Scraper, error) {
126133
return NewWithOpts(ctx, check, ScraperOpts{
127-
Probe: probe,
128-
Publisher: publisher,
129-
Logger: logger,
130-
Metrics: metrics,
131-
ProbeFactory: prober.NewProberFactory(k6runner, probe.Id, features, secretStore),
132-
LabelsLimiter: labelsLimiter,
133-
Telemeter: telemeter,
134+
Probe: probe,
135+
Publisher: publisher,
136+
Logger: logger,
137+
Metrics: metrics,
138+
ProbeFactory: prober.NewProberFactory(k6runner, probe.Id, features, secretStore),
139+
LabelsLimiter: labelsLimiter,
140+
Telemeter: telemeter,
141+
CostAttributionLabels: cals,
134142
})
135143
}
136144

137145
var _ Factory = New
138146

139147
type ScraperOpts struct {
140-
Probe sm.Probe
141-
Publisher pusher.Publisher
142-
Logger zerolog.Logger
143-
Metrics Metrics
144-
ProbeFactory prober.ProberFactory
145-
LabelsLimiter LabelsLimiter
146-
Telemeter Telemeter
148+
Probe sm.Probe
149+
Publisher pusher.Publisher
150+
Logger zerolog.Logger
151+
Metrics Metrics
152+
ProbeFactory prober.ProberFactory
153+
LabelsLimiter LabelsLimiter
154+
Telemeter Telemeter
155+
CostAttributionLabels TenantCals
147156
}
148157

149158
func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scraper, error) {
@@ -181,6 +190,7 @@ func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scr
181190
summaries: make(map[uint64]prometheus.Summary),
182191
histograms: make(map[uint64]prometheus.Histogram),
183192
telemeter: opts.Telemeter,
193+
cals: opts.CostAttributionLabels,
184194
}, nil
185195
}
186196

@@ -289,12 +299,26 @@ func (h *scrapeHandler) scrape(ctx context.Context, t time.Time) {
289299
})
290300
}
291301

302+
costAttributionLabels, err := h.scraper.cals.CostAttributionLabels(ctx, h.payload.tenantId)
303+
if err != nil {
304+
// If cals can't be found, do not block
305+
h.scraper.logger.Error().
306+
Int64("tenantId", int64(h.payload.tenantId)).
307+
Msg("Could not load cals")
308+
}
309+
310+
h.scraper.logger.Debug().
311+
Int64("tenantId", int64(h.payload.tenantId)).
312+
Int("costAttributionLabelsCount", len(costAttributionLabels)).
313+
Msgf("Cost Attribution Labels: %v", costAttributionLabels)
314+
292315
// If we are dropping the data in case of errors, we should not count that execution.
293316
h.scraper.telemeter.AddExecution(telemetry.Execution{
294317
LocalTenantID: h.scraper.check.TenantId,
295318
RegionID: int32(h.scraper.check.RegionId),
296319
CheckClass: h.scraper.check.Class(),
297-
Duration: duration,
320+
// TODO(@pokom): Add cost attribution label bits here
321+
Duration: duration,
298322
})
299323

300324
if h.payload != nil {

internal/scraper/scraper_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,6 +1312,14 @@ func (l testLabelsLimiter) LogLabels(ctx context.Context, tenantID model.GlobalI
13121312
return l.maxLogLabels, nil
13131313
}
13141314

1315+
type testCalTenants struct {
1316+
costAttributionLabels []string
1317+
}
1318+
1319+
func (t testCalTenants) CostAttributionLabels(_ context.Context, tenantID model.GlobalID) ([]string, error) {
1320+
return t.costAttributionLabels, nil
1321+
}
1322+
13151323
func TestScraperCollectData(t *testing.T) {
13161324
const (
13171325
checkName = "check name"
@@ -1879,6 +1887,9 @@ func TestScraperRun(t *testing.T) {
18791887
maxLogLabels: 15,
18801888
},
18811889
Telemeter: testTelemeter,
1890+
CostAttributionLabels: testCalTenants{
1891+
costAttributionLabels: []string{"testing", "you"},
1892+
},
18821893
})
18831894

18841895
require.NoError(t, err)

0 commit comments

Comments
 (0)