From e45083fdc8b86558615ab225e4f20132a771eb4d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 12 Nov 2025 21:58:58 +0700 Subject: [PATCH 1/5] feat: mqs auto provision configuration --- internal/app/app.go | 19 ++++++++++-- internal/config/mq.go | 1 + internal/infra/infra.go | 19 ++++++++++++ internal/infra/infra_test.go | 59 ++++++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 3 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 9e29ec08..59ea7512 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -89,9 +89,22 @@ func run(mainContext context.Context, cfg *config.Config) error { DeliveryMQ: cfg.MQs.ToInfraConfig("deliverymq"), LogMQ: cfg.MQs.ToInfraConfig("logmq"), }, redisClient) - if err := outpostInfra.Declare(mainContext); err != nil { - logger.Error("infrastructure declaration failed", zap.Error(err)) - return err + + // Only declare infrastructure if should_manage is true (default) + shouldManage := cfg.MQs.ShouldManage == nil || *cfg.MQs.ShouldManage + if shouldManage { + logger.Debug("infrastructure management enabled, declaring infrastructure") + if err := outpostInfra.Declare(mainContext); err != nil { + logger.Error("infrastructure declaration failed", zap.Error(err)) + return err + } + } else { + logger.Info("infrastructure management disabled, verifying infrastructure exists") + if err := outpostInfra.Verify(mainContext); err != nil { + logger.Error("infrastructure verification failed", zap.Error(err)) + return err + } + logger.Info("infrastructure verification successful") } installationID, err := getInstallation(mainContext, redisClient, cfg.Telemetry.ToTelemetryConfig()) diff --git a/internal/config/mq.go b/internal/config/mq.go index a2312a99..093b5c2a 100644 --- a/internal/config/mq.go +++ b/internal/config/mq.go @@ -19,6 +19,7 @@ type MQsConfig struct { AzureServiceBus AzureServiceBusConfig `yaml:"azure_servicebus" desc:"Configuration for using Azure Service Bus as the message queue. Only one MQ provider should be configured." required:"N"` GCPPubSub GCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the message queue. Only one MQ provider should be configured." required:"N"` RabbitMQ RabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the message queue. Only one MQ provider should be configured." required:"N"` + ShouldManage *bool `yaml:"should_manage" env:"MQS_SHOULD_MANAGE" desc:"Whether Outpost should create and manage message queue infrastructure. Set to false if you manage infrastructure externally (e.g., via Terraform). Defaults to true for backward compatibility." required:"N" default:"true"` adapter MQConfigAdapter } diff --git a/internal/infra/infra.go b/internal/infra/infra.go index cc57994b..86c02fa6 100644 --- a/internal/infra/infra.go +++ b/internal/infra/infra.go @@ -2,6 +2,7 @@ package infra import ( "context" + "errors" "fmt" "time" @@ -16,6 +17,11 @@ const ( lockTTL = 10 * time.Second ) +var ( + // ErrInfraNotFound is returned when infrastructure does not exist and auto provisioning is disabled + ErrInfraNotFound = errors.New("infrastructure does not exist and auto provisioning is disabled (MQS_SHOULD_MANAGE=false). 
Please create the required message queues manually or set MQS_SHOULD_MANAGE=true to enable auto provisioning") +) + type Infra struct { lock Lock provider InfraProvider @@ -150,6 +156,19 @@ func (infra *Infra) Declare(ctx context.Context) error { return fmt.Errorf("failed to acquire lock after %d attempts", lockAttempts) } +// Verify checks if the infrastructure exists and returns an error if it doesn't. +// This is useful when infrastructure management is disabled to fail fast with a clear error. +func (infra *Infra) Verify(ctx context.Context) error { + exists, err := infra.provider.Exist(ctx) + if err != nil { + return fmt.Errorf("failed to verify infrastructure exists: %w", err) + } + if !exists { + return ErrInfraNotFound + } + return nil +} + func (infra *Infra) Teardown(ctx context.Context) error { return infra.provider.Teardown(ctx) } diff --git a/internal/infra/infra_test.go b/internal/infra/infra_test.go index d6aa66da..a7ef7071 100644 --- a/internal/infra/infra_test.go +++ b/internal/infra/infra_test.go @@ -203,3 +203,62 @@ func TestInfra_LockExpiry(t *testing.T) { // Declaration should have succeeded assert.Equal(t, int32(1), mockProvider.declareCount.Load()) } + +func TestInfra_Verify_InfrastructureExists(t *testing.T) { + t.Parallel() + + ctx := context.Background() + mockProvider := &mockInfraProvider{} + mockProvider.exists.Store(true) // Infrastructure exists + lockKey := "test:lock:" + idgen.String() + + infra := newTestInfra(t, mockProvider, lockKey) + + // Verify should succeed when infrastructure exists + err := infra.Verify(ctx) + require.NoError(t, err) + + // Verify no declaration happened + assert.Equal(t, int32(0), mockProvider.declareCount.Load()) + assert.Equal(t, int32(1), mockProvider.existCallCount.Load()) +} + +func TestInfra_Verify_InfrastructureDoesNotExist(t *testing.T) { + t.Parallel() + + ctx := context.Background() + mockProvider := &mockInfraProvider{} + mockProvider.exists.Store(false) // Infrastructure does not exist + lockKey := "test:lock:" + idgen.String() + + infraInstance := newTestInfra(t, mockProvider, lockKey) + + // Verify should fail with ErrInfraNotFound + err := infraInstance.Verify(ctx) + require.Error(t, err) + assert.ErrorIs(t, err, infra.ErrInfraNotFound) + + // Verify no declaration happened + assert.Equal(t, int32(0), mockProvider.declareCount.Load()) + assert.Equal(t, int32(1), mockProvider.existCallCount.Load()) +} + +func TestInfra_Verify_ExistCheckError(t *testing.T) { + t.Parallel() + + ctx := context.Background() + mockProvider := &mockInfraProvider{ + existError: assert.AnError, + } + lockKey := "test:lock:" + idgen.String() + + infra := newTestInfra(t, mockProvider, lockKey) + + // Verify should fail with wrapped error + err := infra.Verify(ctx) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to verify infrastructure exists") + + // Verify no declaration happened + assert.Equal(t, int32(0), mockProvider.declareCount.Load()) +} From 4532a3225a71251ccc6a879c82d7e151034c380d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 12 Nov 2025 22:28:06 +0700 Subject: [PATCH 2/5] refactor: infra.Init --- internal/app/app.go | 29 ++++++-------------- internal/infra/infra.go | 52 ++++++++++++++++++++++++++++-------- internal/infra/infra_test.go | 29 ++++++++++---------- 3 files changed, 64 insertions(+), 46 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 59ea7512..fc875ad8 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -84,27 +84,14 @@ func run(mainContext context.Context, cfg 
*config.Config) error { return err } - logger.Debug("creating Outpost infrastructure") - outpostInfra := infra.NewInfra(infra.Config{ - DeliveryMQ: cfg.MQs.ToInfraConfig("deliverymq"), - LogMQ: cfg.MQs.ToInfraConfig("logmq"), - }, redisClient) - - // Only declare infrastructure if should_manage is true (default) - shouldManage := cfg.MQs.ShouldManage == nil || *cfg.MQs.ShouldManage - if shouldManage { - logger.Debug("infrastructure management enabled, declaring infrastructure") - if err := outpostInfra.Declare(mainContext); err != nil { - logger.Error("infrastructure declaration failed", zap.Error(err)) - return err - } - } else { - logger.Info("infrastructure management disabled, verifying infrastructure exists") - if err := outpostInfra.Verify(mainContext); err != nil { - logger.Error("infrastructure verification failed", zap.Error(err)) - return err - } - logger.Info("infrastructure verification successful") + logger.Debug("initializing infrastructure") + if err := infra.Init(mainContext, infra.Config{ + DeliveryMQ: cfg.MQs.ToInfraConfig("deliverymq"), + LogMQ: cfg.MQs.ToInfraConfig("logmq"), + ShouldManage: cfg.MQs.ShouldManage, + }, redisClient); err != nil { + logger.Error("infrastructure initialization failed", zap.Error(err)) + return err } installationID, err := getInstallation(mainContext, redisClient, cfg.Telemetry.ToTelemetryConfig()) diff --git a/internal/infra/infra.go b/internal/infra/infra.go index 86c02fa6..bbb12aab 100644 --- a/internal/infra/infra.go +++ b/internal/infra/infra.go @@ -23,8 +23,9 @@ var ( ) type Infra struct { - lock Lock - provider InfraProvider + lock Lock + provider InfraProvider + shouldManage bool } // InfraProvider handles the actual infrastructure operations @@ -35,8 +36,9 @@ type InfraProvider interface { } type Config struct { - DeliveryMQ *mqinfra.MQInfraConfig - LogMQ *mqinfra.MQInfraConfig + DeliveryMQ *mqinfra.MQInfraConfig + LogMQ *mqinfra.MQInfraConfig + ShouldManage *bool } func (cfg *Config) SetSensiblePolicyDefaults() { @@ -103,17 +105,46 @@ func NewInfra(cfg Config, redisClient redis.Cmdable) Infra { logMQ: mqinfra.New(cfg.LogMQ), } + // Default shouldManage to true if not set (backward compatible) + shouldManage := true + if cfg.ShouldManage != nil { + shouldManage = *cfg.ShouldManage + } + return Infra{ - lock: NewRedisLock(redisClient), - provider: provider, + lock: NewRedisLock(redisClient), + provider: provider, + shouldManage: shouldManage, + } +} + +// Init initializes and verifies infrastructure based on configuration. +// If ShouldManage is true (default), it will create infrastructure if needed. +// If ShouldManage is false, it will only verify infrastructure exists. 
+func Init(ctx context.Context, cfg Config, redisClient redis.Cmdable) error { + infra := NewInfra(cfg, redisClient) + + if infra.shouldManage { + return infra.Declare(ctx) } + + // shouldManage is false, only verify existence + exists, err := infra.provider.Exist(ctx) + if err != nil { + return fmt.Errorf("failed to verify infrastructure exists: %w", err) + } + if !exists { + return ErrInfraNotFound + } + return nil } // NewInfraWithProvider creates an Infra instance with custom lock and provider (for testing) -func NewInfraWithProvider(lock Lock, provider InfraProvider) *Infra { +func NewInfraWithProvider(lock Lock, provider InfraProvider, shouldManage bool) *Infra { return &Infra{ - lock: lock, - provider: provider, + lock: lock, + provider: provider, + shouldManage: shouldManage, } } @@ -156,8 +187,7 @@ func (infra *Infra) Declare(ctx context.Context) error { return fmt.Errorf("failed to acquire lock after %d attempts", lockAttempts) } -// Verify checks if the infrastructure exists and returns an error if it doesn't. -// This is useful when infrastructure management is disabled to fail fast with a clear error. +// Verify checks if infrastructure exists and returns an error if it doesn't. func (infra *Infra) Verify(ctx context.Context) error { exists, err := infra.provider.Exist(ctx) if err != nil { diff --git a/internal/infra/infra_test.go b/internal/infra/infra_test.go index a7ef7071..765d9f3a 100644 --- a/internal/infra/infra_test.go +++ b/internal/infra/infra_test.go @@ -60,20 +60,20 @@ func (m *mockInfraProvider) Teardown(ctx context.Context) error { } // Helper to create test infra with custom provider -func newTestInfra(t *testing.T, provider infra.InfraProvider, lockKey string) *infra.Infra { +func newTestInfra(t *testing.T, provider infra.InfraProvider, lockKey string, shouldManage bool) *infra.Infra { redisConfig := testutil.CreateTestRedisConfig(t) ctx := context.Background() client, err := redis.New(ctx, redisConfig) require.NoError(t, err) - return newTestInfraWithRedis(t, provider, lockKey, client) + return newTestInfraWithRedis(t, provider, lockKey, client, shouldManage) } // Helper to create test infra with specific Redis client -func newTestInfraWithRedis(t *testing.T, provider infra.InfraProvider, lockKey string, client redis.Cmdable) *infra.Infra { +func newTestInfraWithRedis(t *testing.T, provider infra.InfraProvider, lockKey string, client redis.Cmdable, shouldManage bool) *infra.Infra { lock := infra.NewRedisLock(client, infra.LockWithKey(lockKey)) - return infra.NewInfraWithProvider(lock, provider) + return infra.NewInfraWithProvider(lock, provider, shouldManage) } func TestInfra_SingleNode(t *testing.T) { @@ -83,7 +83,7 @@ func TestInfra_SingleNode(t *testing.T) { mockProvider := &mockInfraProvider{} lockKey := "test:lock:" + idgen.String() - infra := newTestInfra(t, mockProvider, lockKey) + infra := newTestInfra(t, mockProvider, lockKey, true) // Infrastructure doesn't exist initially assert.False(t, mockProvider.exists.Load()) @@ -105,7 +105,7 @@ func TestInfra_InfrastructureAlreadyExists(t *testing.T) { mockProvider.exists.Store(true) // Infrastructure already exists lockKey := "test:lock:" + idgen.String() - infra := newTestInfra(t, mockProvider, lockKey) + infra := newTestInfra(t, mockProvider, lockKey, true) // Declare should succeed without acquiring lock err := infra.Declare(ctx) @@ -140,7 +140,7 @@ func TestInfra_ConcurrentNodes(t *testing.T) { defer wg.Done() // Each node gets its own Infra instance but shares the provider and Redis client - nodeInfra 
:= newTestInfraWithRedis(t, mockProvider, lockKey, client) + nodeInfra := newTestInfraWithRedis(t, mockProvider, lockKey, client, true) errors[idx] = nodeInfra.Declare(ctx) }(i) } @@ -196,7 +196,7 @@ func TestInfra_LockExpiry(t *testing.T) { // Now another node should be able to acquire and declare // Use the same Redis client - nodeInfra := newTestInfraWithRedis(t, mockProvider, lockKey, client) + nodeInfra := newTestInfraWithRedis(t, mockProvider, lockKey, client, true) err = nodeInfra.Declare(ctx) require.NoError(t, err) @@ -212,9 +212,9 @@ func TestInfra_Verify_InfrastructureExists(t *testing.T) { mockProvider.exists.Store(true) // Infrastructure exists lockKey := "test:lock:" + idgen.String() - infra := newTestInfra(t, mockProvider, lockKey) + infra := newTestInfra(t, mockProvider, lockKey, false) - // Verify should succeed when infrastructure exists + // Verify should succeed when infrastructure exists (shouldManage=false) err := infra.Verify(ctx) require.NoError(t, err) @@ -231,9 +231,9 @@ func TestInfra_Verify_InfrastructureDoesNotExist(t *testing.T) { mockProvider.exists.Store(false) // Infrastructure does not exist lockKey := "test:lock:" + idgen.String() - infraInstance := newTestInfra(t, mockProvider, lockKey) + infraInstance := newTestInfra(t, mockProvider, lockKey, false) - // Verify should fail with ErrInfraNotFound + // Verify should fail with ErrInfraNotFound when shouldManage=false err := infraInstance.Verify(ctx) require.Error(t, err) assert.ErrorIs(t, err, infra.ErrInfraNotFound) @@ -252,9 +252,9 @@ func TestInfra_Verify_ExistCheckError(t *testing.T) { } lockKey := "test:lock:" + idgen.String() - infra := newTestInfra(t, mockProvider, lockKey) + infra := newTestInfra(t, mockProvider, lockKey, false) - // Verify should fail with wrapped error + // Verify should fail with wrapped error when shouldManage=false err := infra.Verify(ctx) require.Error(t, err) assert.Contains(t, err.Error(), "failed to verify infrastructure exists") @@ -262,3 +262,4 @@ func TestInfra_Verify_ExistCheckError(t *testing.T) { // Verify no declaration happened assert.Equal(t, int32(0), mockProvider.declareCount.Load()) } + From 94ad7085dfa28b35444c4b37d1da88d6adf15764 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 12 Nov 2025 22:53:36 +0700 Subject: [PATCH 3/5] refactor: app lifecycle --- internal/app/app.go | 337 ++++++++++++++++++-------------------- internal/app/migration.go | 129 +++++++++++++++ 2 files changed, 284 insertions(+), 182 deletions(-) create mode 100644 internal/app/migration.go diff --git a/internal/app/app.go b/internal/app/app.go index fc875ad8..c199a430 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -5,7 +5,6 @@ import ( "errors" "os" "os/signal" - "strings" "syscall" "time" @@ -13,16 +12,25 @@ import ( "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/infra" "github.com/hookdeck/outpost/internal/logging" - "github.com/hookdeck/outpost/internal/migrator" "github.com/hookdeck/outpost/internal/otel" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/services" "github.com/hookdeck/outpost/internal/telemetry" + "github.com/hookdeck/outpost/internal/worker" "go.uber.org/zap" ) type App struct { config *config.Config + logger *logging.Logger + + // Runtime dependencies + redisClient redis.Cmdable + telemetry telemetry.Telemetry + builder *services.ServiceBuilder + supervisor *worker.WorkerSupervisor + otelShutdown func(context.Context) error + installationID string } func New(cfg *config.Config) 
*App { @@ -32,103 +40,94 @@ func New(cfg *config.Config) *App { } func (a *App) Run(ctx context.Context) error { - return run(ctx, a.config) + if err := a.PreRun(ctx); err != nil { + return err + } + defer a.PostRun(ctx) + + return a.run(ctx) } -func run(mainContext context.Context, cfg *config.Config) error { - logger, err := logging.NewLogger( - logging.WithLogLevel(cfg.LogLevel), - logging.WithAuditLog(cfg.AuditLog), - ) - if err != nil { +// PreRun initializes all dependencies before starting the application +func (a *App) PreRun(ctx context.Context) error { + if err := a.setupLogger(); err != nil { return err } - defer logger.Sync() + defer func() { + if r := recover(); r != nil { + a.logger.Error("panic during PreRun", zap.Any("panic", r)) + } + }() + + a.logger.Info("starting outpost", + zap.String("config_path", a.config.ConfigFilePath()), + zap.String("service", a.config.MustGetService().String())) - logFields := []zap.Field{ - zap.String("config_path", cfg.ConfigFilePath()), - zap.String("service", cfg.MustGetService().String()), + if a.config.DeploymentID != "" { + a.logger.Info("deployment configured", zap.String("deployment_id", a.config.DeploymentID)) } - if cfg.DeploymentID != "" { - logFields = append(logFields, zap.String("deployment_id", cfg.DeploymentID)) + + if err := a.configureIDGenerators(); err != nil { + return err } - logger.Info("starting outpost", logFields...) - - // Initialize ID generators - logger.Debug("configuring ID generators", - zap.String("type", cfg.IDGen.Type), - zap.String("event_prefix", cfg.IDGen.EventPrefix), - zap.String("destination_prefix", cfg.IDGen.DestinationPrefix), - zap.String("delivery_prefix", cfg.IDGen.DeliveryPrefix), - zap.String("delivery_event_prefix", cfg.IDGen.DeliveryEventPrefix)) - if err := idgen.Configure(idgen.IDGenConfig{ - Type: cfg.IDGen.Type, - EventPrefix: cfg.IDGen.EventPrefix, - DestinationPrefix: cfg.IDGen.DestinationPrefix, - DeliveryPrefix: cfg.IDGen.DeliveryPrefix, - DeliveryEventPrefix: cfg.IDGen.DeliveryEventPrefix, - }); err != nil { - logger.Error("failed to configure ID generators", zap.Error(err)) + + if err := a.runMigrations(ctx); err != nil { return err } - if err := runMigration(mainContext, cfg, logger); err != nil { + if err := a.initializeRedis(ctx); err != nil { return err } - logger.Debug("initializing Redis client for infrastructure") - // Create Redis client for infrastructure components - redisClient, err := redis.New(mainContext, cfg.Redis.ToConfig()) - if err != nil { - logger.Error("Redis client initialization failed", zap.Error(err)) + if err := a.initializeInfrastructure(ctx); err != nil { return err } - logger.Debug("initializing infrastructure") - if err := infra.Init(mainContext, infra.Config{ - DeliveryMQ: cfg.MQs.ToInfraConfig("deliverymq"), - LogMQ: cfg.MQs.ToInfraConfig("logmq"), - ShouldManage: cfg.MQs.ShouldManage, - }, redisClient); err != nil { - logger.Error("infrastructure initialization failed", zap.Error(err)) + if err := a.initializeTelemetry(ctx); err != nil { return err } - installationID, err := getInstallation(mainContext, redisClient, cfg.Telemetry.ToTelemetryConfig()) - if err != nil { + if err := a.setupOpenTelemetry(ctx); err != nil { return err } - telemetry := telemetry.New(logger, cfg.Telemetry.ToTelemetryConfig(), installationID) - telemetry.Init(mainContext) - telemetry.ApplicationStarted(mainContext, cfg.ToTelemetryApplicationInfo()) + if err := a.buildServices(ctx); err != nil { + return err + } - // Set up cancellation context - ctx, cancel := 
context.WithCancel(mainContext) - defer cancel() + return nil +} - // Set up OpenTelemetry. - if cfg.OpenTelemetry.ToConfig() != nil { - otelShutdown, err := otel.SetupOTelSDK(ctx, cfg.OpenTelemetry.ToConfig()) - if err != nil { - return err - } - // Handle shutdown properly so nothing leaks. - defer func() { - err = errors.Join(err, otelShutdown(context.Background())) - }() +// PostRun handles cleanup after application exits +func (a *App) PostRun(ctx context.Context) { + if a.telemetry != nil { + a.telemetry.Flush() } - // Build services using ServiceBuilder - logger.Debug("building services") - builder := services.NewServiceBuilder(ctx, cfg, logger, telemetry) + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() - supervisor, err := builder.BuildWorkers() - if err != nil { - logger.Error("failed to build workers", zap.Error(err)) - return err + if a.builder != nil { + a.builder.Cleanup(shutdownCtx) + } + + if a.otelShutdown != nil { + if err := a.otelShutdown(context.Background()); err != nil { + a.logger.Error("OpenTelemetry shutdown error", zap.Error(err)) + } } + if a.logger != nil { + a.logger.Info("outpost shutdown complete") + a.logger.Sync() + } +} + +func (a *App) run(ctx context.Context) error { + // Set up cancellation context + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // Handle sigterm and await termChan signal termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) @@ -136,154 +135,128 @@ func run(mainContext context.Context, cfg *config.Config) error { // Run workers in goroutine errChan := make(chan error, 1) go func() { - errChan <- supervisor.Run(ctx) + errChan <- a.supervisor.Run(ctx) }() // Wait for either termination signal or worker failure var exitErr error select { case <-termChan: - logger.Info("shutdown signal received") + a.logger.Info("shutdown signal received") cancel() // Cancel context to trigger graceful shutdown err := <-errChan // context.Canceled is expected during graceful shutdown if err != nil && !errors.Is(err, context.Canceled) { - logger.Error("error during graceful shutdown", zap.Error(err)) + a.logger.Error("error during graceful shutdown", zap.Error(err)) exitErr = err } case err := <-errChan: // Workers exited unexpectedly if err != nil { - logger.Error("workers exited unexpectedly", zap.Error(err)) + a.logger.Error("workers exited unexpectedly", zap.Error(err)) exitErr = err } } - telemetry.Flush() - - // Run cleanup functions - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer shutdownCancel() - builder.Cleanup(shutdownCtx) - - logger.Info("outpost shutdown complete") - return exitErr } -// runMigration handles database schema migrations with retry logic for lock conflicts. -// -// MIGRATION LOCK BEHAVIOR: -// - Database locks are only acquired when migrations need to be performed -// - When multiple nodes start simultaneously and migrations are pending: -// 1. One node acquires the lock and performs migrations (ideally < 5 seconds) -// 2. Other nodes fail with lock errors ("try lock failed", "can't acquire lock") -// 3. Failed nodes wait 5 seconds and retry -// 4. 
On retry, migrations are complete and nodes proceed successfully -// -// RETRY STRATEGY: -// - Max 3 attempts with 5-second delays between retries -// - 5 seconds is sufficient because most migrations complete quickly -// - If no migrations are needed (common case), all nodes proceed immediately without lock contention -func runMigration(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { - const ( - maxRetries = 3 - retryDelay = 5 * time.Second +func (a *App) setupLogger() error { + logger, err := logging.NewLogger( + logging.WithLogLevel(a.config.LogLevel), + logging.WithAuditLog(a.config.AuditLog), ) + if err != nil { + return err + } + a.logger = logger + return nil +} - var lastErr error - - for attempt := 1; attempt <= maxRetries; attempt++ { - migrator, err := migrator.New(cfg.ToMigratorOpts()) - if err != nil { - return err - } - - version, versionJumped, err := migrator.Up(ctx, -1) - - // Always close the migrator after each attempt - sourceErr, dbErr := migrator.Close(ctx) - if sourceErr != nil { - logger.Error("failed to close migrator source", zap.Error(sourceErr)) - } - if dbErr != nil { - logger.Error("failed to close migrator database connection", zap.Error(dbErr)) - } +func (a *App) configureIDGenerators() error { + a.logger.Debug("configuring ID generators", + zap.String("type", a.config.IDGen.Type), + zap.String("event_prefix", a.config.IDGen.EventPrefix), + zap.String("destination_prefix", a.config.IDGen.DestinationPrefix), + zap.String("delivery_prefix", a.config.IDGen.DeliveryPrefix), + zap.String("delivery_event_prefix", a.config.IDGen.DeliveryEventPrefix)) - if err == nil { - // Migration succeeded - if versionJumped > 0 { - logger.Info("migrations applied", - zap.Int("version", version), - zap.Int("version_applied", versionJumped)) - } else { - logger.Info("no migrations applied", zap.Int("version", version)) - } - return nil - } + if err := idgen.Configure(idgen.IDGenConfig{ + Type: a.config.IDGen.Type, + EventPrefix: a.config.IDGen.EventPrefix, + DestinationPrefix: a.config.IDGen.DestinationPrefix, + DeliveryPrefix: a.config.IDGen.DeliveryPrefix, + DeliveryEventPrefix: a.config.IDGen.DeliveryEventPrefix, + }); err != nil { + a.logger.Error("failed to configure ID generators", zap.Error(err)) + return err + } + return nil +} - // Check if this is a lock-related error - // Lock errors can manifest as: - // - "can't acquire lock" (database.ErrLocked) - // - "try lock failed" (postgres advisory lock failure) - // - "pg_advisory_lock" (postgres lock function errors) - isLockError := isLockRelatedError(err) - lastErr = err - - if !isLockError { - // Not a lock error, fail immediately - logger.Error("migration failed", zap.Error(err)) - return err - } +func (a *App) runMigrations(ctx context.Context) error { + return runMigration(ctx, a.config, a.logger) +} - // Lock error - retry if we have attempts remaining - if attempt < maxRetries { - logger.Warn("migration lock conflict, retrying", - zap.Int("attempt", attempt), - zap.Int("max_retries", maxRetries), - zap.Duration("retry_delay", retryDelay), - zap.Error(err)) - - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(retryDelay): - // Continue to next attempt - } - } else { - // Exhausted all retries - logger.Error("migration failed after retries", - zap.Int("attempts", maxRetries), - zap.Error(err)) - } +func (a *App) initializeRedis(ctx context.Context) error { + a.logger.Debug("initializing Redis client for infrastructure") + redisClient, err := redis.New(ctx, 
a.config.Redis.ToConfig()) + if err != nil { + a.logger.Error("Redis client initialization failed", zap.Error(err)) + return err } + a.redisClient = redisClient + return nil +} - return lastErr +func (a *App) initializeInfrastructure(ctx context.Context) error { + a.logger.Debug("initializing infrastructure") + if err := infra.Init(ctx, infra.Config{ + DeliveryMQ: a.config.MQs.ToInfraConfig("deliverymq"), + LogMQ: a.config.MQs.ToInfraConfig("logmq"), + ShouldManage: a.config.MQs.ShouldManage, + }, a.redisClient); err != nil { + a.logger.Error("infrastructure initialization failed", zap.Error(err)) + return err + } + return nil } -// isLockRelatedError checks if an error is related to database migration lock acquisition. -// This includes errors from golang-migrate's locking mechanism. -func isLockRelatedError(err error) bool { - if err == nil { - return false +func (a *App) initializeTelemetry(ctx context.Context) error { + installationID, err := getInstallation(ctx, a.redisClient, a.config.Telemetry.ToTelemetryConfig()) + if err != nil { + return err } + a.installationID = installationID - errMsg := err.Error() + a.telemetry = telemetry.New(a.logger, a.config.Telemetry.ToTelemetryConfig(), installationID) + a.telemetry.Init(ctx) + a.telemetry.ApplicationStarted(ctx, a.config.ToTelemetryApplicationInfo()) + return nil +} - // Check for lock-related error messages from golang-migrate: - // 1. "can't acquire lock" - database.ErrLocked from golang-migrate/migrate/v4/database - // 2. "try lock failed" - returned by postgres driver when pg_advisory_lock() fails - // See: https://github.com/golang-migrate/migrate/blob/master/database/postgres/postgres.go - lockIndicators := []string{ - "can't acquire lock", - "try lock failed", +func (a *App) setupOpenTelemetry(ctx context.Context) error { + if a.config.OpenTelemetry.ToConfig() != nil { + otelShutdown, err := otel.SetupOTelSDK(ctx, a.config.OpenTelemetry.ToConfig()) + if err != nil { + return err + } + a.otelShutdown = otelShutdown } + return nil +} - for _, indicator := range lockIndicators { - if strings.Contains(errMsg, indicator) { - return true - } +func (a *App) buildServices(ctx context.Context) error { + a.logger.Debug("building services") + builder := services.NewServiceBuilder(ctx, a.config, a.logger, a.telemetry) + + supervisor, err := builder.BuildWorkers() + if err != nil { + a.logger.Error("failed to build workers", zap.Error(err)) + return err } - return false + a.builder = builder + a.supervisor = supervisor + return nil } diff --git a/internal/app/migration.go b/internal/app/migration.go new file mode 100644 index 00000000..9938b659 --- /dev/null +++ b/internal/app/migration.go @@ -0,0 +1,129 @@ +package app + +import ( + "context" + "strings" + "time" + + "github.com/hookdeck/outpost/internal/config" + "github.com/hookdeck/outpost/internal/logging" + "github.com/hookdeck/outpost/internal/migrator" + "go.uber.org/zap" +) + +// runMigration handles database schema migrations with retry logic for lock conflicts. +// +// MIGRATION LOCK BEHAVIOR: +// - Database locks are only acquired when migrations need to be performed +// - When multiple nodes start simultaneously and migrations are pending: +// 1. One node acquires the lock and performs migrations (ideally < 5 seconds) +// 2. Other nodes fail with lock errors ("try lock failed", "can't acquire lock") +// 3. Failed nodes wait 5 seconds and retry +// 4. 
On retry, migrations are complete and nodes proceed successfully +// +// RETRY STRATEGY: +// - Max 3 attempts with 5-second delays between retries +// - 5 seconds is sufficient because most migrations complete quickly +// - If no migrations are needed (common case), all nodes proceed immediately without lock contention +func runMigration(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { + const ( + maxRetries = 3 + retryDelay = 5 * time.Second + ) + + var lastErr error + + for attempt := 1; attempt <= maxRetries; attempt++ { + migrator, err := migrator.New(cfg.ToMigratorOpts()) + if err != nil { + return err + } + + version, versionJumped, err := migrator.Up(ctx, -1) + + // Always close the migrator after each attempt + sourceErr, dbErr := migrator.Close(ctx) + if sourceErr != nil { + logger.Error("failed to close migrator source", zap.Error(sourceErr)) + } + if dbErr != nil { + logger.Error("failed to close migrator database connection", zap.Error(dbErr)) + } + + if err == nil { + // Migration succeeded + if versionJumped > 0 { + logger.Info("migrations applied", + zap.Int("version", version), + zap.Int("version_applied", versionJumped)) + } else { + logger.Info("no migrations applied", zap.Int("version", version)) + } + return nil + } + + // Check if this is a lock-related error + // Lock errors can manifest as: + // - "can't acquire lock" (database.ErrLocked) + // - "try lock failed" (postgres advisory lock failure) + // - "pg_advisory_lock" (postgres lock function errors) + isLockError := isLockRelatedError(err) + lastErr = err + + if !isLockError { + // Not a lock error, fail immediately + logger.Error("migration failed", zap.Error(err)) + return err + } + + // Lock error - retry if we have attempts remaining + if attempt < maxRetries { + logger.Warn("migration lock conflict, retrying", + zap.Int("attempt", attempt), + zap.Int("max_retries", maxRetries), + zap.Duration("retry_delay", retryDelay), + zap.Error(err)) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryDelay): + // Continue to next attempt + } + } else { + // Exhausted all retries + logger.Error("migration failed after retries", + zap.Int("attempts", maxRetries), + zap.Error(err)) + } + } + + return lastErr +} + +// isLockRelatedError checks if an error is related to database migration lock acquisition. +// This includes errors from golang-migrate's locking mechanism. +func isLockRelatedError(err error) bool { + if err == nil { + return false + } + + errMsg := err.Error() + + // Check for lock-related error messages from golang-migrate: + // 1. "can't acquire lock" - database.ErrLocked from golang-migrate/migrate/v4/database + // 2. 
"try lock failed" - returned by postgres driver when pg_advisory_lock() fails + // See: https://github.com/golang-migrate/migrate/blob/master/database/postgres/postgres.go + lockIndicators := []string{ + "can't acquire lock", + "try lock failed", + } + + for _, indicator := range lockIndicators { + if strings.Contains(errMsg, indicator) { + return true + } + } + + return false +} From c28036f27ce2c87046942e70dcd80e44b64a5ccd Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 12 Nov 2025 22:55:56 +0700 Subject: [PATCH 4/5] docs: generate config --- docs/pages/references/configuration.mdx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/pages/references/configuration.mdx b/docs/pages/references/configuration.mdx index c22df5bc..d3694808 100644 --- a/docs/pages/references/configuration.mdx +++ b/docs/pages/references/configuration.mdx @@ -80,6 +80,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `LOG_MAX_CONCURRENCY` | Maximum number of log writing operations to process concurrently. | `1` | No | | `MAX_DESTINATIONS_PER_TENANT` | Maximum number of destinations allowed per tenant/organization. | `20` | No | | `MAX_RETRY_LIMIT` | Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided. | `10` | No | +| `MQS_SHOULD_MANAGE` | Whether Outpost should create and manage message queue infrastructure. Set to false if you manage infrastructure externally (e.g., via Terraform). Defaults to true for backward compatibility. | `nil` | No | | `ORGANIZATION_NAME` | Name of the organization, used for display purposes and potentially in user agent strings. | `nil` | No | | `OTEL_EXPORTER` | Specifies the OTLP exporter to use for this telemetry type (e.g., 'otlp'). Typically used with environment variables like OTEL_EXPORTER_OTLP_TRACES_ENDPOINT. | `nil` | Conditional | | `OTEL_PROTOCOL` | Specifies the OTLP protocol ('grpc' or 'http') for this telemetry type. Typically used with environment variables like OTEL_EXPORTER_OTLP_TRACES_PROTOCOL. | `nil` | Conditional | @@ -377,6 +378,9 @@ mqs: server_url: "" + # Whether Outpost should create and manage message queue infrastructure. Set to false if you manage infrastructure externally (e.g., via Terraform). Defaults to true for backward compatibility. + should_manage: # <*bool> + # Maximum number of destinations allowed per tenant/organization. max_destinations_per_tenant: 20 From f68313919cdb603bf4242b01bc86f954b7db354c Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 12 Nov 2025 22:56:11 +0700 Subject: [PATCH 5/5] chore: gofmt --- internal/infra/infra_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/infra/infra_test.go b/internal/infra/infra_test.go index 765d9f3a..47609110 100644 --- a/internal/infra/infra_test.go +++ b/internal/infra/infra_test.go @@ -262,4 +262,3 @@ func TestInfra_Verify_ExistCheckError(t *testing.T) { // Verify no declaration happened assert.Equal(t, int32(0), mockProvider.declareCount.Load()) } -