
Commit 5a4e4ef

feat: make retrymq monitor loop timeout configurable

Parent: ea98674
File tree

docs/pages/references/configuration.mdx
internal/config/config.go
internal/deliverymq/retry.go
internal/deliverymq/retry_test.go
internal/scheduler/scheduler.go
internal/services/api/api.go
internal/services/builder.go
internal/services/delivery/delivery.go

8 files changed, 25 additions and 7 deletions

docs/pages/references/configuration.mdx

Lines changed: 4 additions & 0 deletions

@@ -124,6 +124,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
 | `REDIS_PORT` | Port number for the Redis server. | `6379` | Yes |
 | `REDIS_TLS_ENABLED` | Enable TLS encryption for Redis connection. | `false` | No |
 | `RETRY_INTERVAL_SECONDS` | Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided. | `30` | No |
+| `RETRY_POLL_BACKOFF_MS` | Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100 | `100` | No |
 | `RETRY_SCHEDULE` | Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h. | `[]` | No |
 | `SERVICE` | Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services). | `nil` | No |
 | `TELEMETRY_BATCH_INTERVAL` | Maximum time in seconds to wait before sending a batch of telemetry events if batch size is not reached. | `5` | No |

@@ -571,6 +572,9 @@ retry_interval_seconds: 30
 # Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided.
 retry_max_limit: 10
 
+# Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100
+retry_poll_backoff_ms: 100
+
 # Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h.
 retry_schedule: []

internal/config/config.go

Lines changed: 2 additions & 0 deletions

@@ -78,6 +78,7 @@ type Config struct {
     RetrySchedule []int `yaml:"retry_schedule" env:"RETRY_SCHEDULE" envSeparator:"," desc:"Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h." required:"N"`
     RetryIntervalSeconds int `yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided." required:"N"`
     RetryMaxLimit int `yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided." required:"N"`
+    RetryPollBackoffMs int `yaml:"retry_poll_backoff_ms" env:"RETRY_POLL_BACKOFF_MS" desc:"Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100" required:"N"`
 
     // Event Delivery
     MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"`

@@ -162,6 +163,7 @@ func (c *Config) InitDefaults() {
     c.RetrySchedule = []int{} // Empty by default, falls back to exponential backoff
     c.RetryIntervalSeconds = 30
     c.RetryMaxLimit = 10
+    c.RetryPollBackoffMs = 100
     c.MaxDestinationsPerTenant = 20
     c.DeliveryTimeoutSeconds = 5
     c.PublishIdempotencyKeyTTL = 3600 // 1 hour

internal/deliverymq/retry.go

Lines changed: 3 additions & 2 deletions

@@ -4,6 +4,7 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "time"
 
     "github.com/hookdeck/outpost/internal/logging"
     "github.com/hookdeck/outpost/internal/models"

@@ -12,7 +13,7 @@ import (
     "github.com/hookdeck/outpost/internal/scheduler"
 )
 
-func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, logger *logging.Logger) (scheduler.Scheduler, error) {
+func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, pollBackoff time.Duration, logger *logging.Logger) (scheduler.Scheduler, error) {
     // Create Redis client for RSMQ
     ctx := context.Background()
     redisClient, err := redis.New(ctx, redisConfig)

@@ -52,7 +53,7 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d
         return nil
     }
 
-    return scheduler.New("deliverymq-retry", rsmqClient, exec), nil
+    return scheduler.New("deliverymq-retry", rsmqClient, exec, scheduler.WithPollBackoff(pollBackoff)), nil
 }
 
 type RetryMessage struct {

internal/deliverymq/retry_test.go

Lines changed: 1 addition & 1 deletion

@@ -46,7 +46,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) {
     require.NoError(t, err)
 
     // Setup retry scheduler
-    retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", testutil.CreateTestLogger(t))
+    retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", 100*time.Millisecond, testutil.CreateTestLogger(t))
     require.NoError(t, err)
     require.NoError(t, retryScheduler.Init(s.ctx))
     go retryScheduler.Monitor(s.ctx)

internal/scheduler/scheduler.go

Lines changed: 9 additions & 1 deletion

@@ -36,6 +36,7 @@ type schedulerImpl struct {
 
 type config struct {
     visibilityTimeout uint
+    pollBackoff time.Duration
 }
 
 func WithVisibilityTimeout(vt uint) func(*config) {

@@ -44,9 +45,16 @@ func WithVisibilityTimeout(vt uint) func(*config) {
     }
 }
 
+func WithPollBackoff(backoff time.Duration) func(*config) {
+    return func(c *config) {
+        c.pollBackoff = backoff
+    }
+}
+
 func New(name string, rsmqClient *rsmq.RedisSMQ, exec func(context.Context, string) error, opts ...func(*config)) Scheduler {
     config := &config{
         visibilityTimeout: rsmq.UnsetVt,
+        pollBackoff: 100 * time.Millisecond,
     }
     for _, opt := range opts {
         opt(config)

@@ -100,7 +108,7 @@ func (s *schedulerImpl) Monitor(ctx context.Context) error {
         return err
     }
     if msg == nil {
-        time.Sleep(time.Second / 10)
+        time.Sleep(s.config.pollBackoff)
         continue
     }
     // TODO: consider using a worker pool to limit the number of concurrent executions
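For context, here is a minimal sketch of how a caller can compose the new option with the existing constructor. The import paths, the rsmqClient value, and the exec callback are assumptions for illustration; only scheduler.New, WithPollBackoff, and the Monitor backoff behavior appear in this commit.

    package example

    import (
        "context"
        "time"

        "github.com/hookdeck/outpost/internal/rsmq"      // import path assumed
        "github.com/hookdeck/outpost/internal/scheduler" // import path assumed
    )

    // newSlowPollScheduler builds a scheduler whose monitor loop sleeps 5s
    // between empty polls, e.g. for a serverless Redis where frequent polling
    // is costly. rsmqClient and exec stand in for values the caller already has.
    func newSlowPollScheduler(rsmqClient *rsmq.RedisSMQ, exec func(context.Context, string) error) scheduler.Scheduler {
        return scheduler.New("example-queue", rsmqClient, exec,
            scheduler.WithPollBackoff(5*time.Second), // omit to keep the 100ms default
        )
    }

When no option is passed, New keeps pollBackoff at 100 * time.Millisecond, matching the previously hard-coded time.Sleep(time.Second / 10), so existing deployments see no behavior change.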

internal/services/api/api.go

Lines changed: 2 additions & 1 deletion

@@ -150,7 +150,8 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log
 
     // deliverymqRetryScheduler
     logger.Debug("creating delivery MQ retry scheduler")
-    deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger)
+    pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond
+    deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger)
     if err != nil {
         logger.Error("failed to create delivery MQ retry scheduler", zap.Error(err))
         return nil, err

internal/services/builder.go

Lines changed: 2 additions & 1 deletion

@@ -605,7 +605,8 @@ func (s *serviceInstance) initRetryScheduler(ctx context.Context, cfg *config.Co
         return fmt.Errorf("delivery MQ must be initialized before retry scheduler")
     }
     logger.Debug("creating delivery MQ retry scheduler", zap.String("service", s.name))
-    retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger)
+    pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond
+    retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger)
     if err != nil {
         logger.Error("failed to create delivery MQ retry scheduler", zap.String("service", s.name), zap.Error(err))
         return err

internal/services/delivery/delivery.go

Lines changed: 2 additions & 1 deletion

@@ -112,7 +112,8 @@ func NewService(ctx context.Context,
         return nil, err
     }
 
-    retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger)
+    pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond
+    retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger)
     if err != nil {
         return nil, err
     }
