Skip to content

Commit 6cb5332

Browse files
committed
fix: Add grpc-connect-timeout parameter
This controls the timeout for the initial gRPC connection, enforced as a deadline on the RegisterProbe call. It defaults to 0, which means there's no timeout. Signed-off-by: Marcelo E. Magallon <marcelo.magallon@grafana.com>
1 parent 3a3f597 commit 6cb5332

File tree

3 files changed

+45
-7
lines changed

3 files changed

+45
-7
lines changed

cmd/synthetic-monitoring-agent/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func run(args []string, stdout io.Writer) error {
6767
ReportVersion bool
6868
GrpcApiServerAddr string
6969
GrpcInsecure bool
70+
GrpcConnectTimeout time.Duration
7071
ApiToken Secret
7172
EnableChangeLogLevel bool
7273
EnableDisconnect bool
@@ -106,6 +107,7 @@ func run(args []string, stdout io.Writer) error {
106107
flags.BoolVar(&config.ReportVersion, "version", config.ReportVersion, "report version and exit")
107108
flags.StringVar(&config.GrpcApiServerAddr, "api-server-address", config.GrpcApiServerAddr, "GRPC API server address")
108109
flags.BoolVar(&config.GrpcInsecure, "api-insecure", config.GrpcInsecure, "Don't use TLS with connections to GRPC API")
110+
flags.DurationVar(&config.GrpcConnectTimeout, "grpc-connect-timeout", config.GrpcConnectTimeout, "timeout for initial GRPC connection (0 = no timeout)")
109111
flags.Var(&config.ApiToken, "api-token", `synthetic monitoring probe authentication token (default "")`)
110112
flags.BoolVar(&config.EnableChangeLogLevel, "enable-change-log-level", config.EnableChangeLogLevel, "enable changing the log level at runtime")
111113
flags.BoolVar(&config.EnableDisconnect, "enable-disconnect", config.EnableDisconnect, "enable HTTP /disconnect endpoint")
@@ -385,6 +387,7 @@ func run(args []string, stdout io.Writer) error {
385387
UsageReporter: usageReporter,
386388
CostAttributionLabels: cals,
387389
SupportsProtocolSecrets: config.EnableProtocolSecrets,
390+
GrpcConnectTimeout: config.GrpcConnectTimeout,
388391
})
389392

390393
if err != nil {
@@ -406,6 +409,7 @@ func run(args []string, stdout io.Writer) error {
406409
K6Runner: k6Runner,
407410
SecretProvider: secretProvider,
408411
SupportsProtocolSecrets: config.EnableProtocolSecrets,
412+
GrpcConnectTimeout: config.GrpcConnectTimeout,
409413
})
410414
if err != nil {
411415
return fmt.Errorf("cannot create ad-hoc checks handler: %w", err)

internal/adhoc/adhoc.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type Handler struct {
4646
grpcAdhocChecksClientFactory func(conn ClientConn) (sm.AdHocChecksClient, error)
4747
proberFactory prober.ProberFactory
4848
supportsProtocolSecrets bool
49+
connectTimeout time.Duration
4950
}
5051

5152
// Error represents errors returned from this package.
@@ -120,6 +121,7 @@ type HandlerOpts struct {
120121
K6Runner k6runner.Runner
121122
SecretProvider secrets.SecretProvider
122123
SupportsProtocolSecrets bool
124+
GrpcConnectTimeout time.Duration
123125

124126
// these two fields exist so that tests can pass alternate
125127
// implementations, they are unexported so that clients of this
@@ -154,6 +156,7 @@ func NewHandler(opts HandlerOpts) (*Handler, error) {
154156
grpcAdhocChecksClientFactory: opts.grpcAdhocChecksClientFactory,
155157
proberFactory: prober.NewProberFactory(opts.K6Runner, 0, opts.Features, opts.SecretProvider),
156158
supportsProtocolSecrets: opts.SupportsProtocolSecrets,
159+
connectTimeout: opts.GrpcConnectTimeout,
157160
api: apiInfo{
158161
conn: opts.Conn,
159162
},
@@ -352,15 +355,28 @@ func (h *Handler) loop(ctx context.Context) error {
352355
}
353356
}
354357

358+
// Create a timeout context for RegisterProbe if configured.
359+
// This ensures we don't wait indefinitely if the server is unreachable.
360+
registerCtx := ctx
361+
var cancel context.CancelFunc
362+
363+
if h.connectTimeout > 0 {
364+
registerCtx, cancel = context.WithTimeout(ctx, h.connectTimeout)
365+
defer cancel()
366+
h.logger.Info().
367+
Dur("timeout", h.connectTimeout).
368+
Msg("using explicit connection timeout for RegisterProbe")
369+
}
370+
355371
result, err := client.RegisterProbe(
356-
ctx,
372+
registerCtx,
357373
&sm.ProbeInfo{
358374
Version: version.Short(),
359375
Commit: version.Commit(),
360376
Buildstamp: version.Buildstamp(),
361377
SupportsProtocolSecrets: h.supportsProtocolSecrets,
362378
},
363-
grpc.WaitForReady(true), // Wait for connection on critical startup RPC
379+
grpc.WaitForReady(true), // Wait for connection on critical startup RPC (respects context timeout)
364380
)
365381
if err != nil {
366382
return grpcErrorHandler(

internal/checks/checks.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ type Updater struct {
9090
usageReporter usage.Reporter
9191
tenantCals *cals.CostAttributionLabels
9292
supportsProtocolSecrets bool
93+
connectTimeout time.Duration
9394
}
9495

9596
type apiInfo struct {
@@ -128,6 +129,7 @@ type UpdaterOptions struct {
128129
UsageReporter usage.Reporter
129130
CostAttributionLabels *cals.CostAttributionLabels
130131
SupportsProtocolSecrets bool
132+
GrpcConnectTimeout time.Duration
131133
}
132134

133135
func NewUpdater(opts UpdaterOptions) (*Updater, error) {
@@ -252,6 +254,7 @@ func NewUpdater(opts UpdaterOptions) (*Updater, error) {
252254
tenantSecrets: opts.SecretProvider,
253255
telemeter: opts.Telemeter,
254256
supportsProtocolSecrets: opts.SupportsProtocolSecrets,
257+
connectTimeout: opts.GrpcConnectTimeout,
255258
metrics: metrics{
256259
changeErrorsCounter: changeErrorsCounter,
257260
changesCounter: changesCounter,
@@ -345,9 +348,11 @@ func handleError(ctx context.Context, logger zerolog.Logger, backoff Backoffer,
345348
return true, nil
346349

347350
case errors.As(err, &transientErr):
348-
logger.Warn().Err(err).Msg("transient error, trying to reconnect")
351+
dur := backoff.Duration()
349352

350-
if err := sleepCtx(ctx, backoff.Duration()); err != nil {
353+
logger.Warn().Err(err).Dur("backoff", dur).Msg("transient error, trying to reconnect")
354+
355+
if err := sleepCtx(ctx, dur); err != nil {
351356
return true, err
352357
}
353358

@@ -408,7 +413,7 @@ func (c *Updater) loop(ctx context.Context) (bool, error) {
408413
// Context was cancelled
409414
return context.Canceled
410415

411-
case codes.Unavailable:
416+
case codes.Unavailable, codes.DeadlineExceeded:
412417
// Network errors, connection resets, transport closing, etc.
413418
// All these are transient and should trigger retry logic
414419
return TransientError(fmt.Sprintf("%s: %s", action, st.Message()))
@@ -427,12 +432,25 @@ func (c *Updater) loop(ctx context.Context) (bool, error) {
427432
}
428433
}
429434

430-
result, err := client.RegisterProbe(ctx, &sm.ProbeInfo{
435+
// Create a timeout context for RegisterProbe if configured.
436+
// This ensures we don't wait indefinitely if the server is unreachable.
437+
registerCtx := ctx
438+
var cancel context.CancelFunc
439+
440+
if c.connectTimeout > 0 {
441+
registerCtx, cancel = context.WithTimeout(ctx, c.connectTimeout)
442+
defer cancel()
443+
c.logger.Info().
444+
Dur("timeout", c.connectTimeout).
445+
Msg("using explicit connection timeout for RegisterProbe")
446+
}
447+
448+
result, err := client.RegisterProbe(registerCtx, &sm.ProbeInfo{
431449
Version: version.Short(),
432450
Commit: version.Commit(),
433451
Buildstamp: version.Buildstamp(),
434452
SupportsProtocolSecrets: c.supportsProtocolSecrets,
435-
}, grpc.WaitForReady(true)) // Wait for connection on critical startup RPC
453+
}, grpc.WaitForReady(true)) // Wait for connection on critical startup RPC (respects context timeout)
436454
if err != nil {
437455
return connected, grpcErrorHandler("registering probe with synthetic-monitoring-api", err)
438456
}

0 commit comments

Comments
 (0)