Skip to content

Commit b95186b

Browse files
committed
wip: add recaller to scraper
1 parent 1d2fc6f commit b95186b

File tree

3 files changed

+60
-11
lines changed

3 files changed

+60
-11
lines changed

cmd/synthetic-monitoring-agent/main.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
3434
pusherV1 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v1"
3535
pusherV2 "github.com/grafana/synthetic-monitoring-agent/internal/pusher/v2"
36+
"github.com/grafana/synthetic-monitoring-agent/internal/recall"
3637
"github.com/grafana/synthetic-monitoring-agent/internal/scraper"
3738
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"
3839
"github.com/grafana/synthetic-monitoring-agent/internal/telemetry"
@@ -77,6 +78,9 @@ func run(args []string, stdout io.Writer) error {
7778
MemLimitRatio float64
7879
DisableK6 bool
7980
DisableUsageReports bool
81+
RedisAddr string
82+
RedisMaster string
83+
RedisPassword string
8084
}{
8185
GrpcApiServerAddr: "localhost:4031",
8286
HttpListenAddr: "localhost:4050",
@@ -110,6 +114,9 @@ func run(args []string, stdout io.Writer) error {
110114
flags.BoolVar(&config.DisableUsageReports, "disable-usage-reports", config.DisableUsageReports, "Disable anonymous usage reports")
111115
flags.Float64Var(&config.MemLimitRatio, "memlimit-ratio", config.MemLimitRatio, "fraction of available memory to use")
112116
flags.Var(&features, "features", "optional feature flags")
117+
flags.StringVar(&config.RedisAddr, "redis-address", config.RedisAddr, "redis address to keep frequency stable across restarts (optional)")
118+
flags.StringVar(&config.RedisMaster, "redis-master", config.RedisMaster, "name of the redis master, for sentinel setups (optional)")
119+
flags.StringVar(&config.RedisPassword, "redis-password", config.RedisPassword, "password to use for the redis connection (optional)")
113120

114121
if err := flags.Parse(args[1:]); err != nil {
115122
return err
@@ -341,8 +348,8 @@ func run(args []string, stdout io.Writer) error {
341348
Telemeter: telemetry,
342349
UsageReporter: usageReporter,
343350
CostAttributionLabels: cals,
351+
Recaller: recaller(config.RedisAddr, config.RedisMaster, config.RedisPassword),
344352
})
345-
346353
if err != nil {
347354
return fmt.Errorf("cannot create checks updater: %w", err)
348355
}
@@ -473,3 +480,16 @@ func setupGoMemLimit(ratio float64) error {
473480

474481
return nil
475482
}
483+
484+
// recaller builds either a NopRecaller if redisAddr is empty, or a ValkeyRecaller otherwise.
485+
func recaller(redisAddr, redisMaster, redisPassword string) recall.Recaller {
486+
if redisAddr == "" {
487+
return recall.NopRecaller{}
488+
}
489+
490+
return recall.Valkey(recall.ValkeyOpts{
491+
Address: redisAddr,
492+
MasterName: redisMaster,
493+
Password: redisPassword,
494+
})
495+
}

internal/checks/checks.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/grafana/synthetic-monitoring-agent/internal/limits"
3131
"github.com/grafana/synthetic-monitoring-agent/internal/model"
3232
"github.com/grafana/synthetic-monitoring-agent/internal/pusher"
33+
"github.com/grafana/synthetic-monitoring-agent/internal/recall"
3334
"github.com/grafana/synthetic-monitoring-agent/internal/scraper"
3435
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"
3536
"github.com/grafana/synthetic-monitoring-agent/internal/telemetry"
@@ -86,6 +87,7 @@ type Updater struct {
8687
telemeter *telemetry.Telemeter
8788
usageReporter usage.Reporter
8889
tenantCals *cals.CostAttributionLabels
90+
recaller recall.Recaller
8991
}
9092

9193
type apiInfo struct {
@@ -123,6 +125,7 @@ type UpdaterOptions struct {
123125
Telemeter *telemetry.Telemeter
124126
UsageReporter usage.Reporter
125127
CostAttributionLabels *cals.CostAttributionLabels
128+
Recaller recall.Recaller
126129
}
127130

128131
func NewUpdater(opts UpdaterOptions) (*Updater, error) {
@@ -246,6 +249,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
246249
tenantLimits: opts.TenantLimits,
247250
tenantSecrets: opts.SecretProvider,
248251
telemeter: opts.Telemeter,
252+
recaller: opts.Recaller,
249253
metrics: metrics{
250254
changeErrorsCounter: changeErrorsCounter,
251255
changesCounter: changesCounter,
@@ -941,7 +945,6 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Ch
941945
c.k6Runner,
942946
c.tenantLimits, c.telemeter, c.tenantSecrets, c.tenantCals,
943947
)
944-
945948
if err != nil {
946949
return fmt.Errorf("cannot create new scraper: %w", err)
947950
}

internal/scraper/scraper.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strings"
1212
"time"
1313

14+
"github.com/grafana/synthetic-monitoring-agent/internal/recall"
1415
"github.com/grafana/synthetic-monitoring-agent/internal/secrets"
1516

1617
kitlog "github.com/go-kit/kit/log" //nolint:staticcheck // TODO(mem): replace in BBE
@@ -82,6 +83,7 @@ type Scraper struct {
8283
histograms map[uint64]prometheus.Histogram
8384
telemeter Telemeter
8485
cals TenantCals
86+
recaller recall.Recaller
8587
}
8688

8789
type Factory func(
@@ -94,6 +96,7 @@ type Factory func(
9496
telemeter *telemetry.Telemeter,
9597
secretStore secrets.SecretProvider,
9698
cals TenantCals,
99+
recaller recall.Recaller,
97100
) (*Scraper, error)
98101

99102
type (
@@ -129,6 +132,7 @@ func New(
129132
telemeter *telemetry.Telemeter,
130133
secretStore secrets.SecretProvider,
131134
cals TenantCals,
135+
recaller recall.Recaller,
132136
) (*Scraper, error) {
133137
return NewWithOpts(ctx, check, ScraperOpts{
134138
Probe: probe,
@@ -139,6 +143,7 @@ func New(
139143
LabelsLimiter: labelsLimiter,
140144
Telemeter: telemeter,
141145
CostAttributionLabels: cals,
146+
Recaller: recaller,
142147
})
143148
}
144149

@@ -153,6 +158,7 @@ type ScraperOpts struct {
153158
LabelsLimiter LabelsLimiter
154159
Telemeter Telemeter
155160
CostAttributionLabels TenantCals
161+
Recaller recall.Recaller
156162
}
157163

158164
func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scraper, error) {
@@ -191,6 +197,7 @@ func NewWithOpts(ctx context.Context, check model.Check, opts ScraperOpts) (*Scr
191197
histograms: make(map[uint64]prometheus.Histogram),
192198
telemeter: opts.Telemeter,
193199
cals: opts.CostAttributionLabels,
200+
recaller: opts.Recaller,
194201
}, nil
195202
}
196203

@@ -241,15 +248,28 @@ func (s *Scraper) Run(ctx context.Context) {
241248
// TODO(mem): keep count of the number of successive errors and
242249
// collect logs if threshold is reached.
243250

244-
var (
245-
frequency = ms(s.check.Frequency)
246-
offset = ms(s.check.Offset)
247-
)
251+
frequency := ms(s.check.Frequency)
252+
253+
recallCtx, cancelRecall := context.WithTimeout(ctx, 5*time.Second)
254+
defer cancelRecall()
255+
256+
lastRun, err := s.recaller.Recall(recallCtx, s.check.Id)
257+
if err != nil {
258+
s.logger.Error().Err(err).Msg("could not fetch time for last run")
259+
}
260+
261+
offset := time.Until(lastRun.Add(frequency))
248262

249-
if offset == 0 {
263+
if offset < 0 {
264+
s.logger.Debug().Msg("no last run found for check, assuming it was never run")
250265
offset = randDuration(min(frequency, maxPublishInterval))
251266
}
252267

268+
if configOffset := ms(s.check.Offset); configOffset != 0 {
269+
s.logger.Debug().Dur("configOffset", configOffset).Dur("computedOffset", offset).
270+
Msg("Using fixed offset from config instead of computed one")
271+
}
272+
253273
scrapeHandler := scrapeHandler{scraper: s}
254274

255275
tickWithOffset(
@@ -274,11 +294,17 @@ type scrapeHandler struct {
274294
func (h *scrapeHandler) scrape(ctx context.Context, t time.Time) {
275295
h.scraper.metrics.AddScrape()
276296

277-
var (
278-
err error
279-
duration time.Duration
280-
)
297+
recallCtx, cancelRecall := context.WithTimeout(ctx, 5*time.Second)
298+
defer cancelRecall()
299+
300+
// Record time of last execution, for as long as 2x frequency of the check. Past that it's really not worth keeping,
301+
// as we'd need to run it as soon as possible anyway.
302+
err := h.scraper.recaller.Remember(recallCtx, h.scraper.check.Id, 2*ms(h.scraper.check.Frequency))
303+
if err != nil {
304+
h.scraper.logger.Warn().Err(err).Msg("could not store last execution time")
305+
}
281306

307+
var duration time.Duration
282308
h.payload, duration, err = h.scraper.collectData(ctx, t)
283309

284310
switch {

0 commit comments

Comments
 (0)