4 changes: 4 additions & 0 deletions docs/pages/references/configuration.mdx
@@ -124,6 +124,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
| `REDIS_PORT` | Port number for the Redis server. | `6379` | Yes |
| `REDIS_TLS_ENABLED` | Enable TLS encryption for Redis connection. | `false` | No |
| `RETRY_INTERVAL_SECONDS` | Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided. | `30` | No |
| `RETRY_POLL_BACKOFF_MS` | Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100 | `100` | No |
| `RETRY_SCHEDULE` | Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h. | `[]` | No |
| `SERVICE` | Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services). | `nil` | No |
| `TELEMETRY_BATCH_INTERVAL` | Maximum time in seconds to wait before sending a batch of telemetry events if batch size is not reached. | `5` | No |
@@ -571,6 +572,9 @@ retry_interval_seconds: 30
# Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided.
retry_max_limit: 10

# Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100
retry_poll_backoff_ms: 100

# Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h.
retry_schedule: []

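For illustration, the sketch below shows the documented semantics of `RETRY_POLL_BACKOFF_MS` in isolation: a default of 100 ms, overridable through the environment, converted to a duration. This is not Outpost's actual loader (the real parsing happens through the `env` struct tags in `config.go` below); `pollBackoffFromEnv` is a hypothetical helper, and the 5000 ms value mirrors the serverless-Redis tuning suggested above.

```go
// Hypothetical helper (not part of Outpost) illustrating the documented
// semantics of RETRY_POLL_BACKOFF_MS: default 100 ms, overridable via env.
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

func pollBackoffFromEnv() time.Duration {
	ms := 100 // documented default
	if v := os.Getenv("RETRY_POLL_BACKOFF_MS"); v != "" {
		if parsed, err := strconv.Atoi(v); err == nil {
			ms = parsed
		}
	}
	return time.Duration(ms) * time.Millisecond
}

func main() {
	// e.g. RETRY_POLL_BACKOFF_MS=5000 for serverless Redis providers
	fmt.Println("retry poll backoff:", pollBackoffFromEnv())
}
```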
2 changes: 2 additions & 0 deletions internal/config/config.go
@@ -78,6 +78,7 @@ type Config struct {
RetrySchedule []int `yaml:"retry_schedule" env:"RETRY_SCHEDULE" envSeparator:"," desc:"Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h." required:"N"`
RetryIntervalSeconds int `yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided." required:"N"`
RetryMaxLimit int `yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided." required:"N"`
RetryPollBackoffMs int `yaml:"retry_poll_backoff_ms" env:"RETRY_POLL_BACKOFF_MS" desc:"Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100" required:"N"`

// Event Delivery
MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"`
@@ -162,6 +163,7 @@ func (c *Config) InitDefaults() {
c.RetrySchedule = []int{} // Empty by default, falls back to exponential backoff
c.RetryIntervalSeconds = 30
c.RetryMaxLimit = 10
c.RetryPollBackoffMs = 100
c.MaxDestinationsPerTenant = 20
c.DeliveryTimeoutSeconds = 5
c.PublishIdempotencyKeyTTL = 3600 // 1 hour
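A minimal sketch of how code inside the Outpost module might pick up the new default and convert it to a `time.Duration`, assuming only the `Config` API visible in this diff (the `RetryPollBackoffMs` field, its `InitDefaults` value, and the millisecond unit) and that `InitDefaults` can be called on a zero-value `Config`. Since `internal/config` is an internal package, this only compiles within the repository.

```go
package main

import (
	"fmt"
	"time"

	"github.com/hookdeck/outpost/internal/config"
)

func main() {
	cfg := &config.Config{}
	cfg.InitDefaults() // sets RetryPollBackoffMs to 100, among other defaults

	// Services convert the millisecond setting to a time.Duration before
	// passing it to the retry scheduler (see the service changes below).
	pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond
	fmt.Println(pollBackoff) // 100ms
}
```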
5 changes: 3 additions & 2 deletions internal/deliverymq/retry.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/models"
@@ -12,7 +13,7 @@ import (
"github.com/hookdeck/outpost/internal/scheduler"
)

func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, logger *logging.Logger) (scheduler.Scheduler, error) {
func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, pollBackoff time.Duration, logger *logging.Logger) (scheduler.Scheduler, error) {
// Create Redis client for RSMQ
ctx := context.Background()
redisClient, err := redis.New(ctx, redisConfig)
@@ -52,7 +53,7 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d
return nil
}

return scheduler.New("deliverymq-retry", rsmqClient, exec), nil
return scheduler.New("deliverymq-retry", rsmqClient, exec, scheduler.WithPollBackoff(pollBackoff)), nil
}

type RetryMessage struct {
2 changes: 1 addition & 1 deletion internal/deliverymq/retry_test.go
@@ -46,7 +46,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) {
require.NoError(t, err)

// Setup retry scheduler
retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", testutil.CreateTestLogger(t))
retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", 100*time.Millisecond, testutil.CreateTestLogger(t))
require.NoError(t, err)
require.NoError(t, retryScheduler.Init(s.ctx))
go retryScheduler.Monitor(s.ctx)
10 changes: 9 additions & 1 deletion internal/scheduler/scheduler.go
@@ -36,6 +36,7 @@ type schedulerImpl struct {

type config struct {
visibilityTimeout uint
pollBackoff time.Duration
}

func WithVisibilityTimeout(vt uint) func(*config) {
@@ -44,9 +45,16 @@ func WithVisibilityTimeout(vt uint) func(*config) {
}
}

func WithPollBackoff(backoff time.Duration) func(*config) {
return func(c *config) {
c.pollBackoff = backoff
}
}

func New(name string, rsmqClient *rsmq.RedisSMQ, exec func(context.Context, string) error, opts ...func(*config)) Scheduler {
config := &config{
visibilityTimeout: rsmq.UnsetVt,
pollBackoff: 100 * time.Millisecond,
}
for _, opt := range opts {
opt(config)
@@ -100,7 +108,7 @@ func (s *schedulerImpl) Monitor(ctx context.Context) error {
return err
}
if msg == nil {
time.Sleep(time.Second / 10)
time.Sleep(s.config.pollBackoff)
continue
}
// TODO: consider using a worker pool to limit the number of concurrent executions
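The behavioral change in `Monitor` is easiest to see in isolation: the loop only backs off when the queue returns no message, and processes consecutive due messages back-to-back with no delay. Below is a self-contained sketch of that polling pattern, not the actual scheduler code (which also handles visibility timeouts, message deletion, and error handling).

```go
package main

import (
	"fmt"
	"time"
)

// pollLoop mimics the Monitor behavior shown above: sleep for pollBackoff
// only when receive returns nil, otherwise handle the message and poll
// again immediately.
func pollLoop(receive func() *string, handle func(string), pollBackoff time.Duration, rounds int) {
	for i := 0; i < rounds; i++ {
		msg := receive()
		if msg == nil {
			time.Sleep(pollBackoff) // idle: back off to reduce Redis load
			continue
		}
		handle(*msg) // busy: no delay before the next poll
	}
}

func main() {
	queue := []string{"retry-1", "retry-2"}
	receive := func() *string {
		if len(queue) == 0 {
			return nil
		}
		m := queue[0]
		queue = queue[1:]
		return &m
	}
	pollLoop(receive, func(m string) { fmt.Println("delivering", m) }, 100*time.Millisecond, 4)
}
```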
3 changes: 2 additions & 1 deletion internal/services/api/api.go
@@ -150,7 +150,8 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log

// deliverymqRetryScheduler
logger.Debug("creating delivery MQ retry scheduler")
deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger)
pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond
deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger)
if err != nil {
logger.Error("failed to create delivery MQ retry scheduler", zap.Error(err))
return nil, err
3 changes: 2 additions & 1 deletion internal/services/builder.go
@@ -620,7 +620,8 @@ func (s *serviceInstance) initRetryScheduler(ctx context.Context, cfg *config.Co
return fmt.Errorf("delivery MQ must be initialized before retry scheduler")
}
logger.Debug("creating delivery MQ retry scheduler", zap.String("service", s.name))
retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger)
pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond
retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger)
if err != nil {
logger.Error("failed to create delivery MQ retry scheduler", zap.String("service", s.name), zap.Error(err))
return err
3 changes: 2 additions & 1 deletion internal/services/delivery/delivery.go
@@ -112,7 +112,8 @@ func NewService(ctx context.Context,
return nil, err
}

retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger)
pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond
retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger)
if err != nil {
return nil, err
}