From 420afeba381aa98792fc42b756a3c7a7b1fc5c1a Mon Sep 17 00:00:00 2001 From: Nadia Santalla Date: Fri, 14 Nov 2025 17:34:51 +0100 Subject: [PATCH] wip: add recaller to scraper --- cmd/synthetic-monitoring-agent/main.go | 22 ++++++++++++- internal/checks/checks.go | 5 ++- internal/scraper/scraper.go | 44 ++++++++++++++++++++------ 3 files changed, 60 insertions(+), 11 deletions(-) diff --git a/cmd/synthetic-monitoring-agent/main.go b/cmd/synthetic-monitoring-agent/main.go index f6c1b6609..f9d127e12 100644 --- a/cmd/synthetic-monitoring-agent/main.go +++ b/cmd/synthetic-monitoring-agent/main.go @@ -33,6 +33,7 @@ import ( "github.com/grafana/synthetic-monitoring-agent/internal/pusher" pusherV1 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v1" pusherV2 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v2" + "github.com/grafana/synthetic-monitoring-agent/internal/recall" "github.com/grafana/synthetic-monitoring-agent/internal/scraper" "github.com/grafana/synthetic-monitoring-agent/internal/secrets" "github.com/grafana/synthetic-monitoring-agent/internal/telemetry" @@ -77,6 +78,9 @@ func run(args []string, stdout io.Writer) error { MemLimitRatio float64 DisableK6 bool DisableUsageReports bool + RedisAddr string + RedisMaster string + RedisPassword string }{ GrpcApiServerAddr: "localhost:4031", HttpListenAddr: "localhost:4050", @@ -110,6 +114,9 @@ func run(args []string, stdout io.Writer) error { flags.BoolVar(&config.DisableUsageReports, "disable-usage-reports", config.DisableUsageReports, "Disable anonymous usage reports") flags.Float64Var(&config.MemLimitRatio, "memlimit-ratio", config.MemLimitRatio, "fraction of available memory to use") flags.Var(&features, "features", "optional feature flags") + flags.StringVar(&config.RedisAddr, "redis-address", config.RedisAddr, "redis address to keep frequency stable across restarts (optional)") + flags.StringVar(&config.RedisMaster, "redis-master", config.RedisMaster, "name of the redis master, for 
sentinel setups (optional)") + flags.StringVar(&config.RedisPassword, "redis-password", config.RedisPassword, "password to use for the redis connection (optional)") if err := flags.Parse(args[1:]); err != nil { return err @@ -341,8 +348,8 @@ func run(args []string, stdout io.Writer) error { Telemeter: telemetry, UsageReporter: usageReporter, CostAttributionLabels: cals, + Recaller: recaller(config.RedisAddr, config.RedisMaster, config.RedisPassword), }) - if err != nil { return fmt.Errorf("cannot create checks updater: %w", err) } @@ -473,3 +480,16 @@ func setupGoMemLimit(ratio float64) error { return nil } + +// recaller builds either a NopRecaller if redisAddr is empty, or a ValkeyRecaller otherwise. +func recaller(redisAddr, redisMaster, redisPassword string) recall.Recaller { + if redisAddr == "" { + return recall.NopRecaller{} + } + + return recall.Valkey(recall.ValkeyOpts{ + Address: redisAddr, + MasterName: redisMaster, + Password: redisPassword, + }) +} diff --git a/internal/checks/checks.go b/internal/checks/checks.go index d6e92d011..e5ec3bf91 100644 --- a/internal/checks/checks.go +++ b/internal/checks/checks.go @@ -30,6 +30,7 @@ import ( "github.com/grafana/synthetic-monitoring-agent/internal/limits" "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/pusher" + "github.com/grafana/synthetic-monitoring-agent/internal/recall" "github.com/grafana/synthetic-monitoring-agent/internal/scraper" "github.com/grafana/synthetic-monitoring-agent/internal/secrets" "github.com/grafana/synthetic-monitoring-agent/internal/telemetry" @@ -86,6 +87,7 @@ type Updater struct { telemeter *telemetry.Telemeter usageReporter usage.Reporter tenantCals *cals.CostAttributionLabels + recaller recall.Recaller } type apiInfo struct { @@ -123,6 +125,7 @@ type UpdaterOptions struct { Telemeter *telemetry.Telemeter UsageReporter usage.Reporter CostAttributionLabels *cals.CostAttributionLabels + Recaller recall.Recaller } 
func NewUpdater(opts UpdaterOptions) (*Updater, error) { @@ -246,6 +249,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) { tenantLimits: opts.TenantLimits, tenantSecrets: opts.SecretProvider, telemeter: opts.Telemeter, + recaller: opts.Recaller, metrics: metrics{ changeErrorsCounter: changeErrorsCounter, changesCounter: changesCounter, @@ -941,7 +945,6 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Ch c.k6Runner, c.tenantLimits, c.telemeter, c.tenantSecrets, c.tenantCals, ) - if err != nil { return fmt.Errorf("cannot create new scraper: %w", err) } diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index 87c60d5b1..1f0f7b406 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/grafana/synthetic-monitoring-agent/internal/recall" "github.com/grafana/synthetic-monitoring-agent/internal/secrets" kitlog "github.com/go-kit/kit/log" //nolint:staticcheck // TODO(mem): replace in BBE @@ -82,6 +83,7 @@ type Scraper struct { histograms map[uint64]prometheus.Histogram telemeter Telemeter cals TenantCals + recaller recall.Recaller } type Factory func( @@ -94,6 +96,7 @@ type Factory func( telemeter *telemetry.Telemeter, secretStore secrets.SecretProvider, cals TenantCals, + recaller recall.Recaller, ) (*Scraper, error) type ( @@ -129,6 +132,7 @@ func New( telemeter *telemetry.Telemeter, secretStore secrets.SecretProvider, cals TenantCals, + recaller recall.Recaller, ) (*Scraper, error) { return NewWithOpts(ctx, check, ScraperOpts{ Probe: probe, @@ -139,6 +143,7 @@ func New( LabelsLimiter: labelsLimiter, Telemeter: telemeter, CostAttributionLabels: cals, + Recaller: recaller, }) } @@ -153,6 +158,7 @@ type ScraperOpts struct { LabelsLimiter LabelsLimiter Telemeter Telemeter CostAttributionLabels TenantCals + Recaller recall.Recaller } func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scraper, error) { @@ -191,6 
+197,7 @@ func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scr histograms: make(map[uint64]prometheus.Histogram), telemeter: opts.Telemeter, cals: opts.CostAttributionLabels, + recaller: opts.Recaller, }, nil } @@ -241,15 +248,29 @@ func (s *Scraper) Run(ctx context.Context) { // TODO(mem): keep count of the number of successive errors and // collect logs if threshold is reached. - var ( - frequency = ms(s.check.Frequency) - offset = ms(s.check.Offset) - ) + frequency := ms(s.check.Frequency) + + recallCtx, cancelRecall := context.WithTimeout(ctx, 5*time.Second) + defer cancelRecall() + + lastRun, err := s.recaller.Recall(recallCtx, s.check.Id) + if err != nil { + s.logger.Error().Err(err).Msg("could not fetch time for last run") + } + + offset := time.Until(lastRun.Add(frequency)) - if offset == 0 { + if offset < 0 { + s.logger.Debug().Msg("no last run found for check, assuming it was never run") offset = randDuration(min(frequency, maxPublishInterval)) } + if configOffset := ms(s.check.Offset); configOffset != 0 { + s.logger.Debug().Dur("configOffset", configOffset).Dur("computedOffset", offset). + Msg("Using fixed offset from config instead of computed one") + offset = configOffset + } + scrapeHandler := scrapeHandler{scraper: s} tickWithOffset( @@ -274,11 +295,17 @@ type scrapeHandler struct { func (h *scrapeHandler) scrape(ctx context.Context, t time.Time) { h.scraper.metrics.AddScrape() - var ( - err error - duration time.Duration - )
+ err := h.scraper.recaller.Remember(recallCtx, h.scraper.check.Id, 2*ms(h.scraper.check.Frequency)) + if err != nil { + h.scraper.logger.Warn().Err(err).Msg("could not store last execution time") + } + var duration time.Duration h.payload, duration, err = h.scraper.collectData(ctx, t) switch {