Skip to content

Commit 8835bef

Browse files
committed
add redis-streams
1 parent b37d2e3 commit 8835bef

File tree

6 files changed

+1675
-1
lines changed

6 files changed

+1675
-1
lines changed

config.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,16 +256,36 @@ synchronization:
256256
## Agent publishes to channels: "optimizely-sync-{sdk_key}"
257257
## For external Redis clients: Subscribe "optimizely-sync-{sdk_key}" or PSubscribe "optimizely-sync-*"
258258
## Note: Channel configuration parsing is a known bug - planned for future release
259+
260+
## Redis Streams configuration (when using Redis Streams for notifications)
261+
## batch_size: number of messages to batch before sending (default: 10)
262+
# batch_size: 10
263+
## flush_interval: maximum time to wait before sending a partial batch (default: 5s)
264+
# flush_interval: 5s
265+
## max_retries: maximum number of retry attempts for failed operations (default: 3)
266+
# max_retries: 3
267+
## retry_delay: initial delay between retry attempts (default: 100ms)
268+
# retry_delay: 100ms
269+
## max_retry_delay: maximum delay between retry attempts with exponential backoff (default: 5s)
270+
# max_retry_delay: 5s
271+
## connection_timeout: timeout for Redis connections (default: 10s)
272+
# connection_timeout: 10s
259273
## if notification synchronization is enabled, then the active notification event-stream API
260274
## will get the notifications from available replicas
261275
notification:
262276
enable: false
277+
## Use "redis" for fire-and-forget pub/sub (existing behavior)
278+
## Use "redis-streams" for persistent message delivery with retries and acknowledgment
263279
default: "redis"
280+
# default: "redis-streams" # Uncomment to enable Redis Streams
264281
## if datafile synchronization is enabled, then for each webhook API call
265282
## the datafile will be sent to all available replicas to achieve better eventual consistency
266283
datafile:
267284
enable: false
285+
## Use "redis" for fire-and-forget pub/sub (existing behavior)
286+
## Use "redis-streams" for persistent message delivery with retries and acknowledgment
268287
default: "redis"
288+
# default: "redis-streams" # Uncomment to enable Redis Streams
269289

270290
##
271291
## cmab: Contextual Multi-Armed Bandit configuration

pkg/syncer/pubsub.go

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package syncer
2020
import (
2121
"context"
2222
"errors"
23+
"time"
2324

2425
"github.com/optimizely/agent/config"
2526
"github.com/optimizely/agent/pkg/syncer/pubsub"
@@ -28,8 +29,10 @@ import (
2829
const (
2930
// PubSubDefaultChan will be used as default pubsub channel name
3031
PubSubDefaultChan = "optimizely-sync"
31-
// PubSubRedis is the name of pubsub type of Redis
32+
// PubSubRedis is the name of pubsub type of Redis (fire-and-forget)
3233
PubSubRedis = "redis"
34+
// PubSubRedisStreams is the name of pubsub type of Redis Streams (persistent)
35+
PubSubRedisStreams = "redis-streams"
3336
)
3437

3538
type SycnFeatureFlag string
@@ -48,12 +51,16 @@ func newPubSub(conf config.SyncConfig, featureFlag SycnFeatureFlag) (PubSub, err
4851
if featureFlag == SyncFeatureFlagNotificaiton {
4952
if conf.Notification.Default == PubSubRedis {
5053
return getPubSubRedis(conf)
54+
} else if conf.Notification.Default == PubSubRedisStreams {
55+
return getPubSubRedisStreams(conf)
5156
} else {
5257
return nil, errors.New("pubsub type not supported")
5358
}
5459
} else if featureFlag == SycnFeatureFlagDatafile {
5560
if conf.Datafile.Default == PubSubRedis {
5661
return getPubSubRedis(conf)
62+
} else if conf.Datafile.Default == PubSubRedisStreams {
63+
return getPubSubRedisStreams(conf)
5764
} else {
5865
return nil, errors.New("pubsub type not supported")
5966
}
@@ -99,9 +106,92 @@ func getPubSubRedis(conf config.SyncConfig) (PubSub, error) {
99106
return nil, errors.New("pubsub redis database not valid, database must be int")
100107
}
101108

109+
// Return original Redis pub/sub implementation (fire-and-forget)
102110
return &pubsub.Redis{
103111
Host: host,
104112
Password: password,
105113
Database: database,
106114
}, nil
107115
}
116+
117+
func getPubSubRedisStreams(conf config.SyncConfig) (PubSub, error) {
118+
pubsubConf, found := conf.Pubsub[PubSubRedis]
119+
if !found {
120+
return nil, errors.New("pubsub redis config not found")
121+
}
122+
123+
redisConf, ok := pubsubConf.(map[string]interface{})
124+
if !ok {
125+
return nil, errors.New("pubsub redis config not valid")
126+
}
127+
128+
hostVal, found := redisConf["host"]
129+
if !found {
130+
return nil, errors.New("pubsub redis host not found")
131+
}
132+
host, ok := hostVal.(string)
133+
if !ok {
134+
return nil, errors.New("pubsub redis host not valid, host must be string")
135+
}
136+
137+
passwordVal, found := redisConf["password"]
138+
if !found {
139+
return nil, errors.New("pubsub redis password not found")
140+
}
141+
password, ok := passwordVal.(string)
142+
if !ok {
143+
return nil, errors.New("pubsub redis password not valid, password must be string")
144+
}
145+
146+
databaseVal, found := redisConf["database"]
147+
if !found {
148+
return nil, errors.New("pubsub redis database not found")
149+
}
150+
database, ok := databaseVal.(int)
151+
if !ok {
152+
return nil, errors.New("pubsub redis database not valid, database must be int")
153+
}
154+
155+
// Parse optional Redis Streams configuration parameters
156+
batchSize := getIntFromConfig(redisConf, "batch_size", 10)
157+
flushInterval := getDurationFromConfig(redisConf, "flush_interval", 5*time.Second)
158+
maxRetries := getIntFromConfig(redisConf, "max_retries", 3)
159+
retryDelay := getDurationFromConfig(redisConf, "retry_delay", 100*time.Millisecond)
160+
maxRetryDelay := getDurationFromConfig(redisConf, "max_retry_delay", 5*time.Second)
161+
connTimeout := getDurationFromConfig(redisConf, "connection_timeout", 10*time.Second)
162+
163+
// Return Redis Streams implementation with configuration
164+
return &pubsub.RedisStreams{
165+
Host: host,
166+
Password: password,
167+
Database: database,
168+
BatchSize: batchSize,
169+
FlushInterval: flushInterval,
170+
MaxRetries: maxRetries,
171+
RetryDelay: retryDelay,
172+
MaxRetryDelay: maxRetryDelay,
173+
ConnTimeout: connTimeout,
174+
}, nil
175+
}
176+
177+
// getIntFromConfig safely extracts an integer value from config map with default fallback
178+
func getIntFromConfig(config map[string]interface{}, key string, defaultValue int) int {
179+
if val, found := config[key]; found {
180+
if intVal, ok := val.(int); ok {
181+
return intVal
182+
}
183+
}
184+
return defaultValue
185+
}
186+
187+
// getDurationFromConfig safely extracts a duration value from config map with default fallback
188+
func getDurationFromConfig(config map[string]interface{}, key string, defaultValue time.Duration) time.Duration {
189+
if val, found := config[key]; found {
190+
if strVal, ok := val.(string); ok {
191+
if duration, err := time.ParseDuration(strVal); err == nil {
192+
return duration
193+
}
194+
}
195+
}
196+
return defaultValue
197+
}

0 commit comments

Comments
 (0)