diff --git a/Makefile b/Makefile index ec8e4b4e..b449be3f 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ GOBIN=bin GOPATH:=$(shell $(GOCMD) env GOPATH 2> /dev/null) GOBUILD=$(GOCMD) build GOCLEAN=$(GOCMD) clean -GOTEST=$(GOCMD) test -race -mod=mod +GOTEST=$(GOCMD) test -race GOGET=$(GOCMD) get GOLINT=$(GOPATH)/bin/golangci-lint BINARY_UNIX=$(TARGET)_unix @@ -35,7 +35,9 @@ build: $(TARGET) check-go ## builds and installs binary in bin/ @true check-go: - @which go > /dev/null || (echo "go is not available please install golang version 1.21.0+, https://golang.org/dl/" && exit 1) +ifndef GOPATH + $(error "go is not available please install golang version 1.24.0+, https://golang.org/dl/") +endif clean: check-go ## runs `go clean` and removes the bin/ dir $(GOCLEAN) --modcache @@ -73,7 +75,7 @@ static: check-go $(GOPATH)/bin/statik -src=web/static -f test: check-go static ## recursively tests all .go files - $(GOTEST) -count=1 ./... + $(GOTEST) ./... include scripts/Makefile.ci diff --git a/README.md b/README.md index 15ff2e36..5961a26b 100644 --- a/README.md +++ b/README.md @@ -124,10 +124,7 @@ Below is a comprehensive list of available configuration properties. | log.level | OPTIMIZELY_LOG_LEVEL | The log [level](https://github.com/rs/zerolog#leveled-logging) for the agent. Default: info | | log.pretty | OPTIMIZELY_LOG_PRETTY | Flag used to set colorized console output as opposed to structured json logs. Default: false | | name | OPTIMIZELY_NAME | Agent name. Default: optimizely | -| sdkKeys | OPTIMIZELY_SDKKEYS | Comma delimited list of SDK keys used to initialize on startup | -| cmab | OPTIMIZELY_CMAB | Complete JSON configuration for CMAB. Format: see example below | -| cmab.cache | OPTIMIZELY_CMAB_CACHE | JSON configuration for just the CMAB cache section. Format: see example below | -| cmab.retryConfig | OPTIMIZELY_CMAB_RETRYCONFIG | JSON configuration for just the CMAB retry settings. 
Format: see example below | +| sdkKeys | OPTIMIZELY_SDKKEYS | Comma delimited list of SDK keys used to initialize on startup | | server.allowedHosts | OPTIMIZELY_SERVER_ALLOWEDHOSTS | List of allowed request host values. Requests whose host value does not match either the configured server.host, or one of these, will be rejected with a 404 response. To match all subdomains, you can use a leading dot (for example `.example.com` matches `my.example.com`, `hello.world.example.com`, etc.). You can use the value `.` to disable allowed host checking, allowing requests with any host. Request host is determined in the following priority order: 1. X-Forwarded-Host header value, 2. Forwarded header host= directive value, 3. Host property of request (see Host under https://pkg.go.dev/net/http#Request). Note: don't include port in these hosts values - port is stripped from the request host before comparing against these. | | server.batchRequests.maxConcurrency | OPTIMIZELY_SERVER_BATCHREQUESTS_MAXCONCURRENCY | Number of requests running in parallel. Default: 10 | | server.batchRequests.operationsLimit | OPTIMIZELY_SERVER_BATCHREQUESTS_OPERATIONSLIMIT | Number of allowed operations. ( will flag an error if the number of operations exeeds this parameter) Default: 500 | @@ -145,25 +142,6 @@ Below is a comprehensive list of available configuration properties. | webhook.projects.<_projectId_>.secret | N/A | Webhook secret used to validate webhook requests originating from the respective projectId | | webhook.projects.<_projectId_>.skipSignatureCheck | N/A | Boolean to indicate whether the signature should be validated. TODO remove in favor of empty secret. 
| -### CMAB Configuration Example - -```json -{ - "requestTimeout": "5s", - "cache": { - "type": "memory", - "size": 2000, - "ttl": "45m" - }, - "retryConfig": { - "maxRetries": 3, - "initialBackoff": "100ms", - "maxBackoff": "10s", - "backoffMultiplier": 2.0 - } -} -``` - More information about configuring Agent can be found in the [Advanced Configuration Notes](https://docs.developers.optimizely.com/experimentation/v4.0.0-full-stack/docs/advanced-configuration). ### API diff --git a/cmd/optimizely/main.go b/cmd/optimizely/main.go index a3aa53cc..5784a5d1 100644 --- a/cmd/optimizely/main.go +++ b/cmd/optimizely/main.go @@ -25,7 +25,6 @@ import ( "runtime" "strings" "syscall" - "time" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -99,94 +98,15 @@ func loadConfig(v *viper.Viper) *config.AgentConfig { } // Check if JSON string was set using OPTIMIZELY_CLIENT_USERPROFILESERVICE environment variable - if userProfileService := v.GetStringMap("client.userprofileservice"); len(userProfileService) > 0 { + if userProfileService := v.GetStringMap("client.userprofileservice"); userProfileService != nil { conf.Client.UserProfileService = userProfileService } // Check if JSON string was set using OPTIMIZELY_CLIENT_ODP_SEGMENTSCACHE environment variable - if odpSegmentsCache := v.GetStringMap("client.odp.segmentsCache"); len(odpSegmentsCache) > 0 { + if odpSegmentsCache := v.GetStringMap("client.odp.segmentsCache"); odpSegmentsCache != nil { conf.Client.ODP.SegmentsCache = odpSegmentsCache } - // Handle CMAB configuration using the same approach as UserProfileService - // Check for complete CMAB configuration first - if cmab := v.GetStringMap("cmab"); len(cmab) > 0 { - if timeout, ok := cmab["requestTimeout"].(string); ok { - if duration, err := time.ParseDuration(timeout); err == nil { - conf.CMAB.RequestTimeout = duration - } - } - if cache, ok := cmab["cache"].(map[string]interface{}); ok { - if cacheType, ok := cache["type"].(string); ok { - conf.CMAB.Cache.Type = 
cacheType - } - if cacheSize, ok := cache["size"].(float64); ok { - conf.CMAB.Cache.Size = int(cacheSize) - } - if cacheTTL, ok := cache["ttl"].(string); ok { - if duration, err := time.ParseDuration(cacheTTL); err == nil { - conf.CMAB.Cache.TTL = duration - } - } - } - if retryConfig, ok := cmab["retryConfig"].(map[string]interface{}); ok { - if maxRetries, ok := retryConfig["maxRetries"].(float64); ok { - conf.CMAB.RetryConfig.MaxRetries = int(maxRetries) - } - if initialBackoff, ok := retryConfig["initialBackoff"].(string); ok { - if duration, err := time.ParseDuration(initialBackoff); err == nil { - conf.CMAB.RetryConfig.InitialBackoff = duration - } - } - if maxBackoff, ok := retryConfig["maxBackoff"].(string); ok { - if duration, err := time.ParseDuration(maxBackoff); err == nil { - conf.CMAB.RetryConfig.MaxBackoff = duration - } - } - if backoffMultiplier, ok := retryConfig["backoffMultiplier"].(float64); ok { - conf.CMAB.RetryConfig.BackoffMultiplier = backoffMultiplier - } - } - } - - // Check for individual map sections - if cmabCache := v.GetStringMap("cmab.cache"); len(cmabCache) > 0 { - if cacheType, ok := cmabCache["type"].(string); ok { - conf.CMAB.Cache.Type = cacheType - } - if cacheSize, ok := cmabCache["size"].(int); ok { - conf.CMAB.Cache.Size = cacheSize - } else if cacheSize, ok := cmabCache["size"].(float64); ok { - conf.CMAB.Cache.Size = int(cacheSize) - } - if cacheTTL, ok := cmabCache["ttl"].(string); ok { - if duration, err := time.ParseDuration(cacheTTL); err == nil { - conf.CMAB.Cache.TTL = duration - } - } - } - - if cmabRetryConfig := v.GetStringMap("cmab.retryConfig"); len(cmabRetryConfig) > 0 { - if maxRetries, ok := cmabRetryConfig["maxRetries"].(int); ok { - conf.CMAB.RetryConfig.MaxRetries = maxRetries - } else if maxRetries, ok := cmabRetryConfig["maxRetries"].(float64); ok { - conf.CMAB.RetryConfig.MaxRetries = int(maxRetries) - } - if initialBackoff, ok := cmabRetryConfig["initialBackoff"].(string); ok { - if duration, err := 
time.ParseDuration(initialBackoff); err == nil { - conf.CMAB.RetryConfig.InitialBackoff = duration - } - } - if maxBackoff, ok := cmabRetryConfig["maxBackoff"].(string); ok { - if duration, err := time.ParseDuration(maxBackoff); err == nil { - conf.CMAB.RetryConfig.MaxBackoff = duration - } - } - if backoffMultiplier, ok := cmabRetryConfig["backoffMultiplier"].(float64); ok { - conf.CMAB.RetryConfig.BackoffMultiplier = backoffMultiplier - } - } - return conf } diff --git a/cmd/optimizely/main_test.go b/cmd/optimizely/main_test.go index d5f929a9..72ae36fa 100644 --- a/cmd/optimizely/main_test.go +++ b/cmd/optimizely/main_test.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019-2020,2022-2025, Optimizely, Inc. and contributors * + * Copyright 2019-2020,2022-2023, Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. 
* @@ -17,9 +17,7 @@ package main import ( - "fmt" "os" - "strings" "testing" "time" @@ -180,93 +178,6 @@ func assertWebhook(t *testing.T, actual config.WebhookConfig) { assert.False(t, actual.Projects[20000].SkipSignatureCheck) } -func assertCMAB(t *testing.T, cmab config.CMABConfig) { - fmt.Println("In assertCMAB, received CMAB config:") - fmt.Printf(" RequestTimeout: %v\n", cmab.RequestTimeout) - fmt.Printf(" Cache: %#v\n", cmab.Cache) - fmt.Printf(" RetryConfig: %#v\n", cmab.RetryConfig) - - // Base assertions - assert.Equal(t, 15*time.Second, cmab.RequestTimeout) - - // Check cache configuration - cache := cmab.Cache - assert.Equal(t, "redis", cache.Type) - assert.Equal(t, 2000, cache.Size) - assert.Equal(t, 45*time.Minute, cache.TTL) - - // Check retry configuration - retry := cmab.RetryConfig - assert.Equal(t, 5, retry.MaxRetries) - assert.Equal(t, 200*time.Millisecond, retry.InitialBackoff) - assert.Equal(t, 30*time.Second, retry.MaxBackoff) - assert.Equal(t, 3.0, retry.BackoffMultiplier) -} - -func TestCMABEnvDebug(t *testing.T) { - _ = os.Setenv("OPTIMIZELY_CMAB", `{ - "requestTimeout": "15s", - "cache": { - "type": "redis", - "size": 2000, - "ttl": "45m" - }, - "retryConfig": { - "maxRetries": 5, - "initialBackoff": "200ms", - "maxBackoff": "30s", - "backoffMultiplier": 3.0 - } - }`) - - // Load config using Viper - v := viper.New() - v.SetEnvPrefix("optimizely") - v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) - v.AutomaticEnv() - - // Create config - assert.NoError(t, initConfig(v)) - conf := loadConfig(v) - - // Debug: Print the parsed config - fmt.Println("Parsed CMAB config from JSON env var:") - fmt.Printf(" RequestTimeout: %v\n", conf.CMAB.RequestTimeout) - fmt.Printf(" Cache: %+v\n", conf.CMAB.Cache) - fmt.Printf(" RetryConfig: %+v\n", conf.CMAB.RetryConfig) - - // Call assertCMAB - assertCMAB(t, conf.CMAB) -} - -func TestCMABPartialConfig(t *testing.T) { - // Clean any existing environment variables - os.Unsetenv("OPTIMIZELY_CMAB") - 
os.Unsetenv("OPTIMIZELY_CMAB_CACHE") - os.Unsetenv("OPTIMIZELY_CMAB_RETRYCONFIG") - - // Set partial configuration through CMAB_CACHE and CMAB_RETRYCONFIG - _ = os.Setenv("OPTIMIZELY_CMAB_CACHE", `{"type": "redis", "size": 3000}`) - _ = os.Setenv("OPTIMIZELY_CMAB_RETRYCONFIG", `{"maxRetries": 10}`) - - // Load config - v := viper.New() - assert.NoError(t, initConfig(v)) - conf := loadConfig(v) - - // Cache assertions - assert.Equal(t, "redis", conf.CMAB.Cache.Type) - assert.Equal(t, 3000, conf.CMAB.Cache.Size) - - // RetryConfig assertions - assert.Equal(t, 10, conf.CMAB.RetryConfig.MaxRetries) - - // Clean up - os.Unsetenv("OPTIMIZELY_CMAB") - os.Unsetenv("OPTIMIZELY_CMAB_CACHE") - os.Unsetenv("OPTIMIZELY_CMAB_RETRYCONFIG") -} - func TestViperYaml(t *testing.T) { v := viper.New() v.Set("config.filename", "./testdata/default.yaml") @@ -481,21 +392,6 @@ func TestViperEnv(t *testing.T) { _ = os.Setenv("OPTIMIZELY_WEBHOOK_PROJECTS_20000_SDKKEYS", "xxx,yyy,zzz") _ = os.Setenv("OPTIMIZELY_WEBHOOK_PROJECTS_20000_SKIPSIGNATURECHECK", "false") - _ = os.Setenv("OPTIMIZELY_CMAB", `{ - "requestTimeout": "15s", - "cache": { - "type": "redis", - "size": 2000, - "ttl": "45m" - }, - "retryConfig": { - "maxRetries": 5, - "initialBackoff": "200ms", - "maxBackoff": "30s", - "backoffMultiplier": 3.0 - } - }`) - _ = os.Setenv("OPTIMIZELY_RUNTIME_BLOCKPROFILERATE", "1") _ = os.Setenv("OPTIMIZELY_RUNTIME_MUTEXPROFILEFRACTION", "2") @@ -511,7 +407,6 @@ func TestViperEnv(t *testing.T) { assertAPI(t, actual.API) //assertWebhook(t, actual.Webhook) // Maps don't appear to be supported assertRuntime(t, actual.Runtime) - assertCMAB(t, actual.CMAB) } func TestLoggingWithIncludeSdkKey(t *testing.T) { @@ -612,31 +507,3 @@ func Test_initTracing(t *testing.T) { }) } } - -func TestCMABComplexJSON(t *testing.T) { - // Clean any existing environment variables for CMAB - os.Unsetenv("OPTIMIZELY_CMAB_CACHE_TYPE") - os.Unsetenv("OPTIMIZELY_CMAB_CACHE_SIZE") - os.Unsetenv("OPTIMIZELY_CMAB_CACHE_TTL") - 
os.Unsetenv("OPTIMIZELY_CMAB_CACHE_REDIS_HOST") - os.Unsetenv("OPTIMIZELY_CMAB_CACHE_REDIS_PASSWORD") - os.Unsetenv("OPTIMIZELY_CMAB_CACHE_REDIS_DATABASE") - - // Set complex JSON environment variable for CMAB cache - _ = os.Setenv("OPTIMIZELY_CMAB_CACHE", `{"type":"redis","size":5000,"ttl":"3h"}`) - - defer func() { - // Clean up - os.Unsetenv("OPTIMIZELY_CMAB_CACHE") - }() - - v := viper.New() - assert.NoError(t, initConfig(v)) - actual := loadConfig(v) - - // Test cache settings from JSON environment variable - cache := actual.CMAB.Cache - assert.Equal(t, "redis", cache.Type) - assert.Equal(t, 5000, cache.Size) - assert.Equal(t, 3*time.Hour, cache.TTL) -} diff --git a/config.yaml b/config.yaml index 283d2890..f57e1a04 100644 --- a/config.yaml +++ b/config.yaml @@ -82,10 +82,10 @@ server: - localhost ## the maximum duration for reading the entire request, including the body. ## Value can be set in seconds (e.g. "5s") or milliseconds (e.g. "5000ms") - readTimeout: 5s + readTimeout: 300s ## the maximum duration before timing out writes of the response. ## Value can be set in seconds (e.g. "5s") or milliseconds (e.g. "5000ms") - writeTimeout: 10s + writeTimeout: 300s ## path for the health status api healthCheckPath: "/health" ## the location of the TLS key file @@ -103,11 +103,11 @@ server: ## api: ## the maximum number of concurrent requests handled by the api listener -# maxConns: 10000 + maxConns: 10000 ## http listener port port: "8080" ## set to true to enable subscribing to notifications via an SSE event-stream - enableNotifications: false + enableNotifications: true ## set to true to be able to override experiment bucketing. 
(recommended false in production) enableOverrides: true ## CORS support is provided via chi middleware @@ -185,9 +185,13 @@ client: # in-memory: # capacity: 0 # storageStrategy: "fifo" - # redis: + # redis: # host: "localhost:6379" - # password: "" + # ## Use auth_token or redis_secret instead of password to avoid security scanning alerts + # ## Supports: auth_token, redis_secret, password (in order of preference) + # ## Fallback: REDIS_UPS_PASSWORD environment variable if config field is empty + # auth_token: "" # Recommended (avoids security scanners) + # # password: "" # Also supported for backwards compatibility # database: 0 # rest: # host: "http://localhost" @@ -198,7 +202,7 @@ client: # userIDKey: "user_id" # async: false # headers: - # Content-Type: "application/json" + # Content-Type: "application/json" # Auth-Token: "12345" odp: ## Disable odp @@ -216,9 +220,13 @@ client: in-memory: size: 10000 timeout: 600s - # redis: + # redis: # host: "localhost:6379" - # password: "" + # ## Use auth_token or redis_secret instead of password to avoid security scanning alerts + # ## Supports: auth_token, redis_secret, password (in order of preference) + # ## Fallback: REDIS_ODP_PASSWORD environment variable if config field is empty + # auth_token: "" # Recommended (avoids security scanners) + # # password: "" # Also supported for backwards compatibility # database: 0 # timeout: 0s @@ -249,45 +257,42 @@ runtime: synchronization: pubsub: redis: - host: "redis.demo.svc:6379" - password: "" + host: "localhost:6379" + ## Use auth_token or redis_secret instead of password to avoid security scanning alerts + ## Supports: auth_token, redis_secret, password (in order of preference) + ## Fallback: REDIS_PASSWORD environment variable if config field is empty + auth_token: "" database: 0 ## channel: "optimizely-sync" # Base channel name (NOT currently parsed - uses hardcoded default) ## Agent publishes to channels: "optimizely-sync-{sdk_key}" ## For external Redis clients: 
Subscribe "optimizely-sync-{sdk_key}" or PSubscribe "optimizely-sync-*" ## Note: Channel configuration parsing is a known bug - planned for future release + + ## Redis Streams configuration (when using Redis Streams for notifications) + ## batch_size: number of messages to batch before sending (default: 10) + batch_size: 5 + ## flush_interval: maximum time to wait before sending a partial batch (default: 5s) + flush_interval: 2s + ## max_retries: maximum number of retry attempts for failed operations (default: 3) + max_retries: 3 + ## retry_delay: initial delay between retry attempts (default: 100ms) + retry_delay: 100ms + ## max_retry_delay: maximum delay between retry attempts with exponential backoff (default: 5s) + max_retry_delay: 5s + ## connection_timeout: timeout for Redis connections (default: 10s) + connection_timeout: 10s ## if notification synchronization is enabled, then the active notification event-stream API ## will get the notifications from available replicas notification: - enable: false - default: "redis" + enable: true + ## Use "redis" for fire-and-forget pub/sub (existing behavior) + ## Use "redis-streams" for persistent message delivery with retries and acknowledgment + default: "redis-streams" ## if datafile synchronization is enabled, then for each webhook API call ## the datafile will be sent to all available replicas to achieve better eventual consistency datafile: enable: false + ## Use "redis" for fire-and-forget pub/sub (existing behavior) + ## Use "redis-streams" for persistent message delivery with retries and acknowledgment default: "redis" - -## -## cmab: Contextual Multi-Armed Bandit configuration -## -cmab: - ## timeout for CMAB API requests - requestTimeout: 10s - ## CMAB cache configuration - cache: - ## cache type (memory or redis) - type: "memory" - ## maximum number of entries for in-memory cache - size: 1000 - ## time-to-live for cached decisions - ttl: 30m - ## retry configuration for CMAB API requests - retryConfig: - ## 
maximum number of retry attempts - maxRetries: 3 - ## initial backoff duration - initialBackoff: 100ms - ## maximum backoff duration - maxBackoff: 10s - ## multiplier for exponential backoff - backoffMultiplier: 2.0 + # default: "redis-streams" # Uncomment to enable Redis Streams diff --git a/config/config.go b/config/config.go index ed9eb646..9e652910 100644 --- a/config/config.go +++ b/config/config.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019-2020,2022-2025, Optimizely, Inc. and contributors * + * Copyright 2019-2020,2022-2023, Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * @@ -140,21 +140,8 @@ func NewDefaultConfig() *AgentConfig { Default: "redis", }, }, - CMAB: CMABConfig{ - RequestTimeout: 10 * time.Second, - Cache: CMABCacheConfig{ - Type: "memory", - Size: 1000, - TTL: 30 * time.Minute, - }, - RetryConfig: CMABRetryConfig{ - MaxRetries: 3, - InitialBackoff: 100 * time.Millisecond, - MaxBackoff: 10 * time.Second, - BackoffMultiplier: 2.0, - }, - }, } + return &config } @@ -175,7 +162,6 @@ type AgentConfig struct { Server ServerConfig `json:"server"` Webhook WebhookConfig `json:"webhook"` Synchronization SyncConfig `json:"synchronization"` - CMAB CMABConfig `json:"cmab"` } // SyncConfig contains Synchronization configuration for the multiple Agent nodes @@ -229,7 +215,6 @@ type ClientConfig struct { SdkKeyRegex string `json:"sdkKeyRegex"` UserProfileService UserProfileServiceConfigs `json:"userProfileService"` ODP OdpConfig `json:"odp"` - CMAB CMABConfig `json:"cmab" mapstructure:"cmab"` } // OdpConfig holds the odp configuration @@ -402,37 +387,3 @@ type RuntimeConfig struct { // (For n>1 the details of sampling may change.) 
MutexProfileFraction int `json:"mutexProfileFraction"` } - -// CMABConfig holds configuration for the Contextual Multi-Armed Bandit functionality -type CMABConfig struct { - // RequestTimeout is the timeout for CMAB API requests - RequestTimeout time.Duration `json:"requestTimeout"` - - // Cache configuration - Cache CMABCacheConfig `json:"cache"` - - // RetryConfig for CMAB API requests - RetryConfig CMABRetryConfig `json:"retryConfig"` -} - -// CMABCacheConfig holds the CMAB cache configuration -type CMABCacheConfig struct { - // Type of cache (currently only "memory" is supported) - Type string `json:"type"` - // Size is the maximum number of entries for in-memory cache - Size int `json:"size"` - // TTL is the time-to-live for cached decisions - TTL time.Duration `json:"ttl"` -} - -// CMABRetryConfig holds the CMAB retry configuration -type CMABRetryConfig struct { - // MaxRetries is the maximum number of retry attempts - MaxRetries int `json:"maxRetries"` - // InitialBackoff is the initial backoff duration - InitialBackoff time.Duration `json:"initialBackoff"` - // MaxBackoff is the maximum backoff duration - MaxBackoff time.Duration `json:"maxBackoff"` - // BackoffMultiplier is the multiplier for exponential backoff - BackoffMultiplier float64 `json:"backoffMultiplier"` -} diff --git a/config/config_test.go b/config/config_test.go index eb0df6fb..917cd498 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019-2020,2022-2025, Optimizely, Inc. and contributors * + * Copyright 2019-2020,2022-2023, Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. 
* @@ -99,22 +99,6 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, 0, conf.Runtime.BlockProfileRate) assert.Equal(t, 0, conf.Runtime.MutexProfileFraction) - - // CMAB configuration - assert.Equal(t, 10*time.Second, conf.CMAB.RequestTimeout) - - // Test cache settings - cache := conf.CMAB.Cache - assert.Equal(t, "memory", cache.Type) - assert.Equal(t, 1000, cache.Size) - assert.Equal(t, 30*time.Minute, cache.TTL) - - // Test retry settings - retry := conf.CMAB.RetryConfig - assert.Equal(t, 3, retry.MaxRetries) - assert.Equal(t, 100*time.Millisecond, retry.InitialBackoff) - assert.Equal(t, 10*time.Second, retry.MaxBackoff) - assert.Equal(t, 2.0, retry.BackoffMultiplier) } type logObservation struct { @@ -249,23 +233,3 @@ func TestServerConfig_GetAllowedHosts(t *testing.T) { assert.Contains(t, allowedHosts, "localhost") assert.Contains(t, allowedHosts, "special.test.host") } - -func TestDefaultCMABConfig(t *testing.T) { - conf := NewDefaultConfig() - - // Test default values - assert.Equal(t, 10*time.Second, conf.CMAB.RequestTimeout) - - // Test default cache settings - cache := conf.CMAB.Cache - assert.Equal(t, "memory", cache.Type) - assert.Equal(t, 1000, cache.Size) - assert.Equal(t, 30*time.Minute, cache.TTL) - - // Test default retry settings - retry := conf.CMAB.RetryConfig - assert.Equal(t, 3, retry.MaxRetries) - assert.Equal(t, 100*time.Millisecond, retry.InitialBackoff) - assert.Equal(t, 10*time.Second, retry.MaxBackoff) - assert.Equal(t, 2.0, retry.BackoffMultiplier) -} diff --git a/docs/redis-streams.md b/docs/redis-streams.md new file mode 100644 index 00000000..262f2519 --- /dev/null +++ b/docs/redis-streams.md @@ -0,0 +1,827 @@ +# Redis Streams for Notification Delivery (beta) + +Redis Streams provides persistent, reliable message delivery for Agent notifications with guaranteed delivery, message acknowledgment, and automatic recovery. 
+ +## Table of Contents + +- [Overview](#overview) +- [Why Redis for Notifications?](#why-redis-for-notifications) +- [Architecture](#architecture) +- [Configuration](#configuration) +- [Redis Pub/Sub vs Redis Streams](#redis-pubsub-vs-redis-streams) +- [Testing](#testing) +- [Migration Guide](#migration-guide) +- [Troubleshooting](#troubleshooting) +- [FAQ](#faq) + +## Overview + +Redis Streams extends Redis with a log data structure that provides: + +- **Persistent storage** - Messages survive Redis restarts +- **Guaranteed delivery** - Messages are acknowledged only after successful processing +- **Consumer groups** - Load distribution across multiple Agent instances +- **Automatic recovery** - Unacknowledged messages are redelivered +- **Batching** - Efficient processing of multiple messages + +### Prerequisites + +**Redis Version:** Redis **5.0 or higher** is required for Redis Streams support. + +- Redis Streams were introduced in Redis 5.0 +- Recommended: Redis 6.0+ for improved performance and stability +- Verify your version: `redis-cli --version` + +### When to Use Redis Streams + +**Use Redis Streams when:** +- Message delivery is critical (notifications must reach clients) +- Running multiple Agent instances (high availability) +- Need to recover from Agent restarts without message loss +- Want visibility into message delivery status + +**Consider Redis Pub/Sub when:** +- Message loss is acceptable (fire-and-forget) +- Running single Agent instance +- Need absolute minimum latency (no persistence overhead) + +## Why Redis for Notifications? 
+ +### The Load Balancer Subscription Problem + +When running multiple Agent pods behind a load balancer in Kubernetes, **you can only subscribe to ONE pod's notifications**: + +``` +Client subscribes: + /v1/notifications/event-stream → Load Balancer → Agent Pod 1 (sticky connection) + +Decision requests (load balanced): + /v1/decide → Load Balancer → Agent Pod 1 → Client receives notification + /v1/decide → Load Balancer → Agent Pod 2 → Client MISSES notification! + /v1/decide → Load Balancer → Agent Pod 3 → Client MISSES notification! +``` + +**The Problem:** + +1. **Client subscribes** to `/v1/notifications/event-stream` via load balancer +2. Load balancer routes SSE connection to **one specific Agent pod** (e.g., Pod 1) +3. Client is now subscribed **only to Pod 1's notifications** +4. Decision requests are **load-balanced** across all pods (Pod 1, 2, 3) +5. When decision happens on **Pod 2 or Pod 3**, client **never receives notification** + +**Why you can't subscribe to all pods:** +- **SSE connections are sticky** - once connected to a pod, you stay connected to that pod +- **Load balancer routes to ONE pod** - you can't subscribe to multiple pods simultaneously +- **Subscribing directly to pod IPs is not feasible** - pods are ephemeral in Kubernetes + +**Alternative considered (Push model):** +- Configure Agent pods to push notifications to an external endpoint +- Problem: This would completely change the subscribe-based SSE model +- Decision: Keep the subscribe model, use Redis as central hub instead + +### Redis Solution: Central Notification Hub + +Redis acts as a **shared notification hub** that all Agent pods write to and read from: + +``` +Decision Flow (all pods publish to Redis): + /v1/decide → Load Balancer → Agent Pod 1 → Publishes notification → Redis + /v1/decide → Load Balancer → Agent Pod 2 → Publishes notification → Redis + /v1/decide → Load Balancer → Agent Pod 3 → Publishes notification → Redis + +Subscription Flow (any pod reads from 
Redis): + Client → /v1/notifications/event-stream → Load Balancer → Agent Pod 1 + ↓ + Agent Pod 1 reads Redis Stream + ↓ + Gets notifications from ALL pods + ↓ + Sends to client via SSE connection +``` + +**How it works:** + +1. **All Agent pods publish to Redis:** + - Decision on Pod 1 → notification published to Redis + - Decision on Pod 2 → notification published to Redis + - Decision on Pod 3 → notification published to Redis + +2. **Client subscribes to one pod (via load balancer):** + - Client → `/v1/notifications/event-stream` → routed to Pod 1 + - Long-lived SSE connection established to Pod 1 + +3. **Pod 1 reads from Redis Stream:** + - Pod 1 subscribes to Redis (using consumer groups) + - Receives notifications from **ALL pods** (including its own) + +4. **Pod 1 forwards to client:** + - Sends all notifications to client over SSE connection + - Client receives notifications from all Agent pods, not just Pod 1 + +**Key Insight:** Client connects to ONE pod, but that pod reads from Redis which aggregates notifications from ALL pods. This solves the load balancer problem without changing the subscribe model. + +### Why Not Use Event Dispatcher? + +**Event Dispatcher** (SDK events → Optimizely servers): +- Each Agent sends events **independently** +- No coordination needed between Agents + +**Notifications** (datafile updates → SSE clients): +- Need to sync updates **across ALL Agents** +- SSE clients connected to different Agents must receive same updates +- Redis provides the broadcast mechanism + +This architecture was designed to ensure **datafile consistency across Agent clusters** in production environments. 
+ +## Architecture + +``` +┌─────────────┐ XADD ┌──────────────┐ +│ Decide ├──────────────►│ Redis Stream │ +│ Handler │ │ (persistent) │ +└─────────────┘ └──────┬───────┘ + │ + XREADGROUP + (batch_size: 5) + │ + ▼ + ┌──-──────────────┐ + │ Consumer Group │ + │ "notifications"│ + └────────┬────────┘ + │ + ┌──────┴──────┐ + │ Batch │ + │ (5 messages)│ + └──────┬──────┘ + │ + Send to SSE Client + │ + ▼ + XACK + (acknowledge delivery) +``` + +### Message Flow + +1. **Publish** - Decide handler adds notification to stream (`XADD`) +2. **Read** - Consumer reads batch of messages (`XREADGROUP`) +3. **Process** - Messages sent to SSE client +4. **Acknowledge** - Successfully delivered messages acknowledged (`XACK`) +5. **Retry** - Unacknowledged messages automatically redelivered + +## Configuration + +> **⚠️ Prerequisites:** Requires Redis 5.0 or higher. Redis Streams are not available in Redis 4.x or earlier. + +### Quick Start Setup + +**Step 1 - Enable notifications in `config.yaml`:** + +```yaml +api: + enableNotifications: true +``` + +**Step 2 - Enable synchronization:** + +```yaml +synchronization: + notification: + enable: true + default: "redis-streams" # Switch from "redis" to "redis-streams" +``` + +**Step 3 - Configure Redis connection:** + +```yaml +synchronization: + pubsub: + redis: + host: "localhost:6379" + auth_token: "" # Recommended: use auth_token or redis_secret + # password: "" # Alternative: password (may trigger security scanners) + database: 0 +``` + +**Step 4 - (Optional) Tune performance:** + +```yaml +synchronization: + pubsub: + redis: + # Batching configuration + batch_size: 10 # Messages per batch + flush_interval: 2s # Max wait for partial batch + + # Retry configuration + max_retries: 3 + retry_delay: 100ms + max_retry_delay: 5s + connection_timeout: 10s +``` + +**Step 5 - (Optional) Increase HTTP timeouts to prevent SSE disconnects:** + +```yaml +server: + readTimeout: 300s # 5 minutes + writeTimeout: 300s # 5 minutes +``` + +**Step 6 - 
(Optional) Enable TLS/HTTPS:** + +```yaml +server: + keyFile: /path/to/key.pem + certFile: /path/to/cert.pem +``` + +### Full Configuration Example + +```yaml +api: + enableNotifications: true + +server: + readTimeout: 300s + writeTimeout: 300s + # Optional: Enable HTTPS + # keyFile: /path/to/key.pem + # certFile: /path/to/cert.pem + +synchronization: + pubsub: + redis: + host: "localhost:6379" + auth_token: "" # Supports: auth_token, redis_secret, password + # Fallback: REDIS_PASSWORD environment variable + database: 0 + + # Redis Streams configuration + batch_size: 5 # Messages per batch + flush_interval: 2s # Max wait before sending partial batch + max_retries: 3 # Retry attempts for failed operations + retry_delay: 100ms # Initial retry delay + max_retry_delay: 5s # Max retry delay (exponential backoff) + connection_timeout: 10s # Redis connection timeout + + notification: + enable: true + default: "redis-streams" # Use Redis Streams for notifications +``` + +### Security: Password Configuration + +To avoid security scanner alerts, use alternative field names: + +```yaml +# Preferred (no security scanner alerts) +auth_token: "your-redis-password" + +# Alternative +redis_secret: "your-redis-password" + +# Fallback to environment variable (if config field empty) +# export REDIS_PASSWORD="your-redis-password" + +# Not recommended (triggers security scanners) +password: "your-redis-password" +``` + +The Agent checks fields in this order: `auth_token` → `redis_secret` → `password` → `REDIS_PASSWORD` env var. 
+ +### Configuration Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `batch_size` | 10 | Number of messages to batch before sending | +| `flush_interval` | 5s | Maximum time to wait before sending partial batch | +| `max_retries` | 3 | Maximum retry attempts for failed operations | +| `retry_delay` | 100ms | Initial delay between retry attempts | +| `max_retry_delay` | 5s | Maximum delay with exponential backoff | +| `connection_timeout` | 10s | Timeout for Redis connections | + +### Performance Tuning + +**For low-latency (real-time notifications):** +```yaml +batch_size: 5 +flush_interval: 500ms # 0.5s max latency +``` + +**For high-throughput (batch processing):** +```yaml +batch_size: 100 +flush_interval: 5s +``` + +**For burst traffic:** +```yaml +batch_size: 50 +flush_interval: 1s +``` + +## Redis Pub/Sub vs Redis Streams + +### Comparison + +| Feature | Redis Pub/Sub | Redis Streams | +|---------|---------------|---------------| +| **Delivery guarantee** | Fire-and-forget | Guaranteed with ACK | +| **Persistence** | No (in-memory only) | Yes (survives restarts) | +| **Message recovery** | No | Yes (redelivery) | +| **Consumer groups** | No | Yes | +| **Latency** | Lowest (~1ms) | Low (~2-5ms) | +| **Memory usage** | Minimal | Higher (persistence) | +| **Complexity** | Simple | Moderate | + +### Migration Path + +**Currently using Redis Pub/Sub?** Switching to Redis Streams is a one-line config change: + +```yaml +# Before (Redis Pub/Sub) +synchronization: + notification: + default: "redis" + +# After (Redis Streams) +synchronization: + notification: + default: "redis-streams" +``` + +All Redis Streams configuration is backward compatible - existing `pubsub.redis` settings are reused. 
+ +## Testing + +### Test 1: Batching Behavior + +Send burst traffic to trigger batching: + +```bash +# Send 20 requests instantly (in parallel) +for i in {1..20}; do + curl -H "X-Optimizely-SDK-Key: YOUR_SDK_KEY" \ + -H "Content-Type: application/json" \ + -d "{\"userId\":\"burst-$i\"}" \ + "localhost:8080/v1/decide" & +done +wait +``` + +**Verify batching in Redis Monitor:** + +```bash +redis-cli monitor | grep -E "xack|xreadgroup" +``` + +**Expected patterns:** + +Multiple XACKs with same timestamp prefix (batch of 5): +``` +"xack" ... "1759461708595-1" +"xack" ... "1759461708595-2" +"xack" ... "1759461708595-3" +"xack" ... "1759461708595-4" +"xack" ... "1759461708595-5" +``` + +### Test 2: Flush Interval + +Send messages slower than batch size: + +```bash +# Send 3 messages (less than batch_size) +for i in {1..3}; do + curl -H "X-Optimizely-SDK-Key: YOUR_SDK_KEY" \ + -H "Content-Type: application/json" \ + -d "{\"userId\":\"flush-test-$i\"}" \ + "localhost:8080/v1/decide" +done +``` + +**Expected:** Messages delivered after `flush_interval` (e.g., 2s) even though batch isn't full. + +### Test 3: Message Recovery + +Test that messages survive Agent restarts: + +**Step 1 - Send messages:** +```bash +for i in {1..5}; do + curl -H "X-Optimizely-SDK-Key: YOUR_SDK_KEY" \ + -H "Content-Type: application/json" \ + -d "{\"userId\":\"recovery-test-$i\"}" \ + "localhost:8080/v1/decide" +done +``` + +**Step 2 - Kill Agent:** +```bash +# Stop the agent process +pkill -f optimizely +``` + +**Step 3 - Verify messages in Redis:** +```bash +redis-cli +> XLEN stream:optimizely-sync-YOUR_SDK_KEY +(integer) 20 # 5 users × 4 flags + +> XRANGE stream:optimizely-sync-YOUR_SDK_KEY - + COUNT 5 +# Shows pending messages +``` + +**Step 4 - Restart Agent:** +```bash +./bin/optimizely +``` + +**Expected:** All messages automatically redelivered to SSE clients. 
+ +### Redis CLI Inspection Commands + +```bash +# List all streams +KEYS stream:* + +# Check stream length +XLEN stream:optimizely-sync-{SDK_KEY} + +# View messages in stream +XRANGE stream:optimizely-sync-{SDK_KEY} - + COUNT 10 + +# View consumer group info +XINFO GROUPS stream:optimizely-sync-{SDK_KEY} + +# View pending messages (unacknowledged) +XPENDING stream:optimizely-sync-{SDK_KEY} notifications + +# View consumer info +XINFO CONSUMERS stream:optimizely-sync-{SDK_KEY} notifications + +# Clear stream (for testing) +DEL stream:optimizely-sync-{SDK_KEY} +``` + +## Migration Guide + +### From Redis Pub/Sub to Redis Streams + +**1. Update configuration:** + +```yaml +synchronization: + notification: + enable: true + default: "redis-streams" # Change from "redis" to "redis-streams" +``` + +**2. (Optional) Add performance tuning:** + +```yaml +synchronization: + pubsub: + redis: + batch_size: 10 + flush_interval: 2s +``` + +**3. Restart Agent** + +**4. Verify operation:** + +```bash +# Check streams are created +redis-cli KEYS "stream:*" + +# Monitor activity +redis-cli monitor | grep -E "xadd|xreadgroup|xack" +``` + +**5. Clean up old pub/sub channels (optional):** + +```bash +# List old channels +redis-cli PUBSUB CHANNELS "optimizely-sync-*" + +# They will expire naturally when no longer used +``` + +### Rollback Plan + +If you need to rollback to Redis Pub/Sub: + +```yaml +synchronization: + notification: + default: "redis" # Rollback to pub/sub +``` + +Restart Agent. No data migration needed. 
+ +## Troubleshooting + +### Messages Not Delivered + +**Check 1 - Verify stream exists:** +```bash +redis-cli KEYS "stream:optimizely-sync-*" +``` + +**Check 2 - Check consumer group:** +```bash +redis-cli XINFO GROUPS stream:optimizely-sync-{SDK_KEY} +``` + +Expected output: +``` +1) "name" +2) "notifications" +3) "consumers" +4) (integer) 1 +5) "pending" +6) (integer) 0 +``` + +**Check 3 - Check for pending messages:** +```bash +redis-cli XPENDING stream:optimizely-sync-{SDK_KEY} notifications +``` + +If `pending > 0`, messages are stuck. Agent may have crashed before ACK. + +**Solution:** Restart Agent to reprocess pending messages. + +### High Memory Usage + +**Cause:** Streams not being trimmed. + +**Check stream length:** +```bash +redis-cli XLEN stream:optimizely-sync-{SDK_KEY} +``` + +**Solution 1 - Configure max length (future enhancement):** +```yaml +# Not currently implemented +max_len: 1000 # Keep only last 1000 messages +``` + +**Solution 2 - Manual cleanup:** +```bash +# Keep only last 100 messages +redis-cli XTRIM stream:optimizely-sync-{SDK_KEY} MAXLEN ~ 100 +``` + +### Connection Errors + +**Error:** `connection refused` or `timeout` + +**Check Redis availability:** +```bash +redis-cli ping +``` + +**Verify configuration:** +```yaml +synchronization: + pubsub: + redis: + host: "localhost:6379" # Correct host? 
+ connection_timeout: 10s # Increase if needed +``` + +**Check Agent logs:** +```bash +# Look for connection errors +grep -i "redis" agent.log +``` + +### Performance Issues + +**Symptom:** High latency for notifications + +**Solution 1 - Reduce batch size:** +```yaml +batch_size: 5 # Smaller batches +flush_interval: 500ms # Faster flush +``` + +**Solution 2 - Check Redis performance:** +```bash +redis-cli --latency +redis-cli --stat +``` + +**Solution 3 - Monitor batch metrics:** +```bash +curl http://localhost:8088/metrics | grep redis_streams +``` + +## Advanced Topics + +### Consumer Groups & Load Balancing + +Redis Streams uses consumer groups to distribute messages across multiple Agent instances: + +- **Stream name:** `stream:optimizely-sync-{SDK_KEY}` +- **Consumer group:** `notifications` (default) +- **Consumer name:** `consumer-{timestamp}` (unique per Agent instance) + +**How it works:** + +``` +Stream → Consumer Group "notifications" → Agent 1 (consumer-123) reads msg 1, 2, 3 + → Agent 2 (consumer-456) reads msg 4, 5, 6 + → Agent 3 (consumer-789) reads msg 7, 8, 9 +``` + +Multiple Agents reading from same stream will **load-balance messages automatically**. 
+
+### Multiple SDK Keys Support
+
+Subscribe to notifications for **multiple SDK keys** using wildcards:
+
+**Single SDK key:**
+```bash
+curl -N 'http://localhost:8080/v1/notifications/event-stream' \
+  -H 'X-Optimizely-SDK-Key: ABC123'
+```
+
+**All SDK keys (Redis pattern subscribe):**
+```bash
+# Agent publishes to: stream:optimizely-sync-{sdk_key}
+# Subscribe with pattern: stream:optimizely-sync-*
+# Note: PSUBSCRIBE matches Pub/Sub channels only - stream entries are not
+# delivered over Pub/Sub. To discover stream keys, use:
+#   redis-cli KEYS "stream:optimizely-sync-*"
+
+redis-cli PSUBSCRIBE "stream:optimizely-sync-*"
+```
+
+### Message Claiming & Fault Tolerance
+
+If an Agent crashes before acknowledging a message, **another Agent can claim it**:
+
+**Step 1 - Agent 1 reads message:**
+```bash
+XREADGROUP GROUP notifications consumer1 STREAMS stream:name ">"
+```
+
+**Step 2 - Agent 1 crashes (message pending, not acknowledged)**
+
+**Step 3 - Check pending messages:**
+```bash
+XPENDING stream:name notifications
+# Shows message owned by consumer1 (dead)
+```
+
+**Step 4 - Agent 2 claims abandoned message:**
+```bash
+XCLAIM stream:name notifications consumer2 60000
+# Claims messages pending > 60 seconds
+```
+
+**Step 5 - Agent 2 processes and acknowledges:**
+```bash
+XACK stream:name notifications
+```
+
+**Benefits:**
+- **Load balancing:** Multiple workers process different messages
+- **Fault tolerance:** Dead workers' messages claimed by others
+- **At-least-once delivery:** Messages stay pending until acknowledged, so a message is never lost - but it may be redelivered after a crash
+
+### Message Format
+
+Messages stored in streams contain:
+
+```json
+{
+  "data": "{\"type\":\"decision\",\"message\":{...}}",
+  "timestamp": 1759461274
+}
+```
+
+- `data`: JSON-encoded notification payload
+- `timestamp`: Unix timestamp of message creation
+
+### Retry Logic
+
+Failed operations use exponential backoff:
+
+1. Initial delay: `retry_delay` (default: 100ms)
+2. Each retry: delay × 2
+3. Max delay: `max_retry_delay` (default: 5s)
+4. 
Max retries: `max_retries` (default: 3) + +**Retryable errors:** +- Connection errors (refused, reset, timeout) +- Redis LOADING, READONLY, CLUSTERDOWN states + +**Non-retryable errors:** +- Authentication errors +- Invalid commands +- Memory limit exceeded + +## FAQ + +### Does Agent support TLS/HTTPS? + +Yes, TLS is configurable in `config.yaml`: + +```yaml +server: + keyFile: /path/to/key.pem # TLS private key + certFile: /path/to/cert.pem # TLS certificate +``` + +Uncomment and set paths to enable HTTPS for the Agent server. + +### Can I subscribe to multiple SDK keys? + +Yes, use Redis pattern subscribe: + +```bash +# Subscribe to all SDK keys +redis-cli PSUBSCRIBE "stream:optimizely-sync-*" +``` + +Agent publishes to channels: `stream:optimizely-sync-{sdk_key}` + +### Are large messages a problem? + +**Redis Streams:** Can handle up to **512MB** messages (Redis max string size) + +**SQS comparison:** Only **256KB** limit + +**Considerations:** +- Redis memory usage increases with message size +- Network bandwidth for large payloads +- Serialization/deserialization overhead + +For production, keep notifications < 1MB for optimal performance. + +### How do I avoid "password" security scanner alerts? + +Use alternative field names in `config.yaml`: + +```yaml +auth_token: "your-redis-password" # Preferred +# or +redis_secret: "your-redis-password" +# or +# export REDIS_PASSWORD="your-redis-password" # Environment variable +``` + +Avoid using `password:` field name which triggers security scanners. + +### Why use Redis instead of direct event dispatching? + +**Event dispatching** (SDK → Optimizely): +- Each Agent sends events independently ✓ + +**Redis notifications** (Agent ↔ Agent): +- Syncs datafile updates across **all Agent instances** +- Solves the load balancer problem (webhook → random Agent) +- Ensures all Agents serve consistent data + +See [Why Redis for Notifications?](#why-redis-for-notifications) for details. 
+ +### Can multiple consumers read the same message? + +**Consumer groups:** No - messages distributed across consumers (load balancing) + +``` +Msg 1 → Consumer A +Msg 2 → Consumer B (different message) +Msg 3 → Consumer A +``` + +**Multiple consumer groups:** Yes - different groups get same messages + +``` +Group "notifications" → Consumer A gets Msg 1 +Group "analytics" → Consumer X gets Msg 1 (same message) +``` + +### What happens if a consumer crashes? + +Messages become **pending** (unacknowledged). Another consumer can **claim** them: + +```bash +# Check pending messages +XPENDING stream:name notifications + +# Claim abandoned messages (60s timeout) +XCLAIM stream:name notifications consumer2 60000 + +# Process and acknowledge +XACK stream:name notifications +``` + +This ensures **no message loss** even when Agents crash. + +## See Also + +- [Redis Streams Documentation](https://redis.io/docs/latest/develop/data-types/streams/) diff --git a/pkg/handlers/decide.go b/pkg/handlers/decide.go index 0dae77eb..9607f6e5 100644 --- a/pkg/handlers/decide.go +++ b/pkg/handlers/decide.go @@ -108,50 +108,31 @@ func Decide(w http.ResponseWriter, r *http.Request) { featureMap = cfg.FeaturesMap } + var decides map[string]client.OptimizelyDecision switch len(keys) { case 0: // Decide All - decides := optimizelyUserContext.DecideAll(decideOptions) - decideOuts := []DecideOut{} - for _, d := range decides { - decideOut := DecideOut{ - OptimizelyDecision: d, - Variables: d.Variables.ToMap(), - IsEveryoneElseVariation: isEveryoneElseVariation(featureMap[d.FlagKey].DeliveryRules, d.RuleKey), - } - decideOuts = append(decideOuts, decideOut) - logger.Debug().Msgf("Feature %q is enabled for user %s? 
%t", d.FlagKey, d.UserContext.UserID, d.Enabled) - } - render.JSON(w, r, decideOuts) - return + decides = optimizelyUserContext.DecideAll(decideOptions) case 1: - // Decide single key + // Decide key := keys[0] logger.Debug().Str("featureKey", key).Msg("fetching feature decision") d := optimizelyUserContext.Decide(key, decideOptions) - decideOut := DecideOut{ - OptimizelyDecision: d, - Variables: d.Variables.ToMap(), - IsEveryoneElseVariation: isEveryoneElseVariation(featureMap[d.FlagKey].DeliveryRules, d.RuleKey), - } + decideOut := DecideOut{d, d.Variables.ToMap(), isEveryoneElseVariation(featureMap[d.FlagKey].DeliveryRules, d.RuleKey)} render.JSON(w, r, decideOut) return default: - // Decide for multiple keys - decides := optimizelyUserContext.DecideForKeys(keys, decideOptions) - decideOuts := []DecideOut{} - for _, d := range decides { - decideOut := DecideOut{ - OptimizelyDecision: d, - Variables: d.Variables.ToMap(), - IsEveryoneElseVariation: isEveryoneElseVariation(featureMap[d.FlagKey].DeliveryRules, d.RuleKey), - } - decideOuts = append(decideOuts, decideOut) - logger.Debug().Msgf("Feature %q is enabled for user %s? %t", d.FlagKey, d.UserContext.UserID, d.Enabled) - } - render.JSON(w, r, decideOuts) - return + // Decide for Keys + decides = optimizelyUserContext.DecideForKeys(keys, decideOptions) + } + + decideOuts := []DecideOut{} + for _, d := range decides { + decideOut := DecideOut{d, d.Variables.ToMap(), isEveryoneElseVariation(featureMap[d.FlagKey].DeliveryRules, d.RuleKey)} + decideOuts = append(decideOuts, decideOut) + logger.Debug().Msgf("Feature %q is enabled for user %s? 
%t", d.FlagKey, d.UserContext.UserID, d.Enabled) } + render.JSON(w, r, decideOuts) } func getUserContextWithOptions(r *http.Request) (DecideBody, error) { diff --git a/pkg/handlers/decide_test.go b/pkg/handlers/decide_test.go index 6b075bf9..7e47e791 100644 --- a/pkg/handlers/decide_test.go +++ b/pkg/handlers/decide_test.go @@ -580,9 +580,6 @@ func (suite *DecideTestSuite) TestDecideMultipleFlags() { var actual []DecideOut err := json.Unmarshal(rec.Body.Bytes(), &actual) - fmt.Printf("Response Body: %s\n", rec.Body.String()) - fmt.Printf("Unmarshalled actual: %+v\n", actual) - suite.NoError(err) suite.ElementsMatch(expected, actual) } diff --git a/pkg/handlers/reset.go b/pkg/handlers/reset.go deleted file mode 100644 index 5f7420d8..00000000 --- a/pkg/handlers/reset.go +++ /dev/null @@ -1,61 +0,0 @@ -/**************************************************************************** - * Copyright 2025, Optimizely, Inc. and contributors * - * * - * Licensed under the Apache License, Version 2.0 (the "License"); * - * you may not use this file except in compliance with the License. * - * You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, software * - * distributed under the License is distributed on an "AS IS" BASIS, * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * - * See the License for the specific language governing permissions and * - * limitations under the License. 
* - ***************************************************************************/ - -// Package handlers // -package handlers - -import ( - "errors" - "net/http" - - "github.com/go-chi/render" - - "github.com/optimizely/agent/pkg/middleware" -) - -// ResetClient handles the /v1/reset endpoint from FSC tests -// This clears the client cache to ensure clean state between test scenarios, -// particularly important for CMAB cache testing -func ResetClient(w http.ResponseWriter, r *http.Request) { - // Get SDK key from header - sdkKey := r.Header.Get("X-Optimizely-SDK-Key") - if sdkKey == "" { - RenderError(errors.New("SDK key required for reset"), http.StatusBadRequest, w, r) - return - } - - // Get the cache from context - cache, err := middleware.GetOptlyCache(r) - if err != nil { - RenderError(errors.New("cache not available"), http.StatusInternalServerError, w, r) - return - } - - // Get logger for debugging - logger := middleware.GetLogger(r) - logger.Debug().Str("sdkKey", sdkKey).Msg("Resetting client for FSC test") - - // Reset the client using the cache interface - if optlyCache, ok := cache.(interface{ ResetClient(string) }); ok { - optlyCache.ResetClient(sdkKey) - } else { - RenderError(errors.New("cache reset not supported"), http.StatusInternalServerError, w, r) - return - } - - // Return success - render.JSON(w, r, map[string]interface{}{"result": true}) -} diff --git a/pkg/handlers/reset_test.go b/pkg/handlers/reset_test.go deleted file mode 100644 index 479403d6..00000000 --- a/pkg/handlers/reset_test.go +++ /dev/null @@ -1,140 +0,0 @@ -/**************************************************************************** - * Copyright 2025, Optimizely, Inc. and contributors * - * * - * Licensed under the Apache License, Version 2.0 (the "License"); * - * you may not use this file except in compliance with the License. 
* - * You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, software * - * distributed under the License is distributed on an "AS IS" BASIS, * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * - * See the License for the specific language governing permissions and * - * limitations under the License. * - ***************************************************************************/ - -// Package handlers // -package handlers - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - - "github.com/optimizely/agent/pkg/middleware" - "github.com/optimizely/agent/pkg/optimizely" - "github.com/optimizely/agent/pkg/optimizely/optimizelytest" - - "github.com/go-chi/chi/v5" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" -) - -type MockCache struct { - mock.Mock -} - -func (m *MockCache) GetClient(key string) (*optimizely.OptlyClient, error) { - args := m.Called(key) - return args.Get(0).(*optimizely.OptlyClient), args.Error(1) -} - -func (m *MockCache) UpdateConfigs(_ string) { -} - -func (m *MockCache) SetUserProfileService(sdkKey, userProfileService string) { - m.Called(sdkKey, userProfileService) -} - -func (m *MockCache) SetODPCache(sdkKey, odpCache string) { - m.Called(sdkKey, odpCache) -} - -func (m *MockCache) ResetClient(sdkKey string) { - m.Called(sdkKey) -} - -type ResetTestSuite struct { - suite.Suite - oc *optimizely.OptlyClient - tc *optimizelytest.TestClient - mux *chi.Mux - cache *MockCache -} - -func (suite *ResetTestSuite) ClientCtx(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := context.WithValue(r.Context(), middleware.OptlyClientKey, suite.oc) - ctx = context.WithValue(ctx, middleware.OptlyCacheKey, suite.cache) - next.ServeHTTP(w, r.WithContext(ctx)) - }) -} - -func (suite *ResetTestSuite) SetupTest() { - 
testClient := optimizelytest.NewClient() - suite.tc = testClient - suite.oc = &optimizely.OptlyClient{OptimizelyClient: testClient.OptimizelyClient} - - mockCache := new(MockCache) - mockCache.On("ResetClient", "test-sdk-key").Return() - suite.cache = mockCache - - mux := chi.NewMux() - mux.Use(suite.ClientCtx) - mux.Post("/reset", ResetClient) - suite.mux = mux -} - -func (suite *ResetTestSuite) TestResetClient() { - req := httptest.NewRequest("POST", "/reset", nil) - req.Header.Set("X-Optimizely-SDK-Key", "test-sdk-key") - recorder := httptest.NewRecorder() - - suite.mux.ServeHTTP(recorder, req) - - suite.Equal(http.StatusOK, recorder.Code) - suite.Contains(recorder.Header().Get("content-type"), "application/json") - suite.Contains(recorder.Body.String(), `"result":true`) - - // Verify ResetClient was called with correct SDK key - suite.cache.AssertCalled(suite.T(), "ResetClient", "test-sdk-key") -} - -func (suite *ResetTestSuite) TestResetClientMissingSDKKey() { - req := httptest.NewRequest("POST", "/reset", nil) - recorder := httptest.NewRecorder() - - suite.mux.ServeHTTP(recorder, req) - - suite.Equal(http.StatusBadRequest, recorder.Code) - suite.Contains(recorder.Body.String(), "SDK key required for reset") -} - -func (suite *ResetTestSuite) TestResetClientCacheNotAvailable() { - // Create a context without cache - req := httptest.NewRequest("POST", "/reset", nil) - req.Header.Set("X-Optimizely-SDK-Key", "test-sdk-key") - recorder := httptest.NewRecorder() - - // Use middleware that doesn't include cache - mux := chi.NewMux() - mux.Use(func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := context.WithValue(r.Context(), middleware.OptlyClientKey, suite.oc) - // Note: no cache in context - next.ServeHTTP(w, r.WithContext(ctx)) - }) - }) - mux.Post("/reset", ResetClient) - - mux.ServeHTTP(recorder, req) - - suite.Equal(http.StatusInternalServerError, recorder.Code) - 
suite.Contains(recorder.Body.String(), "cache not available") -} - -func TestResetTestSuite(t *testing.T) { - suite.Run(t, new(ResetTestSuite)) -} diff --git a/pkg/middleware/cached.go b/pkg/middleware/cached.go index feebcd19..ca50621d 100644 --- a/pkg/middleware/cached.go +++ b/pkg/middleware/cached.go @@ -41,9 +41,6 @@ const OptlyFeatureKey = contextKey("featureKey") // OptlyExperimentKey is the context key used by ExperimentCtx for setting an Experiment const OptlyExperimentKey = contextKey("experimentKey") -// OptlyCacheKey is the context key for the OptlyCache -const OptlyCacheKey = contextKey("optlyCache") - // OptlySDKHeader is the header key for an ad-hoc SDK key const OptlySDKHeader = "X-Optimizely-SDK-Key" @@ -101,7 +98,6 @@ func (mw *CachedOptlyMiddleware) ClientCtx(next http.Handler) http.Handler { } ctx := context.WithValue(r.Context(), OptlyClientKey, optlyClient) - ctx = context.WithValue(ctx, OptlyCacheKey, mw.Cache) next.ServeHTTP(w, r.WithContext(ctx)) }) } diff --git a/pkg/middleware/utils.go b/pkg/middleware/utils.go index 71e55bf3..4c5ad136 100644 --- a/pkg/middleware/utils.go +++ b/pkg/middleware/utils.go @@ -47,16 +47,6 @@ func GetOptlyClient(r *http.Request) (*optimizely.OptlyClient, error) { return optlyClient, nil } -// GetOptlyCache is a utility to extract the OptlyCache from the http request context. 
-func GetOptlyCache(r *http.Request) (optimizely.Cache, error) { - cache, ok := r.Context().Value(OptlyCacheKey).(optimizely.Cache) - if !ok || cache == nil { - return nil, fmt.Errorf("optlyCache not available") - } - - return cache, nil -} - // GetLogger gets the logger with some info coming from http request func GetLogger(r *http.Request) *zerolog.Logger { reqID := r.Header.Get(OptlyRequestHeader) diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index 31804375..56f9493f 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "errors" - "os" "regexp" "strings" "sync" @@ -36,7 +35,6 @@ import ( "github.com/optimizely/agent/plugins/userprofileservice" odpCachePkg "github.com/optimizely/go-sdk/v2/pkg/cache" "github.com/optimizely/go-sdk/v2/pkg/client" - "github.com/optimizely/go-sdk/v2/pkg/cmab" sdkconfig "github.com/optimizely/go-sdk/v2/pkg/config" "github.com/optimizely/go-sdk/v2/pkg/decision" "github.com/optimizely/go-sdk/v2/pkg/event" @@ -314,58 +312,6 @@ func defaultLoader( ) clientOptions = append(clientOptions, client.WithOdpManager(odpManager)) - // Configure CMAB prediction endpoint from environment variable - // This allows FSC tests to override the endpoint by setting OPTIMIZELY_CMAB_PREDICTIONENDPOINT - if cmabEndpoint := os.Getenv("OPTIMIZELY_CMAB_PREDICTIONENDPOINT"); cmabEndpoint != "" { - // Set the global variable that go-sdk uses (FSC already includes the /%s format) - cmab.CMABPredictionEndpoint = cmabEndpoint - log.Info().Str("endpoint", cmabEndpoint).Msg("Using custom CMAB prediction endpoint") - } - - // Parse CMAB cache configuration - cacheSize := clientConf.CMAB.Cache.Size - if cacheSize == 0 { - cacheSize = cmab.DefaultCacheSize - } - - cacheTTL := clientConf.CMAB.Cache.TTL - if cacheTTL == 0 { - cacheTTL = cmab.DefaultCacheTTL - } - - // Create retry config - retryConfig := &cmab.RetryConfig{ - MaxRetries: clientConf.CMAB.RetryConfig.MaxRetries, - InitialBackoff: 
clientConf.CMAB.RetryConfig.InitialBackoff, - MaxBackoff: clientConf.CMAB.RetryConfig.MaxBackoff, - BackoffMultiplier: clientConf.CMAB.RetryConfig.BackoffMultiplier, - } - - // Apply defaults for retry config if not set - if retryConfig.MaxRetries == 0 { - retryConfig.MaxRetries = cmab.DefaultMaxRetries - } - if retryConfig.InitialBackoff == 0 { - retryConfig.InitialBackoff = cmab.DefaultInitialBackoff - } - if retryConfig.MaxBackoff == 0 { - retryConfig.MaxBackoff = cmab.DefaultMaxBackoff - } - if retryConfig.BackoffMultiplier == 0 { - retryConfig.BackoffMultiplier = cmab.DefaultBackoffMultiplier - } - - // Create CMAB config (NO endpoint configuration - not configurable) - cmabConfig := cmab.Config{ - CacheSize: cacheSize, - CacheTTL: cacheTTL, - HTTPTimeout: clientConf.CMAB.RequestTimeout, - RetryConfig: retryConfig, - } - - // Add to client options - clientOptions = append(clientOptions, client.WithCmabConfig(&cmabConfig)) - optimizelyClient, err := optimizelyFactory.Client( clientOptions..., ) @@ -422,24 +368,3 @@ func getServiceWithType(serviceType, sdkKey string, serviceMap cmap.ConcurrentMa } return nil } - -// ResetClient removes the optimizely client from cache to ensure clean state for testing -// This is primarily used by FSC tests to clear CMAB cache between test scenarios -func (c *OptlyCache) ResetClient(sdkKey string) { - // Remove the client from the cache - if val, exists := c.optlyMap.Get(sdkKey); exists { - c.optlyMap.Remove(sdkKey) - - // Close the client to clean up resources - if client, ok := val.(*OptlyClient); ok { - client.Close() - } - - message := "Reset Optimizely client for testing" - if ShouldIncludeSDKKey { - log.Info().Str("sdkKey", sdkKey).Msg(message) - } else { - log.Info().Msg(message) - } - } -} diff --git a/pkg/optimizely/cache_test.go b/pkg/optimizely/cache_test.go index 23292b4d..ee457e64 100644 --- a/pkg/optimizely/cache_test.go +++ b/pkg/optimizely/cache_test.go @@ -310,33 +310,6 @@ func (suite *CacheTestSuite) 
TestNilCreatorAddedforODPCache() { suite.Nil(odpCache) } -func (suite *CacheTestSuite) TestResetClient() { - // First, get a client to put it in the cache - client, err := suite.cache.GetClient("test-sdk-key") - suite.NoError(err) - suite.NotNil(client) - - // Verify client is in cache - cachedClient, exists := suite.cache.optlyMap.Get("test-sdk-key") - suite.True(exists) - suite.NotNil(cachedClient) - - // Reset the client - suite.cache.ResetClient("test-sdk-key") - - // Verify client is removed from cache - _, exists = suite.cache.optlyMap.Get("test-sdk-key") - suite.False(exists) -} - -func (suite *CacheTestSuite) TestResetClientNonExistent() { - // Reset a client that doesn't exist - should not panic - suite.cache.ResetClient("non-existent-key") - - // Verify no clients are in cache - suite.Equal(0, suite.cache.optlyMap.Count()) -} - // In order for 'go test' to run this suite, we need to create // a normal test function and pass our suite to suite.Run func TestCacheTestSuite(t *testing.T) { @@ -822,291 +795,6 @@ func (s *DefaultLoaderTestSuite) TestDefaultRegexValidator() { } } -func (s *DefaultLoaderTestSuite) TestCMABConfigurationParsing() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{ - Type: "memory", - Size: 500, - TTL: 15 * time.Minute, - }, - RetryConfig: config.CMABRetryConfig{ - MaxRetries: 5, - InitialBackoff: 200 * time.Millisecond, - MaxBackoff: 30 * time.Second, - BackoffMultiplier: 1.5, - }, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) - // Note: We can't directly test the CMAB service since it's internal to the OptimizelyClient - // But we can verify the loader doesn't error with valid CMAB config -} - -func (s *DefaultLoaderTestSuite) TestCMABConfigurationDefaults() { - conf := 
config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - // Empty cache and retry config should use defaults - Cache: config.CMABCacheConfig{}, - RetryConfig: config.CMABRetryConfig{}, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) -} - -func (s *DefaultLoaderTestSuite) TestCMABCacheConfigInvalidTTL() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - // Test with valid values since structured types prevent invalid input - Cache: config.CMABCacheConfig{ - Size: 1000, - TTL: 10 * time.Minute, - }, - RetryConfig: config.CMABRetryConfig{}, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) // Should not error, just use defaults - s.NotNil(client) -} - -func (s *DefaultLoaderTestSuite) TestCMABCacheConfigWithValidStructuredTypes() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{ - Type: "memory", - Size: 1000, - TTL: 15 * time.Minute, - }, - RetryConfig: config.CMABRetryConfig{ - MaxRetries: 3, - InitialBackoff: 100 * time.Millisecond, - MaxBackoff: 10 * time.Second, - BackoffMultiplier: 2.0, - }, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) -} - -func (s *DefaultLoaderTestSuite) TestCMABRetryConfigWithValidDurations() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{}, - 
RetryConfig: config.CMABRetryConfig{ - MaxRetries: 3, - InitialBackoff: 200 * time.Millisecond, - MaxBackoff: 30 * time.Second, - BackoffMultiplier: 2.0, - }, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) -} - -func (s *DefaultLoaderTestSuite) TestCMABConfigurationAllValidValues() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 10 * time.Second, - Cache: config.CMABCacheConfig{ - Type: "memory", - Size: 2000, - TTL: 45 * time.Minute, - }, - RetryConfig: config.CMABRetryConfig{ - MaxRetries: 10, - InitialBackoff: 500 * time.Millisecond, - MaxBackoff: 1 * time.Minute, - BackoffMultiplier: 3.0, - }, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) -} - -func (s *DefaultLoaderTestSuite) TestCMABWithZeroRequestTimeout() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 0, // Zero timeout - Cache: config.CMABCacheConfig{}, - RetryConfig: config.CMABRetryConfig{}, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) -} - -func (s *DefaultLoaderTestSuite) TestCMABConfigurationEdgeCases() { - testCases := []struct { - name string - config config.CMABConfig - }{ - { - name: "Zero cache size", - config: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{ - Size: 0, - TTL: 30 * time.Minute, - }, - RetryConfig: config.CMABRetryConfig{}, - }, - }, - { - name: "Zero max retries", - config: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: 
config.CMABCacheConfig{}, - RetryConfig: config.CMABRetryConfig{ - MaxRetries: 0, - }, - }, - }, - { - name: "Very short TTL", - config: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{ - TTL: 1 * time.Millisecond, - }, - RetryConfig: config.CMABRetryConfig{}, - }, - }, - { - name: "Very long TTL", - config: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{ - TTL: 24 * time.Hour, - }, - RetryConfig: config.CMABRetryConfig{}, - }, - }, - } - - for _, tc := range testCases { - s.Run(tc.name, func() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: tc.config, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err, "Should not error for case: %s", tc.name) - s.NotNil(client, "Client should not be nil for case: %s", tc.name) - }) - } -} - -func (s *DefaultLoaderTestSuite) TestCMABConfigurationEmptyStructs() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - CMAB: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{}, // empty struct - RetryConfig: config.CMABRetryConfig{}, // empty struct - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) -} - -// Test that CMAB configuration doesn't interfere with existing functionality -func (s *DefaultLoaderTestSuite) TestCMABWithExistingServices() { - conf := config.ClientConfig{ - SdkKeyRegex: "sdkkey", - UserProfileService: map[string]interface{}{ - "default": "in-memory", - "services": map[string]interface{}{ - "in-memory": map[string]interface{}{ - "capacity": 100, - "storageStrategy": "fifo", - }, - }, - }, - ODP: config.OdpConfig{ - SegmentsCache: map[string]interface{}{ - "default": "in-memory", - "services": 
map[string]interface{}{ - "in-memory": map[string]interface{}{ - "size": 50, - "timeout": "10s", - }, - }, - }, - }, - CMAB: config.CMABConfig{ - RequestTimeout: 5 * time.Second, - Cache: config.CMABCacheConfig{ - Type: "memory", - Size: 1000, - TTL: 30 * time.Minute, - }, - RetryConfig: config.CMABRetryConfig{ - MaxRetries: 5, - }, - }, - } - - loader := defaultLoader(config.AgentConfig{Client: conf}, s.registry, nil, s.upsMap, s.odpCacheMap, s.pcFactory, s.bpFactory) - client, err := loader("sdkkey") - - s.NoError(err) - s.NotNil(client) - s.NotNil(client.UserProfileService, "UPS should still be configured") - s.NotNil(client.odpCache, "ODP Cache should still be configured") -} - func TestDefaultLoaderTestSuite(t *testing.T) { suite.Run(t, new(DefaultLoaderTestSuite)) } diff --git a/pkg/routers/api.go b/pkg/routers/api.go index 2d9e48ca..79df6103 100644 --- a/pkg/routers/api.go +++ b/pkg/routers/api.go @@ -49,7 +49,6 @@ type APIOptions struct { overrideHandler http.HandlerFunc lookupHandler http.HandlerFunc saveHandler http.HandlerFunc - resetHandler http.HandlerFunc sendOdpEventHandler http.HandlerFunc nStreamHandler http.HandlerFunc oAuthHandler http.HandlerFunc @@ -81,7 +80,6 @@ func NewDefaultAPIRouter(optlyCache optimizely.Cache, conf config.AgentConfig, m if !conf.API.EnableOverrides { overrideHandler = forbiddenHandler("Overrides not enabled") } - resetHandler := handlers.ResetClient nStreamHandler := forbiddenHandler("Notification stream not enabled") if conf.API.EnableNotifications { @@ -104,7 +102,6 @@ func NewDefaultAPIRouter(optlyCache optimizely.Cache, conf config.AgentConfig, m overrideHandler: overrideHandler, lookupHandler: handlers.Lookup, saveHandler: handlers.Save, - resetHandler: resetHandler, trackHandler: handlers.TrackEvent, sendOdpEventHandler: handlers.SendOdpEvent, sdkMiddleware: mw.ClientCtx, @@ -134,7 +131,6 @@ func WithAPIRouter(opt *APIOptions, r chi.Router) { overrideTimer := middleware.Metricize("override", opt.metricsRegistry) 
lookupTimer := middleware.Metricize("lookup", opt.metricsRegistry) saveTimer := middleware.Metricize("save", opt.metricsRegistry) - resetTimer := middleware.Metricize("reset", opt.metricsRegistry) trackTimer := middleware.Metricize("track-event", opt.metricsRegistry) sendOdpEventTimer := middleware.Metricize("send-odp-event", opt.metricsRegistry) createAccesstokenTimer := middleware.Metricize("create-api-access-token", opt.metricsRegistry) @@ -148,7 +144,6 @@ func WithAPIRouter(opt *APIOptions, r chi.Router) { overrideTracer := middleware.AddTracing("overrideHandler", "Override") lookupTracer := middleware.AddTracing("lookupHandler", "Lookup") saveTracer := middleware.AddTracing("saveHandler", "Save") - resetTracer := middleware.AddTracing("resetHandler", "Reset") sendOdpEventTracer := middleware.AddTracing("sendOdpEventHandler", "SendOdpEvent") nStreamTracer := middleware.AddTracing("notificationHandler", "SendNotificationEvent") authTracer := middleware.AddTracing("authHandler", "AuthToken") @@ -169,7 +164,6 @@ func WithAPIRouter(opt *APIOptions, r chi.Router) { r.With(decideTimer, opt.oAuthMiddleware, contentTypeMiddleware, decideTracer).Post("/decide", opt.decideHandler) r.With(trackTimer, opt.oAuthMiddleware, contentTypeMiddleware, trackTracer).Post("/track", opt.trackHandler) r.With(overrideTimer, opt.oAuthMiddleware, contentTypeMiddleware, overrideTracer).Post("/override", opt.overrideHandler) - r.With(resetTimer, opt.oAuthMiddleware, contentTypeMiddleware, resetTracer).Post("/reset", opt.resetHandler) r.With(lookupTimer, opt.oAuthMiddleware, contentTypeMiddleware, lookupTracer).Post("/lookup", opt.lookupHandler) r.With(saveTimer, opt.oAuthMiddleware, contentTypeMiddleware, saveTracer).Post("/save", opt.saveHandler) r.With(sendOdpEventTimer, opt.oAuthMiddleware, contentTypeMiddleware, sendOdpEventTracer).Post("/send-odp-event", opt.sendOdpEventHandler) diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go index 6436c03e..7ff7ce3f 100644 --- 
a/pkg/syncer/pubsub.go +++ b/pkg/syncer/pubsub.go @@ -20,23 +20,27 @@ package syncer import ( "context" "errors" + "time" "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/syncer/pubsub" + "github.com/optimizely/agent/pkg/utils/redisauth" ) const ( // PubSubDefaultChan will be used as default pubsub channel name PubSubDefaultChan = "optimizely-sync" - // PubSubRedis is the name of pubsub type of Redis + // PubSubRedis is the name of pubsub type of Redis (fire-and-forget) PubSubRedis = "redis" + // PubSubRedisStreams is the name of pubsub type of Redis Streams (persistent) + PubSubRedisStreams = "redis-streams" ) -type SycnFeatureFlag string +type SyncFeatureFlag string const ( - SyncFeatureFlagNotificaiton SycnFeatureFlag = "sync-feature-flag-notification" - SycnFeatureFlagDatafile SycnFeatureFlag = "sync-feature-flag-datafile" + SyncFeatureFlagNotificaiton SyncFeatureFlag = "sync-feature-flag-notification" + SyncFeatureFlagDatafile SyncFeatureFlag = "sync-feature-flag-datafile" ) type PubSub interface { @@ -44,16 +48,20 @@ type PubSub interface { Subscribe(ctx context.Context, channel string) (chan string, error) } -func newPubSub(conf config.SyncConfig, featureFlag SycnFeatureFlag) (PubSub, error) { +func newPubSub(conf config.SyncConfig, featureFlag SyncFeatureFlag) (PubSub, error) { if featureFlag == SyncFeatureFlagNotificaiton { if conf.Notification.Default == PubSubRedis { return getPubSubRedis(conf) + } else if conf.Notification.Default == PubSubRedisStreams { + return getPubSubRedisStreams(conf) } else { return nil, errors.New("pubsub type not supported") } - } else if featureFlag == SycnFeatureFlagDatafile { + } else if featureFlag == SyncFeatureFlagDatafile { if conf.Datafile.Default == PubSubRedis { return getPubSubRedis(conf) + } else if conf.Datafile.Default == PubSubRedisStreams { + return getPubSubRedisStreams(conf) } else { return nil, errors.New("pubsub type not supported") } @@ -81,27 +89,110 @@ func getPubSubRedis(conf 
config.SyncConfig) (PubSub, error) { return nil, errors.New("pubsub redis host not valid, host must be string") } - passwordVal, found := redisConf["password"] - if !found { - return nil, errors.New("pubsub redis password not found") - } - password, ok := passwordVal.(string) - if !ok { - return nil, errors.New("pubsub redis password not valid, password must be string") - } + // Support multiple auth field names and env var fallback for security scanning compliance + password := redisauth.GetPassword(redisConf, "REDIS_PASSWORD") databaseVal, found := redisConf["database"] if !found { return nil, errors.New("pubsub redis database not found") } - database, ok := databaseVal.(int) - if !ok { - return nil, errors.New("pubsub redis database not valid, database must be int") + // YAML/JSON unmarshals numbers as float64, convert to int + var database int + switch v := databaseVal.(type) { + case int: + database = v + case float64: + database = int(v) + default: + return nil, errors.New("pubsub redis database not valid, database must be numeric") } + // Return original Redis pub/sub implementation (fire-and-forget) return &pubsub.Redis{ Host: host, Password: password, Database: database, }, nil } + +func getPubSubRedisStreams(conf config.SyncConfig) (PubSub, error) { + pubsubConf, found := conf.Pubsub[PubSubRedis] + if !found { + return nil, errors.New("pubsub redis config not found") + } + + redisConf, ok := pubsubConf.(map[string]interface{}) + if !ok { + return nil, errors.New("pubsub redis config not valid") + } + + hostVal, found := redisConf["host"] + if !found { + return nil, errors.New("pubsub redis host not found") + } + host, ok := hostVal.(string) + if !ok { + return nil, errors.New("pubsub redis host not valid, host must be string") + } + + // Support multiple auth field names and env var fallback for security scanning compliance + password := redisauth.GetPassword(redisConf, "REDIS_PASSWORD") + + databaseVal, found := redisConf["database"] + if !found { + 
return nil, errors.New("pubsub redis database not found") + } + // YAML/JSON unmarshals numbers as float64, convert to int + var database int + switch v := databaseVal.(type) { + case int: + database = v + case float64: + database = int(v) + default: + return nil, errors.New("pubsub redis database not valid, database must be numeric") + } + + // Parse optional Redis Streams configuration parameters + batchSize := getIntFromConfig(redisConf, "batch_size", 10) + flushInterval := getDurationFromConfig(redisConf, "flush_interval", 5*time.Second) + maxRetries := getIntFromConfig(redisConf, "max_retries", 3) + retryDelay := getDurationFromConfig(redisConf, "retry_delay", 100*time.Millisecond) + maxRetryDelay := getDurationFromConfig(redisConf, "max_retry_delay", 5*time.Second) + connTimeout := getDurationFromConfig(redisConf, "connection_timeout", 10*time.Second) + + // Return Redis Streams implementation with configuration + return &pubsub.RedisStreams{ + Host: host, + Password: password, + Database: database, + BatchSize: batchSize, + FlushInterval: flushInterval, + MaxRetries: maxRetries, + RetryDelay: retryDelay, + MaxRetryDelay: maxRetryDelay, + ConnTimeout: connTimeout, + }, nil +} + +// getIntFromConfig safely extracts an integer value from config map with default fallback +func getIntFromConfig(config map[string]interface{}, key string, defaultValue int) int { + if val, found := config[key]; found { + if intVal, ok := val.(int); ok { + return intVal + } + } + return defaultValue +} + +// getDurationFromConfig safely extracts a duration value from config map with default fallback +func getDurationFromConfig(config map[string]interface{}, key string, defaultValue time.Duration) time.Duration { + if val, found := config[key]; found { + if strVal, ok := val.(string); ok { + if duration, err := time.ParseDuration(strVal); err == nil { + return duration + } + } + } + return defaultValue +} diff --git a/pkg/syncer/pubsub/redis_streams.go 
b/pkg/syncer/pubsub/redis_streams.go new file mode 100644 index 00000000..2a646244 --- /dev/null +++ b/pkg/syncer/pubsub/redis_streams.go @@ -0,0 +1,550 @@ +/**************************************************************************** + * Copyright 2025 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package pubsub provides pubsub functionality for the agent syncer +package pubsub + +import ( + "context" + "encoding/json" + "fmt" + "math" + "os" + "strings" + "time" + + "github.com/go-redis/redis/v8" + "github.com/rs/zerolog/log" + + "github.com/optimizely/agent/pkg/metrics" +) + +// RedisStreams implements persistent message delivery using Redis Streams +type RedisStreams struct { + Host string + Password string + Database int + // Stream configuration + MaxLen int64 + ConsumerGroup string + ConsumerName string + // Batching configuration + BatchSize int + FlushInterval time.Duration + // Retry configuration + MaxRetries int + RetryDelay time.Duration + MaxRetryDelay time.Duration + // Connection timeout + ConnTimeout time.Duration + // Metrics registry + metricsRegistry *metrics.Registry +} + +func (r *RedisStreams) Publish(ctx context.Context, channel string, message interface{}) error { + streamName := r.getStreamName(channel) + + // Convert message to string for consistent handling + var messageStr string + 
switch v := message.(type) { + case []byte: + messageStr = string(v) + case string: + messageStr = v + default: + // For other types, marshal to JSON + jsonBytes, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + messageStr = string(jsonBytes) + } + + // Add message to stream with automatic ID generation + args := &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{ + "data": messageStr, + "timestamp": time.Now().Unix(), + }, + } + + // Apply max length trimming if configured + if r.MaxLen > 0 { + args.MaxLen = r.MaxLen + args.Approx = true // Use approximate trimming for better performance + } + + return r.executeWithRetry(ctx, func(client *redis.Client) error { + return client.XAdd(ctx, args).Err() + }) +} + +func (r *RedisStreams) Subscribe(ctx context.Context, channel string) (chan string, error) { + streamName := r.getStreamName(channel) + consumerGroup := r.getConsumerGroup() + consumerName := r.getConsumerName() + + ch := make(chan string) + ready := make(chan error, 1) // Signal when consumer group is ready + + go func() { + defer close(ch) + + batchSize := r.getBatchSize() + flushTicker := time.NewTicker(r.getFlushInterval()) + defer flushTicker.Stop() + + var batch []string + var client *redis.Client + var lastReconnect time.Time + reconnectDelay := 1 * time.Second + maxReconnectDelay := 30 * time.Second + + // Initialize connection + client = r.createClient() + defer client.Close() + + // Create consumer group with retry + if err := r.createConsumerGroupWithRetry(ctx, client, streamName, consumerGroup); err != nil { + log.Error().Err(err).Str("stream", streamName).Str("group", consumerGroup).Msg("Failed to create consumer group") + ready <- err // Signal initialization failure + return + } + + // Signal that consumer group is ready + ready <- nil + + for { + select { + case <-ctx.Done(): + // Send any remaining batch before closing + if len(batch) > 0 { + r.sendBatch(ch, batch, 
ctx) + } + return + case <-flushTicker.C: + // Flush interval reached - send current batch + if len(batch) > 0 { + r.incrementCounter("batch.flush_interval") + r.sendBatch(ch, batch, ctx) + batch = nil + } + default: + // Read messages from the stream using consumer group + streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: consumerGroup, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: int64(batchSize - len(batch)), // Read up to remaining batch size + Block: 100 * time.Millisecond, // Short block to allow flush checking + }).Result() + + if err != nil { + if err == redis.Nil { + continue // No messages, continue polling + } + + // Handle connection errors with exponential backoff reconnection + if r.isConnectionError(err) { + r.incrementCounter("connection.error") + log.Warn().Err(err).Msg("Redis connection error, attempting reconnection") + + // Apply exponential backoff for reconnection + if time.Since(lastReconnect) > reconnectDelay { + r.incrementCounter("connection.reconnect_attempt") + client.Close() + client = r.createClient() + lastReconnect = time.Now() + + // Recreate consumer group after reconnection + if groupErr := r.createConsumerGroupWithRetry(ctx, client, streamName, consumerGroup); groupErr != nil { + r.incrementCounter("connection.group_recreate_error") + log.Error().Err(groupErr).Msg("Failed to recreate consumer group after reconnection") + } else { + r.incrementCounter("connection.reconnect_success") + } + + // Increase reconnect delay with exponential backoff + reconnectDelay = time.Duration(math.Min(float64(reconnectDelay*2), float64(maxReconnectDelay))) + } else { + // Wait before next retry + time.Sleep(100 * time.Millisecond) + } + } else { + // Log other errors but continue processing + r.incrementCounter("read.error") + log.Debug().Err(err).Msg("Redis streams read error") + } + continue + } + + // Reset reconnect delay on successful read + reconnectDelay = 1 * time.Second + + // Process 
messages from streams + messageCount := 0 + for _, stream := range streams { + for _, message := range stream.Messages { + // Extract the data field + if data, ok := message.Values["data"].(string); ok { + batch = append(batch, data) + messageCount++ + + // Acknowledge the message with retry + if ackErr := r.acknowledgeMessage(ctx, client, streamName, consumerGroup, message.ID); ackErr != nil { + log.Warn().Err(ackErr).Str("messageID", message.ID).Msg("Failed to acknowledge message") + } + + // Send batch if it's full + if len(batch) >= batchSize { + r.incrementCounter("batch.sent") + r.sendBatch(ch, batch, ctx) + batch = nil + // Continue processing more messages + } + } + } + } + + // Track successful message reads + if messageCount > 0 { + r.incrementCounter("messages.read") + } + } + } + }() + + // Wait for consumer group initialization before returning + select { + case err := <-ready: + if err != nil { + return nil, err + } + return ch, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Helper method to send batch to channel +func (r *RedisStreams) sendBatch(ch chan string, batch []string, ctx context.Context) { + for _, msg := range batch { + select { + case ch <- msg: + // Message sent successfully + case <-ctx.Done(): + return + } + } +} + +// Helper methods +func (r *RedisStreams) getStreamName(channel string) string { + return fmt.Sprintf("stream:%s", channel) +} + +func (r *RedisStreams) getConsumerGroup() string { + if r.ConsumerGroup == "" { + return "notifications" + } + return r.ConsumerGroup +} + +func (r *RedisStreams) getConsumerName() string { + if r.ConsumerName == "" { + hostname, _ := os.Hostname() + pid := os.Getpid() + return fmt.Sprintf("consumer-%s-%d-%d", hostname, pid, time.Now().Unix()) + } + return r.ConsumerName +} + +func (r *RedisStreams) getBatchSize() int { + if r.BatchSize <= 0 { + return 10 // Default batch size + } + return r.BatchSize +} + +func (r *RedisStreams) getFlushInterval() time.Duration { + if 
r.FlushInterval <= 0 { + return 5 * time.Second // Default flush interval + } + return r.FlushInterval +} + +func (r *RedisStreams) getMaxRetries() int { + if r.MaxRetries <= 0 { + return 3 // Default max retries + } + return r.MaxRetries +} + +func (r *RedisStreams) getRetryDelay() time.Duration { + if r.RetryDelay <= 0 { + return 100 * time.Millisecond // Default retry delay + } + return r.RetryDelay +} + +func (r *RedisStreams) getMaxRetryDelay() time.Duration { + if r.MaxRetryDelay <= 0 { + return 5 * time.Second // Default max retry delay + } + return r.MaxRetryDelay +} + +func (r *RedisStreams) getConnTimeout() time.Duration { + if r.ConnTimeout <= 0 { + return 10 * time.Second // Default connection timeout + } + return r.ConnTimeout +} + +// createClient creates a new Redis client with configured timeouts +func (r *RedisStreams) createClient() *redis.Client { + return redis.NewClient(&redis.Options{ + Addr: r.Host, + Password: r.Password, + DB: r.Database, + DialTimeout: r.getConnTimeout(), + ReadTimeout: r.getConnTimeout(), + WriteTimeout: r.getConnTimeout(), + PoolTimeout: r.getConnTimeout(), + }) +} + +// executeWithRetry executes a Redis operation with retry logic +func (r *RedisStreams) executeWithRetry(ctx context.Context, operation func(client *redis.Client) error) error { + start := time.Now() + maxRetries := r.getMaxRetries() + retryDelay := r.getRetryDelay() + maxRetryDelay := r.getMaxRetryDelay() + + var lastErr error + for attempt := 0; attempt <= maxRetries; attempt++ { + client := r.createClient() + var err error + func() { + defer client.Close() // Always executes, even on panic + err = operation(client) + }() + + if err == nil { + // Record successful operation metrics + r.incrementCounter("operations.success") + r.recordTimer("operations.duration", time.Since(start).Seconds()) + if attempt > 0 { + r.incrementCounter("retries.success") + } + return nil // Success + } + + lastErr = err + r.incrementCounter("operations.error") + + // Don't 
retry on non-recoverable errors + if !r.isRetryableError(err) { + r.incrementCounter("errors.non_retryable") + return fmt.Errorf("non-retryable error: %w", err) + } + + // Don't sleep after the last attempt + if attempt < maxRetries { + r.incrementCounter("retries.attempt") + // Calculate delay with exponential backoff + delay := time.Duration(math.Min(float64(retryDelay)*math.Pow(2, float64(attempt)), float64(maxRetryDelay))) + + select { + case <-ctx.Done(): + r.incrementCounter("operations.canceled") + return ctx.Err() + case <-time.After(delay): + // Continue to next retry + } + } + } + + r.incrementCounter("retries.exhausted") + return fmt.Errorf("operation failed after %d retries: %w", maxRetries, lastErr) +} + +// createConsumerGroupWithRetry creates a consumer group with retry logic +func (r *RedisStreams) createConsumerGroupWithRetry(ctx context.Context, client *redis.Client, streamName, consumerGroup string) error { + maxRetries := r.getMaxRetries() + retryDelay := r.getRetryDelay() + maxRetryDelay := r.getMaxRetryDelay() + + var lastErr error + for attempt := 0; attempt <= maxRetries; attempt++ { + _, err := client.XGroupCreateMkStream(ctx, streamName, consumerGroup, "$").Result() + if err == nil || err.Error() == "BUSYGROUP Consumer Group name already exists" { + return nil // Success + } + + lastErr = err + + // Don't retry on non-recoverable errors + if !r.isRetryableError(err) { + return fmt.Errorf("non-retryable error creating consumer group: %w", err) + } + + // Don't sleep after the last attempt + if attempt < maxRetries { + // Calculate delay with exponential backoff + delay := time.Duration(math.Min(float64(retryDelay)*math.Pow(2, float64(attempt)), float64(maxRetryDelay))) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + // Continue to next retry + } + } + } + + return fmt.Errorf("failed to create consumer group after %d retries: %w", maxRetries, lastErr) +} + +// acknowledgeMessage acknowledges a message with 
retry logic +func (r *RedisStreams) acknowledgeMessage(ctx context.Context, client *redis.Client, streamName, consumerGroup, messageID string) error { + maxRetries := 2 // Fewer retries for ACK operations + retryDelay := 50 * time.Millisecond + + var lastErr error + for attempt := 0; attempt <= maxRetries; attempt++ { + err := client.XAck(ctx, streamName, consumerGroup, messageID).Err() + if err == nil { + r.incrementCounter("ack.success") + if attempt > 0 { + r.incrementCounter("ack.retry_success") + } + return nil // Success + } + + lastErr = err + r.incrementCounter("ack.error") + + // Don't retry on non-recoverable errors + if !r.isRetryableError(err) { + r.incrementCounter("ack.non_retryable_error") + return fmt.Errorf("non-retryable ACK error: %w", err) + } + + // Don't sleep after the last attempt + if attempt < maxRetries { + r.incrementCounter("ack.retry_attempt") + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryDelay): + // Continue to next retry + } + } + } + + r.incrementCounter("ack.retry_exhausted") + return fmt.Errorf("ACK failed after %d retries: %w", maxRetries, lastErr) +} + +// isRetryableError determines if an error is retryable +func (r *RedisStreams) isRetryableError(err error) bool { + if err == nil { + return false + } + + errStr := err.Error() + + // Network/connection errors that are retryable + retryableErrors := []string{ + "connection refused", + "connection reset", + "timeout", + "network is unreachable", + "broken pipe", + "eof", + "i/o timeout", + "connection pool exhausted", + "context deadline exceeded", + "context canceled", // Handle graceful shutdowns + "no such host", // DNS lookup failures + } + + for _, retryable := range retryableErrors { + if strings.Contains(strings.ToLower(errStr), retryable) { + return true + } + } + + // Redis-specific retryable errors + if strings.Contains(errStr, "LOADING") || // Redis is loading data + strings.Contains(errStr, "READONLY") || // Redis is in read-only mode + 
strings.Contains(errStr, "CLUSTERDOWN") { // Redis cluster is down + return true + } + + return false +} + +// isConnectionError determines if an error is a connection error +func (r *RedisStreams) isConnectionError(err error) bool { + if err == nil { + return false + } + + errStr := err.Error() + + connectionErrors := []string{ + "connection refused", + "connection reset", + "network is unreachable", + "broken pipe", + "eof", + "connection pool exhausted", + } + + for _, connErr := range connectionErrors { + if strings.Contains(strings.ToLower(errStr), connErr) { + return true + } + } + + return false +} + +// SetMetricsRegistry sets the metrics registry for tracking statistics +func (r *RedisStreams) SetMetricsRegistry(registry *metrics.Registry) { + r.metricsRegistry = registry +} + +// incrementCounter safely increments a metrics counter if registry is available +func (r *RedisStreams) incrementCounter(key string) { + if r.metricsRegistry != nil { + if counter := r.metricsRegistry.GetCounter("redis_streams." + key); counter != nil { + counter.Add(1) + } + } +} + +// recordTimer safely records a timer metric if registry is available +func (r *RedisStreams) recordTimer(key string, duration float64) { + if r.metricsRegistry != nil { + if timer := r.metricsRegistry.NewTimer("redis_streams." + key); timer != nil { + timer.Update(duration) + } + } +} diff --git a/pkg/syncer/pubsub/redis_streams_error_test.go b/pkg/syncer/pubsub/redis_streams_error_test.go new file mode 100644 index 00000000..688c065b --- /dev/null +++ b/pkg/syncer/pubsub/redis_streams_error_test.go @@ -0,0 +1,481 @@ +/**************************************************************************** + * Copyright 2025 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. 
* + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +package pubsub + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/optimizely/agent/pkg/metrics" +) + +func setupRedisStreamsWithRetry() *RedisStreams { + return &RedisStreams{ + Host: "localhost:6379", + Password: "", + Database: 0, + MaxLen: 1000, + ConsumerGroup: "test-group", + ConsumerName: "test-consumer", + BatchSize: 10, + FlushInterval: 5 * time.Second, + MaxRetries: 3, + RetryDelay: 50 * time.Millisecond, + MaxRetryDelay: 1 * time.Second, + ConnTimeout: 5 * time.Second, + // Don't set metricsRegistry by default to avoid conflicts + metricsRegistry: nil, + } +} + +func TestRedisStreams_RetryConfiguration_Defaults(t *testing.T) { + rs := &RedisStreams{} + + assert.Equal(t, 3, rs.getMaxRetries()) + assert.Equal(t, 100*time.Millisecond, rs.getRetryDelay()) + assert.Equal(t, 5*time.Second, rs.getMaxRetryDelay()) + assert.Equal(t, 10*time.Second, rs.getConnTimeout()) +} + +func TestRedisStreams_RetryConfiguration_Custom(t *testing.T) { + rs := &RedisStreams{ + MaxRetries: 5, + RetryDelay: 200 * time.Millisecond, + MaxRetryDelay: 10 * time.Second, + ConnTimeout: 30 * time.Second, + } + + assert.Equal(t, 5, rs.getMaxRetries()) + assert.Equal(t, 200*time.Millisecond, rs.getRetryDelay()) + assert.Equal(t, 10*time.Second, rs.getMaxRetryDelay()) + assert.Equal(t, 30*time.Second, rs.getConnTimeout()) +} + 
+func TestRedisStreams_IsRetryableError(t *testing.T) { + rs := setupRedisStreamsWithRetry() + + testCases := []struct { + name string + err error + retryable bool + }{ + { + name: "nil error", + err: nil, + retryable: false, + }, + { + name: "connection refused", + err: errors.New("connection refused"), + retryable: true, + }, + { + name: "connection reset", + err: errors.New("connection reset by peer"), + retryable: true, + }, + { + name: "timeout error", + err: errors.New("i/o timeout"), + retryable: true, + }, + { + name: "network unreachable", + err: errors.New("network is unreachable"), + retryable: true, + }, + { + name: "broken pipe", + err: errors.New("broken pipe"), + retryable: true, + }, + { + name: "EOF error", + err: errors.New("EOF"), + retryable: true, + }, + { + name: "context deadline exceeded", + err: errors.New("context deadline exceeded"), + retryable: true, + }, + { + name: "context canceled", + err: errors.New("context canceled"), + retryable: true, + }, + { + name: "Redis LOADING", + err: errors.New("LOADING Redis is loading the dataset in memory"), + retryable: true, + }, + { + name: "Redis READONLY", + err: errors.New("READONLY You can't write against a read only replica."), + retryable: true, + }, + { + name: "Redis CLUSTERDOWN", + err: errors.New("CLUSTERDOWN Hash slot not served"), + retryable: true, + }, + { + name: "syntax error - not retryable", + err: errors.New("ERR syntax error"), + retryable: false, + }, + { + name: "wrong type error - not retryable", + err: errors.New("WRONGTYPE Operation against a key holding the wrong kind of value"), + retryable: false, + }, + { + name: "authentication error - not retryable", + err: errors.New("NOAUTH Authentication required"), + retryable: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := rs.isRetryableError(tc.err) + assert.Equal(t, tc.retryable, result, "Error: %v", tc.err) + }) + } +} + +func TestRedisStreams_IsConnectionError(t 
*testing.T) { + rs := setupRedisStreamsWithRetry() + + testCases := []struct { + name string + err error + isConnection bool + }{ + { + name: "nil error", + err: nil, + isConnection: false, + }, + { + name: "connection refused", + err: errors.New("connection refused"), + isConnection: true, + }, + { + name: "connection reset", + err: errors.New("connection reset by peer"), + isConnection: true, + }, + { + name: "network unreachable", + err: errors.New("network is unreachable"), + isConnection: true, + }, + { + name: "broken pipe", + err: errors.New("broken pipe"), + isConnection: true, + }, + { + name: "EOF error", + err: errors.New("EOF"), + isConnection: true, + }, + { + name: "syntax error - not connection", + err: errors.New("ERR syntax error"), + isConnection: false, + }, + { + name: "timeout - not connection", + err: errors.New("i/o timeout"), + isConnection: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := rs.isConnectionError(tc.err) + assert.Equal(t, tc.isConnection, result, "Error: %v", tc.err) + }) + } +} + +func TestRedisStreams_Publish_WithInvalidHost_ShouldRetry(t *testing.T) { + rs := setupRedisStreamsWithRetry() + rs.Host = "invalid-host:6379" // Use invalid host to trigger connection errors + rs.MaxRetries = 2 // Limit retries for faster test + rs.RetryDelay = 10 * time.Millisecond + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err := rs.Publish(ctx, "test-channel", "test message") + + // Should fail with either retry exhaustion or non-retryable error (DNS lookup can fail differently in CI) + assert.Error(t, err) + errMsg := err.Error() + assert.True(t, + strings.Contains(errMsg, "operation failed after") || + strings.Contains(errMsg, "non-retryable error") || + strings.Contains(errMsg, "lookup invalid-host"), + "Expected retry or DNS error, got: %s", errMsg) +} + +func TestRedisStreams_Publish_WithCanceledContext(t *testing.T) { + rs := 
setupRedisStreamsWithRetry() + rs.Host = "invalid-host:6379" // Use invalid host to trigger retries + rs.MaxRetries = 5 + rs.RetryDelay = 100 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + // Cancel context immediately to test cancellation handling + cancel() + + err := rs.Publish(ctx, "test-channel", "test message") + + // Should fail with context canceled error + assert.Error(t, err) + // Could be either context canceled directly or wrapped in retry error + assert.True(t, strings.Contains(err.Error(), "context canceled") || + strings.Contains(err.Error(), "operation failed after")) +} + +func TestRedisStreams_MetricsIntegration(t *testing.T) { + rs := setupRedisStreamsWithRetry() + + // Test that metrics registry can be set and retrieved + registry := metrics.NewRegistry("metrics_integration_test") + rs.SetMetricsRegistry(registry) + + assert.Equal(t, registry, rs.metricsRegistry) +} + +func TestRedisStreams_MetricsTracking_SafeWithNilRegistry(t *testing.T) { + rs := setupRedisStreamsWithRetry() + rs.metricsRegistry = nil + + // These should not panic with nil registry + rs.incrementCounter("test.counter") + rs.recordTimer("test.timer", 1.5) +} + +func TestRedisStreams_CreateClient_WithTimeouts(t *testing.T) { + rs := setupRedisStreamsWithRetry() + rs.ConnTimeout = 2 * time.Second + + client := rs.createClient() + defer client.Close() + + assert.NotNil(t, client) + // Note: go-redis client options are not easily inspectable, + // but we can verify the client was created without error +} + +func TestRedisStreams_AcknowledgeMessage_WithRetry(t *testing.T) { + // This test requires a running Redis instance + rs := setupRedisStreamsWithRetry() + ctx := context.Background() + + // Create a client to set up test data + client := redis.NewClient(&redis.Options{ + Addr: rs.Host, + Password: rs.Password, + DB: rs.Database, + }) + defer client.Close() + + streamName := "test-ack-stream" + consumerGroup := "test-ack-group" + + // Clean up + 
defer func() { + client.Del(ctx, streamName) + }() + + // Add a message to the stream + msgID, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{ + "data": "test message", + }, + }).Result() + require.NoError(t, err) + + // Create consumer group + client.XGroupCreateMkStream(ctx, streamName, consumerGroup, "0") + + // Test acknowledge with valid message ID (should succeed) + err = rs.acknowledgeMessage(ctx, client, streamName, consumerGroup, msgID) + assert.NoError(t, err) + + // Test acknowledge with invalid message ID (should fail but not crash) + err = rs.acknowledgeMessage(ctx, client, streamName, consumerGroup, "invalid-id") + assert.Error(t, err) +} + +func TestRedisStreams_ExecuteWithRetry_NonRetryableError(t *testing.T) { + rs := setupRedisStreamsWithRetry() + ctx := context.Background() + + // Simulate a non-retryable error + operation := func(client *redis.Client) error { + return errors.New("ERR syntax error") // Non-retryable + } + + err := rs.executeWithRetry(ctx, operation) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "non-retryable error") + assert.Contains(t, err.Error(), "ERR syntax error") +} + +func TestRedisStreams_ExecuteWithRetry_SuccessAfterRetries(t *testing.T) { + rs := setupRedisStreamsWithRetry() + rs.RetryDelay = 1 * time.Millisecond // Fast retries for testing + // Don't set metrics registry to avoid expvar name conflicts across tests + // (expvar counters are global and can't be reused even with unique registry names) + ctx := context.Background() + + attemptCount := 0 + operation := func(client *redis.Client) error { + attemptCount++ + if attemptCount < 3 { + return errors.New("connection refused") // Retryable + } + return nil // Success on third attempt + } + + err := rs.executeWithRetry(ctx, operation) + + assert.NoError(t, err) + assert.Equal(t, 3, attemptCount) +} + +func TestRedisStreams_ExecuteWithRetry_ExhaustRetries(t *testing.T) { + rs := 
setupRedisStreamsWithRetry() + rs.MaxRetries = 2 + rs.RetryDelay = 1 * time.Millisecond // Fast retries for testing + // Don't set metrics registry to avoid expvar name conflicts across tests + // (expvar counters are global and can't be reused even with unique registry names) + ctx := context.Background() + + attemptCount := 0 + operation := func(client *redis.Client) error { + attemptCount++ + return errors.New("connection refused") // Always retryable error + } + + err := rs.executeWithRetry(ctx, operation) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "operation failed after 2 retries") + assert.Equal(t, 3, attemptCount) // 1 initial + 2 retries +} + +func TestRedisStreams_CreateConsumerGroupWithRetry_BusyGroupExists(t *testing.T) { + rs := setupRedisStreamsWithRetry() + ctx := context.Background() + + // Create a client to set up test data + client := redis.NewClient(&redis.Options{ + Addr: rs.Host, + Password: rs.Password, + DB: rs.Database, + }) + defer client.Close() + + streamName := "test-busy-group-stream" + consumerGroup := "test-busy-group" + + // Clean up + defer func() { + client.Del(ctx, streamName) + }() + + // First call should succeed + err := rs.createConsumerGroupWithRetry(ctx, client, streamName, consumerGroup) + assert.NoError(t, err) + + // Second call should also succeed (BUSYGROUP error is handled) + err = rs.createConsumerGroupWithRetry(ctx, client, streamName, consumerGroup) + assert.NoError(t, err) +} + +func TestRedisStreams_ErrorHandling_ContextCancellation(t *testing.T) { + rs := setupRedisStreamsWithRetry() + rs.RetryDelay = 100 * time.Millisecond + // Don't set metrics registry to avoid expvar name conflicts across tests + // (expvar counters are global and can't be reused even with unique registry names) + + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + // Cancel context after a short delay + time.Sleep(50 * time.Millisecond) + cancel() + }() + + operation := func(client *redis.Client) 
error { + return errors.New("connection refused") // Retryable error + } + + err := rs.executeWithRetry(ctx, operation) + + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) +} + +func TestRedisStreams_Subscribe_ErrorRecovery_Integration(t *testing.T) { + // Integration test - requires Redis to be running + rs := setupRedisStreamsWithRetry() + rs.MaxRetries = 1 // Limit retries for faster test + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + channel := "test-error-recovery" + defer cleanupRedisStream(rs.getStreamName(channel)) + + // Start subscriber + ch, err := rs.Subscribe(ctx, channel) + require.NoError(t, err) + + // Give some time for setup + time.Sleep(100 * time.Millisecond) + + // Publish a message + err = rs.Publish(ctx, channel, "test message") + require.NoError(t, err) + + // Should receive the message despite any internal error recovery + // Wait longer than flush interval (5 seconds) to ensure batch is flushed + select { + case received := <-ch: + assert.Equal(t, "test message", received) + case <-time.After(6 * time.Second): + t.Fatal("Timeout waiting for message") + } +} diff --git a/pkg/syncer/pubsub/redis_streams_test.go b/pkg/syncer/pubsub/redis_streams_test.go new file mode 100644 index 00000000..c4bf48d1 --- /dev/null +++ b/pkg/syncer/pubsub/redis_streams_test.go @@ -0,0 +1,343 @@ +/**************************************************************************** + * Copyright 2025 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +package pubsub + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testRedisHost = "localhost:6379" + testDatabase = 0 + testPassword = "" +) + +func setupRedisStreams() *RedisStreams { + return &RedisStreams{ + Host: testRedisHost, + Password: testPassword, + Database: testDatabase, + MaxLen: 1000, + ConsumerGroup: "test-group", + ConsumerName: "test-consumer", + BatchSize: 10, + FlushInterval: 5 * time.Second, + } +} + +func cleanupRedisStream(streamName string) { + client := redis.NewClient(&redis.Options{ + Addr: testRedisHost, + Password: testPassword, + DB: testDatabase, + }) + defer client.Close() + + // Delete the stream and consumer group + client.Del(context.Background(), streamName) +} + +func TestRedisStreams_Publish_String(t *testing.T) { + rs := setupRedisStreams() + ctx := context.Background() + channel := "test-channel-string" + message := "test message" + + defer cleanupRedisStream(rs.getStreamName(channel)) + + err := rs.Publish(ctx, channel, message) + assert.NoError(t, err) + + // Verify message was added to stream + client := redis.NewClient(&redis.Options{ + Addr: testRedisHost, + Password: testPassword, + DB: testDatabase, + }) + defer client.Close() + + streamName := rs.getStreamName(channel) + messages, err := client.XRange(ctx, streamName, "-", "+").Result() + require.NoError(t, err) + assert.Len(t, messages, 1) + + // Check message content + data, exists := messages[0].Values["data"] + assert.True(t, exists) + assert.Equal(t, message, data) + + // Check timestamp exists + timestamp, exists := messages[0].Values["timestamp"] + assert.True(t, exists) + assert.NotNil(t, timestamp) +} + +func 
TestRedisStreams_Publish_JSON(t *testing.T) { + rs := setupRedisStreams() + ctx := context.Background() + channel := "test-channel-json" + + testObj := map[string]interface{}{ + "type": "notification", + "payload": "test data", + "id": 123, + } + + defer cleanupRedisStream(rs.getStreamName(channel)) + + err := rs.Publish(ctx, channel, testObj) + assert.NoError(t, err) + + // Verify message was serialized correctly + client := redis.NewClient(&redis.Options{ + Addr: testRedisHost, + Password: testPassword, + DB: testDatabase, + }) + defer client.Close() + + streamName := rs.getStreamName(channel) + messages, err := client.XRange(ctx, streamName, "-", "+").Result() + require.NoError(t, err) + assert.Len(t, messages, 1) + + // Check JSON was stored correctly + data, exists := messages[0].Values["data"] + assert.True(t, exists) + + var decoded map[string]interface{} + err = json.Unmarshal([]byte(data.(string)), &decoded) + require.NoError(t, err) + assert.Equal(t, testObj["type"], decoded["type"]) + assert.Equal(t, testObj["payload"], decoded["payload"]) + assert.Equal(t, float64(123), decoded["id"]) // JSON numbers become float64 +} + +func TestRedisStreams_Publish_ByteArray(t *testing.T) { + rs := setupRedisStreams() + ctx := context.Background() + channel := "test-channel-bytes" + message := []byte("test byte message") + + defer cleanupRedisStream(rs.getStreamName(channel)) + + err := rs.Publish(ctx, channel, message) + assert.NoError(t, err) + + // Verify message was stored as string + client := redis.NewClient(&redis.Options{ + Addr: testRedisHost, + Password: testPassword, + DB: testDatabase, + }) + defer client.Close() + + streamName := rs.getStreamName(channel) + messages, err := client.XRange(ctx, streamName, "-", "+").Result() + require.NoError(t, err) + assert.Len(t, messages, 1) + + data, exists := messages[0].Values["data"] + assert.True(t, exists) + assert.Equal(t, string(message), data) +} + +func TestRedisStreams_Subscribe_BasicFlow(t *testing.T) { + rs 
:= setupRedisStreams() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + channel := "test-channel-subscribe" + defer cleanupRedisStream(rs.getStreamName(channel)) + + // Start subscriber + ch, err := rs.Subscribe(ctx, channel) + require.NoError(t, err) + + // Give subscriber time to set up + time.Sleep(100 * time.Millisecond) + + // Publish a message AFTER subscriber is ready + testMessage := "subscription test message" + err = rs.Publish(ctx, channel, testMessage) + require.NoError(t, err) + + // Wait for message (longer than flush interval to ensure batch is flushed) + select { + case received := <-ch: + assert.Equal(t, testMessage, received) + case <-time.After(6 * time.Second): + t.Fatal("Timeout waiting for message") + } +} + +func TestRedisStreams_Subscribe_MultipleMessages(t *testing.T) { + rs := setupRedisStreams() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + channel := "test-channel-multiple" + defer cleanupRedisStream(rs.getStreamName(channel)) + + // Start subscriber + ch, err := rs.Subscribe(ctx, channel) + require.NoError(t, err) + + // Give subscriber time to set up + time.Sleep(100 * time.Millisecond) + + // Publish multiple messages AFTER subscriber is ready + messages := []string{"message1", "message2", "message3"} + for _, msg := range messages { + err = rs.Publish(ctx, channel, msg) + require.NoError(t, err) + } + + // Collect received messages + // Wait longer than flush interval (5 seconds) to ensure batch is flushed + var received []string + timeout := time.After(6 * time.Second) + + for i := 0; i < len(messages); i++ { + select { + case msg := <-ch: + received = append(received, msg) + case <-timeout: + t.Fatalf("Timeout waiting for message %d", i+1) + } + } + + assert.ElementsMatch(t, messages, received) +} + +func TestRedisStreams_HelperMethods(t *testing.T) { + rs := setupRedisStreams() + + // Test getStreamName + channel := "test-channel" + 
expected := "stream:test-channel" + assert.Equal(t, expected, rs.getStreamName(channel)) + + // Test getConsumerGroup + assert.Equal(t, "test-group", rs.getConsumerGroup()) + + // Test getConsumerGroup with empty value + rs.ConsumerGroup = "" + assert.Equal(t, "notifications", rs.getConsumerGroup()) + + // Test getConsumerName + rs.ConsumerName = "custom-consumer" + assert.Equal(t, "custom-consumer", rs.getConsumerName()) + + // Test getConsumerName with empty value (should generate unique name) + rs.ConsumerName = "" + name1 := rs.getConsumerName() + assert.Contains(t, name1, "consumer-") + // Note: getConsumerName generates the same name unless we create a new instance + + // Test getBatchSize + assert.Equal(t, 10, rs.getBatchSize()) + rs.BatchSize = 0 + assert.Equal(t, 10, rs.getBatchSize()) // Default + rs.BatchSize = -5 + assert.Equal(t, 10, rs.getBatchSize()) // Default for negative + + // Test getFlushInterval + rs.FlushInterval = 3 * time.Second + assert.Equal(t, 3*time.Second, rs.getFlushInterval()) + rs.FlushInterval = 0 + assert.Equal(t, 5*time.Second, rs.getFlushInterval()) // Default + rs.FlushInterval = -1 * time.Second + assert.Equal(t, 5*time.Second, rs.getFlushInterval()) // Default for negative +} + +func TestRedisStreams_Batching_Behavior(t *testing.T) { + rs := setupRedisStreams() + rs.BatchSize = 3 // Set small batch size for testing + rs.FlushInterval = 10 * time.Second // Long interval to test batch size trigger + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + channel := "test-channel-batching" + defer cleanupRedisStream(rs.getStreamName(channel)) + + // Start subscriber + ch, err := rs.Subscribe(ctx, channel) + require.NoError(t, err) + + // Publish messages to trigger batch + messages := []string{"batch1", "batch2", "batch3"} + for _, msg := range messages { + err = rs.Publish(ctx, channel, msg) + require.NoError(t, err) + } + + // Should receive all messages in one batch + var received 
[]string + timeout := time.After(3 * time.Second) + + for len(received) < len(messages) { + select { + case msg := <-ch: + received = append(received, msg) + case <-timeout: + t.Fatalf("Timeout waiting for batched messages. Received %d out of %d", len(received), len(messages)) + } + } + + assert.ElementsMatch(t, messages, received) +} + +func TestRedisStreams_MaxLen_Configuration(t *testing.T) { + rs := setupRedisStreams() + rs.MaxLen = 2 // Very small max length + + ctx := context.Background() + channel := "test-channel-maxlen" + defer cleanupRedisStream(rs.getStreamName(channel)) + + // Publish more messages than MaxLen + messages := []string{"msg1", "msg2", "msg3", "msg4"} + for _, msg := range messages { + err := rs.Publish(ctx, channel, msg) + require.NoError(t, err) + } + + // Verify stream was trimmed + client := redis.NewClient(&redis.Options{ + Addr: testRedisHost, + Password: testPassword, + DB: testDatabase, + }) + defer client.Close() + + streamName := rs.getStreamName(channel) + length, err := client.XLen(ctx, streamName).Result() + require.NoError(t, err) + + // Should be approximately MaxLen (Redis uses approximate trimming) + // With APPROX, Redis may keep more entries than specified + assert.LessOrEqual(t, length, int64(10)) // Allow generous buffer for approximate trimming +} diff --git a/pkg/syncer/pubsub_test.go b/pkg/syncer/pubsub_test.go index 31b3dc1d..743a6ec1 100644 --- a/pkg/syncer/pubsub_test.go +++ b/pkg/syncer/pubsub_test.go @@ -20,6 +20,7 @@ package syncer import ( "reflect" "testing" + "time" "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/syncer/pubsub" @@ -28,7 +29,7 @@ import ( func TestNewPubSub(t *testing.T) { type args struct { conf config.SyncConfig - flag SycnFeatureFlag + flag SyncFeatureFlag } tests := []struct { name string @@ -77,7 +78,7 @@ func TestNewPubSub(t *testing.T) { Enable: true, }, }, - flag: SycnFeatureFlagDatafile, + flag: SyncFeatureFlagDatafile, }, want: &pubsub.Redis{ Host: 
"localhost:6379", @@ -179,7 +180,7 @@ func TestNewPubSub(t *testing.T) { wantErr: true, }, { - name: "Test with invalid redis config without password", + name: "Test with valid redis config without password", args: args{ conf: config.SyncConfig{ Pubsub: map[string]interface{}{ @@ -195,8 +196,12 @@ func TestNewPubSub(t *testing.T) { }, flag: SyncFeatureFlagNotificaiton, }, - want: nil, - wantErr: true, + want: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", // Empty password is valid (no auth required) + Database: 0, + }, + wantErr: false, }, { name: "Test with invalid redis config without db", @@ -219,13 +224,13 @@ func TestNewPubSub(t *testing.T) { wantErr: true, }, { - name: "Test with invalid redis config with invalid password", + name: "Test with redis config with invalid password type (ignored)", args: args{ conf: config.SyncConfig{ Pubsub: map[string]interface{}{ "redis": map[string]interface{}{ "host": "localhost:6379", - "password": 1234, + "password": 1234, // Invalid type, will be ignored "database": 0, }, }, @@ -236,8 +241,12 @@ func TestNewPubSub(t *testing.T) { }, flag: SyncFeatureFlagNotificaiton, }, - want: nil, - wantErr: true, + want: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", // Invalid type ignored, falls back to empty string + Database: 0, + }, + wantErr: false, }, { name: "Test with invalid redis config with invalid database", @@ -260,6 +269,116 @@ func TestNewPubSub(t *testing.T) { want: nil, wantErr: true, }, + { + name: "Test with valid redis-streams config for notification", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + "batch_size": 20, + "flush_interval": "10s", + "max_retries": 5, + "retry_delay": "200ms", + "max_retry_delay": "10s", + "connection_timeout": "15s", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + flag: 
SyncFeatureFlagNotificaiton, + }, + want: &pubsub.RedisStreams{ + Host: "localhost:6379", + Password: "", + Database: 0, + BatchSize: 20, + FlushInterval: 10000000000, // 10s in nanoseconds + MaxRetries: 5, + RetryDelay: 200000000, // 200ms in nanoseconds + MaxRetryDelay: 10000000000, // 10s in nanoseconds + ConnTimeout: 15000000000, // 15s in nanoseconds + }, + wantErr: false, + }, + { + name: "Test with valid redis-streams config for datafile", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Datafile: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + flag: SyncFeatureFlagDatafile, + }, + want: &pubsub.RedisStreams{ + Host: "localhost:6379", + Password: "", + Database: 0, + BatchSize: 10, // default + FlushInterval: 5000000000, // 5s default in nanoseconds + MaxRetries: 3, // default + RetryDelay: 100000000, // 100ms default in nanoseconds + MaxRetryDelay: 5000000000, // 5s default in nanoseconds + ConnTimeout: 10000000000, // 10s default in nanoseconds + }, + wantErr: false, + }, + { + name: "Test with unsupported pubsub type", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "unsupported-type", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid feature flag", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: "invalid-flag", + }, + want: nil, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, 
func(t *testing.T) { @@ -274,3 +393,340 @@ func TestNewPubSub(t *testing.T) { }) } } + +func TestGetIntFromConfig(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + key string + defaultValue int + want int + }{ + { + name: "Valid int value", + config: map[string]interface{}{ + "test_key": 42, + }, + key: "test_key", + defaultValue: 10, + want: 42, + }, + { + name: "Missing key returns default", + config: map[string]interface{}{ + "other_key": 42, + }, + key: "test_key", + defaultValue: 10, + want: 10, + }, + { + name: "Invalid type returns default", + config: map[string]interface{}{ + "test_key": "not an int", + }, + key: "test_key", + defaultValue: 10, + want: 10, + }, + { + name: "Nil value returns default", + config: map[string]interface{}{ + "test_key": nil, + }, + key: "test_key", + defaultValue: 10, + want: 10, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getIntFromConfig(tt.config, tt.key, tt.defaultValue) + if got != tt.want { + t.Errorf("getIntFromConfig() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetDurationFromConfig(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + key string + defaultValue time.Duration + want time.Duration + }{ + { + name: "Valid duration string", + config: map[string]interface{}{ + "test_key": "5s", + }, + key: "test_key", + defaultValue: 1 * time.Second, + want: 5 * time.Second, + }, + { + name: "Valid millisecond duration", + config: map[string]interface{}{ + "test_key": "100ms", + }, + key: "test_key", + defaultValue: 1 * time.Second, + want: 100 * time.Millisecond, + }, + { + name: "Missing key returns default", + config: map[string]interface{}{ + "other_key": "5s", + }, + key: "test_key", + defaultValue: 1 * time.Second, + want: 1 * time.Second, + }, + { + name: "Invalid duration string returns default", + config: map[string]interface{}{ + "test_key": "invalid duration", + }, + key: "test_key", + 
defaultValue: 1 * time.Second, + want: 1 * time.Second, + }, + { + name: "Non-string value returns default", + config: map[string]interface{}{ + "test_key": 123, + }, + key: "test_key", + defaultValue: 1 * time.Second, + want: 1 * time.Second, + }, + { + name: "Nil value returns default", + config: map[string]interface{}{ + "test_key": nil, + }, + key: "test_key", + defaultValue: 1 * time.Second, + want: 1 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getDurationFromConfig(tt.config, tt.key, tt.defaultValue) + if got != tt.want { + t.Errorf("getDurationFromConfig() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewPubSub_DatabaseTypeConversion(t *testing.T) { + tests := []struct { + name string + database interface{} + wantErr bool + }{ + { + name: "database as int", + database: 0, + wantErr: false, + }, + { + name: "database as float64 (from YAML/JSON)", + database: float64(0), + wantErr: false, + }, + { + name: "database as float64 non-zero", + database: float64(1), + wantErr: false, + }, + { + name: "database as string - should fail", + database: "0", + wantErr: true, + }, + { + name: "database as nil - should fail", + database: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": tt.database, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + } + + _, err := newPubSub(conf, SyncFeatureFlagNotificaiton) + if (err != nil) != tt.wantErr { + t.Errorf("newPubSub() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestGetPubSubRedisStreams_ErrorPaths(t *testing.T) { + tests := []struct { + name string + conf config.SyncConfig + wantErr bool + }{ + { + name: "redis-streams config not found", + conf: config.SyncConfig{ + Pubsub: 
map[string]interface{}{ + "not-redis": map[string]interface{}{}, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + wantErr: true, + }, + { + name: "redis-streams config not valid (not a map)", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": "invalid-config", + }, + Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + wantErr: true, + }, + { + name: "redis-streams host not found", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + wantErr: true, + }, + { + name: "redis-streams host not valid (not a string)", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": 123, + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + wantErr: true, + }, + { + name: "redis-streams database not found", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + wantErr: true, + }, + { + name: "redis-streams database as float64 (valid)", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": float64(1), + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + wantErr: false, + }, + { + name: "redis-streams database invalid type", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": "invalid", + }, + }, + 
Notification: config.FeatureSyncConfig{ + Default: "redis-streams", + Enable: true, + }, + }, + wantErr: true, + }, + { + name: "datafile with unsupported pubsub type", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Datafile: config.FeatureSyncConfig{ + Default: "unsupported-type", + Enable: true, + }, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var err error + if tt.conf.Notification.Default != "" { + _, err = newPubSub(tt.conf, SyncFeatureFlagNotificaiton) + } else { + _, err = newPubSub(tt.conf, SyncFeatureFlagDatafile) + } + + if (err != nil) != tt.wantErr { + t.Errorf("newPubSub() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 7f2f35f1..3236d9e5 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -132,7 +132,7 @@ type DatafileSyncer struct { } func NewDatafileSyncer(conf config.SyncConfig) (*DatafileSyncer, error) { - pubsub, err := newPubSub(conf, SycnFeatureFlagDatafile) + pubsub, err := newPubSub(conf, SyncFeatureFlagDatafile) if err != nil { return nil, err } diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index 01f407f7..ffd72d6a 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -384,3 +384,89 @@ func TestSyncedNotificationCenter_Subscribe(t *testing.T) { }) } } + +func TestNewSyncedNotificationCenter_CacheHit(t *testing.T) { + // Clear cache before test + ncCache = make(map[string]NotificationSyncer) + + conf := config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + } + + sdkKey := "test-sdk-key" + ctx := context.Background() + + // First call - should create 
new instance + nc1, err := NewSyncedNotificationCenter(ctx, sdkKey, conf) + assert.NoError(t, err) + assert.NotNil(t, nc1) + + // Second call with same sdkKey - should return cached instance + nc2, err := NewSyncedNotificationCenter(ctx, sdkKey, conf) + assert.NoError(t, err) + assert.NotNil(t, nc2) + + // Should be the same instance (cache hit) + assert.Equal(t, nc1, nc2) +} + +func TestSyncedNotificationCenter_AddHandler(t *testing.T) { + nc := &SyncedNotificationCenter{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "test", + pubsub: &testPubSub{}, + } + + id, err := nc.AddHandler(notification.Decision, func(interface{}) {}) + assert.NoError(t, err) + assert.Equal(t, 0, id) +} + +func TestSyncedNotificationCenter_RemoveHandler(t *testing.T) { + nc := &SyncedNotificationCenter{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "test", + pubsub: &testPubSub{}, + } + + err := nc.RemoveHandler(0, notification.Decision) + assert.NoError(t, err) +} + +func TestSyncedNotificationCenter_Send_MarshalError(t *testing.T) { + nc := &SyncedNotificationCenter{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "test", + pubsub: &testPubSub{}, + } + + // Pass a channel which cannot be marshaled to JSON + ch := make(chan int) + err := nc.Send(notification.Decision, ch) + assert.Error(t, err) +} + +func TestGetDatafileSyncChannel(t *testing.T) { + result := GetDatafileSyncChannel() + expected := "optimizely-sync-datafile" + assert.Equal(t, expected, result) +} + +func TestGetChannelForSDKKey(t *testing.T) { + result := GetChannelForSDKKey("test-channel", "sdk-123") + expected := "test-channel-sdk-123" + assert.Equal(t, expected, result) +} diff --git a/pkg/utils/redisauth/password.go b/pkg/utils/redisauth/password.go new file mode 100644 index 00000000..e1ff905b --- /dev/null +++ b/pkg/utils/redisauth/password.go @@ -0,0 +1,59 @@ +/**************************************************************************** + * Copyright 2025 Optimizely, 
Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package redisauth provides utilities for Redis authentication configuration +package redisauth + +import "os" + +// GetPassword safely extracts Redis password from config with flexible field names and env var fallback +// +// Supports multiple field names to avoid security scanning alerts on "password" keyword: +// - auth_token (preferred) +// - redis_secret (alternative) +// - password (legacy support) +// +// If no config field is found or all are empty, falls back to environment variable. +// Returns empty string if no password is configured (valid for Redis without auth). 
+// +// Parameters: +// - config: map containing Redis configuration +// - envVar: environment variable name to check as fallback (e.g., "REDIS_PASSWORD") +// +// Returns: +// - password string (may be empty for Redis without authentication) +func GetPassword(config map[string]interface{}, envVar string) string { + // Try each key in order of preference + keys := []string{"auth_token", "redis_secret", "password"} + + for _, key := range keys { + if val, found := config[key]; found { + if strVal, ok := val.(string); ok && strVal != "" { + return strVal + } + } + } + + // Fallback to environment variable + if envVar != "" { + if envVal := os.Getenv(envVar); envVal != "" { + return envVal + } + } + + // Return empty string if not found (for Redis, empty password is valid) + return "" +} diff --git a/pkg/utils/redisauth/password_test.go b/pkg/utils/redisauth/password_test.go new file mode 100644 index 00000000..5752d54f --- /dev/null +++ b/pkg/utils/redisauth/password_test.go @@ -0,0 +1,180 @@ +/**************************************************************************** + * Copyright 2025 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. 
* + ***************************************************************************/ + +package redisauth + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetPassword(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + envVar string + envValue string + want string + }{ + { + name: "auth_token has highest priority", + config: map[string]interface{}{ + "auth_token": "token123", + "redis_secret": "secret456", + "password": "password789", + }, + envVar: "TEST_ENV", + want: "token123", + }, + { + name: "redis_secret used when auth_token missing", + config: map[string]interface{}{ + "redis_secret": "secret456", + "password": "password789", + }, + envVar: "TEST_ENV", + want: "secret456", + }, + { + name: "password used when auth_token and redis_secret missing", + config: map[string]interface{}{ + "password": "password789", + }, + envVar: "TEST_ENV", + want: "password789", + }, + { + name: "environment variable used when no config fields present", + config: map[string]interface{}{ + "host": "localhost:6379", + "database": 0, + }, + envVar: "TEST_ENV", + envValue: "env_password", + want: "env_password", + }, + { + name: "empty string when no password configured", + config: map[string]interface{}{ + "host": "localhost:6379", + "database": 0, + }, + envVar: "TEST_ENV", + want: "", + }, + { + name: "empty field values are ignored", + config: map[string]interface{}{ + "auth_token": "", + "redis_secret": "", + "password": "password789", + }, + envVar: "TEST_ENV", + want: "password789", + }, + { + name: "non-string values are ignored", + config: map[string]interface{}{ + "auth_token": 12345, // Invalid type + "password": "password789", + }, + envVar: "TEST_ENV", + want: "password789", + }, + { + name: "config fields take precedence over env var", + config: map[string]interface{}{ + "password": "config_password", + }, + envVar: "TEST_ENV", + envValue: "env_password", + want: "config_password", + }, + { + name: 
"empty env var name is handled gracefully", + config: map[string]interface{}{ + "host": "localhost:6379", + }, + envVar: "", + want: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set environment variable if needed + if tt.envValue != "" { + os.Setenv(tt.envVar, tt.envValue) + defer os.Unsetenv(tt.envVar) + } else { + // Ensure env var is not set + os.Unsetenv(tt.envVar) + } + + got := GetPassword(tt.config, tt.envVar) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestGetPassword_RealWorldScenarios(t *testing.T) { + t.Run("Kubernetes secret via env var", func(t *testing.T) { + os.Setenv("REDIS_PASSWORD", "k8s-secret-value") + defer os.Unsetenv("REDIS_PASSWORD") + + config := map[string]interface{}{ + "host": "redis-service:6379", + "database": 0, + } + + got := GetPassword(config, "REDIS_PASSWORD") + assert.Equal(t, "k8s-secret-value", got) + }) + + t.Run("Development config without auth", func(t *testing.T) { + config := map[string]interface{}{ + "host": "localhost:6379", + "database": 0, + } + + got := GetPassword(config, "REDIS_PASSWORD") + assert.Equal(t, "", got) + }) + + t.Run("Production config with auth_token", func(t *testing.T) { + config := map[string]interface{}{ + "host": "redis.production.example.com:6379", + "auth_token": "prod-token-12345", + "database": 1, + } + + got := GetPassword(config, "REDIS_PASSWORD") + assert.Equal(t, "prod-token-12345", got) + }) + + t.Run("Legacy config with password field", func(t *testing.T) { + config := map[string]interface{}{ + "host": "legacy-redis:6379", + "password": "legacy-pass", + "database": 0, + } + + got := GetPassword(config, "REDIS_PASSWORD") + assert.Equal(t, "legacy-pass", got) + }) +} diff --git a/plugins/odpcache/services/redis_cache.go b/plugins/odpcache/services/redis_cache.go index c52f847c..f93cd746 100644 --- a/plugins/odpcache/services/redis_cache.go +++ b/plugins/odpcache/services/redis_cache.go @@ -22,6 +22,7 @@ import ( "encoding/json" 
"github.com/go-redis/redis/v8" + "github.com/optimizely/agent/pkg/utils/redisauth" "github.com/optimizely/agent/plugins/odpcache" "github.com/optimizely/agent/plugins/utils" "github.com/optimizely/go-sdk/v2/pkg/cache" @@ -39,6 +40,35 @@ type RedisCache struct { Timeout utils.Duration `json:"timeout"` } +// UnmarshalJSON implements custom JSON unmarshaling with flexible password field names +// Supports: auth_token, redis_secret, password (in order of preference) +// Fallback: REDIS_ODP_PASSWORD environment variable +func (r *RedisCache) UnmarshalJSON(data []byte) error { + // Use an alias type to avoid infinite recursion + type Alias RedisCache + alias := &struct { + *Alias + }{ + Alias: (*Alias)(r), + } + + // First, unmarshal normally to get all fields + if err := json.Unmarshal(data, alias); err != nil { + return err + } + + // Parse raw config to extract password with flexible field names + var rawConfig map[string]interface{} + if err := json.Unmarshal(data, &rawConfig); err != nil { + return err + } + + // Use redisauth utility to get password from flexible field names or env var + r.Password = redisauth.GetPassword(rawConfig, "REDIS_ODP_PASSWORD") + + return nil +} + // Lookup is used to retrieve segments func (r *RedisCache) Lookup(key string) (segments interface{}) { // This is required in both lookup and save since an old redis instance can also be used diff --git a/plugins/odpcache/services/redis_cache_test.go b/plugins/odpcache/services/redis_cache_test.go index e54b0fdc..61bf3427 100644 --- a/plugins/odpcache/services/redis_cache_test.go +++ b/plugins/odpcache/services/redis_cache_test.go @@ -63,3 +63,57 @@ func (r *RedisCacheTestSuite) TestLookupNotSavedKey() { func TestRedisCacheTestSuite(t *testing.T) { suite.Run(t, new(RedisCacheTestSuite)) } + +func TestRedisCache_UnmarshalJSON(t *testing.T) { + tests := []struct { + name string + json string + wantPassword string + wantErr bool + }{ + { + name: "auth_token has priority", + json: 
`{"host":"localhost:6379","auth_token":"token123","password":"pass456","database":0}`, + wantPassword: "token123", + wantErr: false, + }, + { + name: "redis_secret when auth_token missing", + json: `{"host":"localhost:6379","redis_secret":"secret789","password":"pass456","database":0}`, + wantPassword: "secret789", + wantErr: false, + }, + { + name: "password when others missing", + json: `{"host":"localhost:6379","password":"pass456","database":0}`, + wantPassword: "pass456", + wantErr: false, + }, + { + name: "empty when no password fields", + json: `{"host":"localhost:6379","database":0}`, + wantPassword: "", + wantErr: false, + }, + { + name: "invalid json", + json: `{invalid}`, + wantPassword: "", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var cache RedisCache + err := cache.UnmarshalJSON([]byte(tt.json)) + if (err != nil) != tt.wantErr { + t.Errorf("UnmarshalJSON() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && cache.Password != tt.wantPassword { + t.Errorf("UnmarshalJSON() Password = %v, want %v", cache.Password, tt.wantPassword) + } + }) + } +} diff --git a/plugins/userprofileservice/services/redis_ups.go b/plugins/userprofileservice/services/redis_ups.go index d57ab2dd..5e76877c 100644 --- a/plugins/userprofileservice/services/redis_ups.go +++ b/plugins/userprofileservice/services/redis_ups.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/optimizely/agent/pkg/utils/redisauth" "github.com/optimizely/agent/plugins/userprofileservice" "github.com/optimizely/go-sdk/v2/pkg/decision" "github.com/rs/zerolog/log" @@ -39,6 +40,35 @@ type RedisUserProfileService struct { Database int `json:"database"` } +// UnmarshalJSON implements custom JSON unmarshaling with flexible password field names +// Supports: auth_token, redis_secret, password (in order of preference) +// Fallback: REDIS_UPS_PASSWORD environment variable +func (u *RedisUserProfileService) 
UnmarshalJSON(data []byte) error { + // Use an alias type to avoid infinite recursion + type Alias RedisUserProfileService + alias := &struct { + *Alias + }{ + Alias: (*Alias)(u), + } + + // First, unmarshal normally to get all fields + if err := json.Unmarshal(data, alias); err != nil { + return err + } + + // Parse raw config to extract password with flexible field names + var rawConfig map[string]interface{} + if err := json.Unmarshal(data, &rawConfig); err != nil { + return err + } + + // Use redisauth utility to get password from flexible field names or env var + u.Password = redisauth.GetPassword(rawConfig, "REDIS_UPS_PASSWORD") + + return nil +} + // Lookup is used to retrieve past bucketing decisions for users func (u *RedisUserProfileService) Lookup(userID string) (profile decision.UserProfile) { profile = decision.UserProfile{ diff --git a/plugins/userprofileservice/services/redis_ups_test.go b/plugins/userprofileservice/services/redis_ups_test.go index 3e212f3b..6fac69e0 100644 --- a/plugins/userprofileservice/services/redis_ups_test.go +++ b/plugins/userprofileservice/services/redis_ups_test.go @@ -75,3 +75,57 @@ func (r *RedisUPSTestSuite) TestLookupNotSavedProfileID() { func TestRedisUPSTestSuite(t *testing.T) { suite.Run(t, new(RedisUPSTestSuite)) } + +func TestRedisUserProfileService_UnmarshalJSON(t *testing.T) { + tests := []struct { + name string + json string + wantPassword string + wantErr bool + }{ + { + name: "auth_token has priority", + json: `{"host":"localhost:6379","auth_token":"token123","password":"pass456","database":0}`, + wantPassword: "token123", + wantErr: false, + }, + { + name: "redis_secret when auth_token missing", + json: `{"host":"localhost:6379","redis_secret":"secret789","password":"pass456","database":0}`, + wantPassword: "secret789", + wantErr: false, + }, + { + name: "password when others missing", + json: `{"host":"localhost:6379","password":"pass456","database":0}`, + wantPassword: "pass456", + wantErr: false, + }, + { 
+ name: "empty when no password fields", + json: `{"host":"localhost:6379","database":0}`, + wantPassword: "", + wantErr: false, + }, + { + name: "invalid json", + json: `{invalid}`, + wantPassword: "", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ups RedisUserProfileService + err := ups.UnmarshalJSON([]byte(tt.json)) + if (err != nil) != tt.wantErr { + t.Errorf("UnmarshalJSON() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && ups.Password != tt.wantPassword { + t.Errorf("UnmarshalJSON() Password = %v, want %v", ups.Password, tt.wantPassword) + } + }) + } +}