From 1cefe7868e95134ba67afadaeae0c1c834461da4 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 12 Nov 2025 23:26:45 +0700 Subject: [PATCH] feat: make retrymq monitor loop timeout configurable --- docs/pages/references/configuration.mdx | 4 ++++ internal/config/config.go | 2 ++ internal/deliverymq/retry.go | 5 +++-- internal/deliverymq/retry_test.go | 2 +- internal/scheduler/scheduler.go | 10 +++++++++- internal/services/api/api.go | 3 ++- internal/services/builder.go | 3 ++- internal/services/delivery/delivery.go | 3 ++- 8 files changed, 25 insertions(+), 7 deletions(-) diff --git a/docs/pages/references/configuration.mdx b/docs/pages/references/configuration.mdx index d3694808..3e1796cf 100644 --- a/docs/pages/references/configuration.mdx +++ b/docs/pages/references/configuration.mdx @@ -124,6 +124,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `REDIS_PORT` | Port number for the Redis server. | `6379` | Yes | | `REDIS_TLS_ENABLED` | Enable TLS encryption for Redis connection. | `false` | No | | `RETRY_INTERVAL_SECONDS` | Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided. | `30` | No | +| `RETRY_POLL_BACKOFF_MS` | Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100 | `100` | No | | `RETRY_SCHEDULE` | Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h. | `[]` | No | | `SERVICE` | Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services). | `nil` | No | | `TELEMETRY_BATCH_INTERVAL` | Maximum time in seconds to wait before sending a batch of telemetry events if batch size is not reached. | `5` | No | @@ -571,6 +572,9 @@ retry_interval_seconds: 30 # Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided. retry_max_limit: 10 +# Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100 +retry_poll_backoff_ms: 100 + # Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h. retry_schedule: [] diff --git a/internal/config/config.go b/internal/config/config.go index 9c6b735b..b31aec01 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -78,6 +78,7 @@ type Config struct { RetrySchedule []int `yaml:"retry_schedule" env:"RETRY_SCHEDULE" envSeparator:"," desc:"Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h." required:"N"` RetryIntervalSeconds int `yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided." required:"N"` RetryMaxLimit int `yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided." required:"N"` + RetryPollBackoffMs int `yaml:"retry_poll_backoff_ms" env:"RETRY_POLL_BACKOFF_MS" desc:"Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100" required:"N"` // Event Delivery MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"` @@ -162,6 +163,7 @@ func (c *Config) InitDefaults() { c.RetrySchedule = []int{} // Empty by default, falls back to exponential backoff c.RetryIntervalSeconds = 30 c.RetryMaxLimit = 10 + c.RetryPollBackoffMs = 100 c.MaxDestinationsPerTenant = 20 c.DeliveryTimeoutSeconds = 5 c.PublishIdempotencyKeyTTL = 3600 // 1 hour diff --git a/internal/deliverymq/retry.go b/internal/deliverymq/retry.go index d99636a4..e4a5b086 100644 --- a/internal/deliverymq/retry.go +++ b/internal/deliverymq/retry.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" @@ -12,7 +13,7 @@ import ( "github.com/hookdeck/outpost/internal/scheduler" ) -func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, logger *logging.Logger) (scheduler.Scheduler, error) { +func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, pollBackoff time.Duration, logger *logging.Logger) (scheduler.Scheduler, error) { // Create Redis client for RSMQ ctx := context.Background() redisClient, err := redis.New(ctx, redisConfig) @@ -52,7 +53,7 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d return nil } - return scheduler.New("deliverymq-retry", rsmqClient, exec), nil + return scheduler.New("deliverymq-retry", rsmqClient, exec, scheduler.WithPollBackoff(pollBackoff)), nil } type RetryMessage struct { diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index 49dfc32f..08eb7eb7 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -46,7 +46,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { require.NoError(t, err) // Setup retry scheduler - retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", testutil.CreateTestLogger(t)) + retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", 100*time.Millisecond, testutil.CreateTestLogger(t)) require.NoError(t, err) require.NoError(t, retryScheduler.Init(s.ctx)) go retryScheduler.Monitor(s.ctx) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 5d4a4ed7..2fb3e52f 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -36,6 +36,7 @@ type schedulerImpl struct { type config struct { visibilityTimeout uint + pollBackoff time.Duration } func WithVisibilityTimeout(vt uint) func(*config) { @@ -44,9 +45,16 @@ func WithVisibilityTimeout(vt uint) func(*config) { } } +func WithPollBackoff(backoff time.Duration) func(*config) { + return func(c *config) { + c.pollBackoff = backoff + } +} + func New(name string, rsmqClient *rsmq.RedisSMQ, exec func(context.Context, string) error, opts ...func(*config)) Scheduler { config := &config{ visibilityTimeout: rsmq.UnsetVt, + pollBackoff: 100 * time.Millisecond, } for _, opt := range opts { opt(config) @@ -100,7 +108,7 @@ func (s *schedulerImpl) Monitor(ctx context.Context) error { return err } if msg == nil { - time.Sleep(time.Second / 10) + time.Sleep(s.config.pollBackoff) continue } // TODO: consider using a worker pool to limit the number of concurrent executions diff --git a/internal/services/api/api.go b/internal/services/api/api.go index cae7847d..730663bf 100644 --- a/internal/services/api/api.go +++ b/internal/services/api/api.go @@ -150,7 +150,8 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log // deliverymqRetryScheduler logger.Debug("creating delivery MQ retry scheduler") - deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond + deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger) if err != nil { logger.Error("failed to create delivery MQ retry scheduler", zap.Error(err)) return nil, err diff --git a/internal/services/builder.go b/internal/services/builder.go index e7dc3915..6bc2fd0b 100644 --- a/internal/services/builder.go +++ b/internal/services/builder.go @@ -620,7 +620,8 @@ func (s *serviceInstance) initRetryScheduler(ctx context.Context, cfg *config.Co return fmt.Errorf("delivery MQ must be initialized before retry scheduler") } logger.Debug("creating delivery MQ retry scheduler", zap.String("service", s.name)) - retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond + retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger) if err != nil { logger.Error("failed to create delivery MQ retry scheduler", zap.String("service", s.name), zap.Error(err)) return err diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go index 6f8de68d..26f3eba6 100644 --- a/internal/services/delivery/delivery.go +++ b/internal/services/delivery/delivery.go @@ -112,7 +112,8 @@ func NewService(ctx context.Context, return nil, err } - retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond + retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger) if err != nil { return nil, err }