Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cmd/synthetic-monitoring-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
pusherV1 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v1"
pusherV2 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v2"
"github.com/grafana/synthetic-monitoring-agent/internal/recall"
"github.com/grafana/synthetic-monitoring-agent/internal/scraper"
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"
"github.com/grafana/synthetic-monitoring-agent/internal/telemetry"
Expand Down Expand Up @@ -77,6 +78,9 @@ func run(args []string, stdout io.Writer) error {
MemLimitRatio float64
DisableK6 bool
DisableUsageReports bool
RedisAddr string
RedisMaster string
RedisPassword string
}{
GrpcApiServerAddr: "localhost:4031",
HttpListenAddr: "localhost:4050",
Expand Down Expand Up @@ -110,6 +114,9 @@ func run(args []string, stdout io.Writer) error {
flags.BoolVar(&config.DisableUsageReports, "disable-usage-reports", config.DisableUsageReports, "Disable anonymous usage reports")
flags.Float64Var(&config.MemLimitRatio, "memlimit-ratio", config.MemLimitRatio, "fraction of available memory to use")
flags.Var(&features, "features", "optional feature flags")
flags.StringVar(&config.RedisAddr, "redis-address", config.RedisAddr, "redis address to keep frequency stable across restarts (optional)")
flags.StringVar(&config.RedisMaster, "redis-master", config.RedisMaster, "name of the redis master, for sentinel setups (optional)")
flags.StringVar(&config.RedisPassword, "redis-password", config.RedisPassword, "password to use for the redis connection (optional)")

if err := flags.Parse(args[1:]); err != nil {
return err
Expand Down Expand Up @@ -341,8 +348,8 @@ func run(args []string, stdout io.Writer) error {
Telemeter: telemetry,
UsageReporter: usageReporter,
CostAttributionLabels: cals,
Recaller: recaller(config.RedisAddr, config.RedisMaster, config.RedisPassword),
})

if err != nil {
return fmt.Errorf("cannot create checks updater: %w", err)
}
Expand Down Expand Up @@ -473,3 +480,16 @@ func setupGoMemLimit(ratio float64) error {

return nil
}

// recaller selects the Recaller implementation to use: when a redis address
// is configured it returns a Valkey-backed recaller (supporting sentinel
// setups via the master name), otherwise it falls back to a no-op recaller.
func recaller(redisAddr, redisMaster, redisPassword string) recall.Recaller {
	if redisAddr != "" {
		return recall.Valkey(recall.ValkeyOpts{
			Address:    redisAddr,
			MasterName: redisMaster,
			Password:   redisPassword,
		})
	}

	return recall.NopRecaller{}
}
5 changes: 4 additions & 1 deletion internal/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/grafana/synthetic-monitoring-agent/internal/limits"
"github.com/grafana/synthetic-monitoring-agent/internal/model"
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
"github.com/grafana/synthetic-monitoring-agent/internal/recall"
"github.com/grafana/synthetic-monitoring-agent/internal/scraper"
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"
"github.com/grafana/synthetic-monitoring-agent/internal/telemetry"
Expand Down Expand Up @@ -86,6 +87,7 @@ type Updater struct {
telemeter *telemetry.Telemeter
usageReporter usage.Reporter
tenantCals *cals.CostAttributionLabels
recaller recall.Recaller
}

type apiInfo struct {
Expand Down Expand Up @@ -123,6 +125,7 @@ type UpdaterOptions struct {
Telemeter *telemetry.Telemeter
UsageReporter usage.Reporter
CostAttributionLabels *cals.CostAttributionLabels
Recaller recall.Recaller
}

func NewUpdater(opts UpdaterOptions) (*Updater, error) {
Expand Down Expand Up @@ -246,6 +249,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
tenantLimits: opts.TenantLimits,
tenantSecrets: opts.SecretProvider,
telemeter: opts.Telemeter,
recaller: opts.Recaller,
metrics: metrics{
changeErrorsCounter: changeErrorsCounter,
changesCounter: changesCounter,
Expand Down Expand Up @@ -941,7 +945,6 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Ch
c.k6Runner,
c.tenantLimits, c.telemeter, c.tenantSecrets, c.tenantCals,
)

if err != nil {
return fmt.Errorf("cannot create new scraper: %w", err)
}
Expand Down
44 changes: 35 additions & 9 deletions internal/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/grafana/synthetic-monitoring-agent/internal/recall"
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"

kitlog "github.com/go-kit/kit/log" //nolint:staticcheck // TODO(mem): replace in BBE
Expand Down Expand Up @@ -82,6 +83,7 @@ type Scraper struct {
histograms map[uint64]prometheus.Histogram
telemeter Telemeter
cals TenantCals
recaller recall.Recaller
}

type Factory func(
Expand All @@ -94,6 +96,7 @@ type Factory func(
telemeter *telemetry.Telemeter,
secretStore secrets.SecretProvider,
cals TenantCals,
recaller recall.Recaller,
) (*Scraper, error)

type (
Expand Down Expand Up @@ -129,6 +132,7 @@ func New(
telemeter *telemetry.Telemeter,
secretStore secrets.SecretProvider,
cals TenantCals,
recaller recall.Recaller,
) (*Scraper, error) {
return NewWithOpts(ctx, check, ScraperOpts{
Probe: probe,
Expand All @@ -139,6 +143,7 @@ func New(
LabelsLimiter: labelsLimiter,
Telemeter: telemeter,
CostAttributionLabels: cals,
Recaller: recaller,
})
}

Expand All @@ -153,6 +158,7 @@ type ScraperOpts struct {
LabelsLimiter LabelsLimiter
Telemeter Telemeter
CostAttributionLabels TenantCals
Recaller recall.Recaller
}

func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scraper, error) {
Expand Down Expand Up @@ -191,6 +197,7 @@ func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scr
histograms: make(map[uint64]prometheus.Histogram),
telemeter: opts.Telemeter,
cals: opts.CostAttributionLabels,
recaller: opts.Recaller,
}, nil
}

Expand Down Expand Up @@ -241,15 +248,28 @@ func (s *Scraper) Run(ctx context.Context) {
// TODO(mem): keep count of the number of successive errors and
// collect logs if threshold is reached.

var (
frequency = ms(s.check.Frequency)
offset = ms(s.check.Offset)
)
frequency := ms(s.check.Frequency)

recallCtx, cancelRecall := context.WithTimeout(ctx, 5*time.Second)
defer cancelRecall()

lastRun, err := s.recaller.Recall(recallCtx, s.check.Id)
if err != nil {
s.logger.Error().Err(err).Msg("could not fetch time for last run")
}

offset := time.Until(lastRun.Add(frequency))

if offset == 0 {
if offset < 0 {
s.logger.Debug().Msg("no last run found for check, assuming it was never run")
offset = randDuration(min(frequency, maxPublishInterval))
}

if configOffset := ms(s.check.Offset); configOffset != 0 {
s.logger.Debug().Dur("configOffset", configOffset).Dur("computedOffset", offset).
Msg("Using fixed offset from config instead of computed one")
}

scrapeHandler := scrapeHandler{scraper: s}

tickWithOffset(
Expand All @@ -274,11 +294,17 @@ type scrapeHandler struct {
func (h *scrapeHandler) scrape(ctx context.Context, t time.Time) {
h.scraper.metrics.AddScrape()

var (
err error
duration time.Duration
)
recallCtx, cancelRecall := context.WithTimeout(ctx, 5*time.Second)
defer cancelRecall()

// Record time of last execution, for as long as 2x frequency of the check. Past that it's really not worth keeping,
// as we'd need to run it as soon as possible anyway.
err := h.scraper.recaller.Remember(recallCtx, h.scraper.check.Id, 2*ms(h.scraper.check.Frequency))
if err != nil {
h.scraper.logger.Warn().Err(err).Msg("could not store last execution time")
}

var duration time.Duration
h.payload, duration, err = h.scraper.collectData(ctx, t)

switch {
Expand Down