Skip to content

Commit 372b8d5

Browse files
committed
feat: Resolve memcached servers
In order to allow the user to pass a hostname on the command line (that might resolve to multiple IP addresses), perform DNS resolution when creating the Memcached client. The servers are resolved again on a periodic basis to account for possible changes in IP addresses (as might happen in a k8s cluster). Signed-off-by: Marcelo E. Magallon <marcelo.magallon@grafana.com>
1 parent b51599d commit 372b8d5

File tree

5 files changed

+623
-25
lines changed

5 files changed

+623
-25
lines changed

cmd/synthetic-monitoring-agent/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ func run(args []string, stdout io.Writer) error {
248248

249249
// Initialize cache client (always non-nil, with fallback chain: memcached → local → noop)
250250
cacheClient := setupCache(
251+
ctx,
251252
config.CacheType,
252253
config.MemcachedServers,
253254
config.CacheLocalCapacity,
@@ -496,7 +497,7 @@ func setupGoMemLimit(ratio float64) error {
496497
return nil
497498
}
498499

499-
func setupCache(cacheType cache.Kind, memcachedServers []string, localCapacity int, localTTL time.Duration, logger *zerolog.Logger) cache.Cache {
500+
func setupCache(ctx context.Context, cacheType cache.Kind, memcachedServers []string, localCapacity int, localTTL time.Duration, logger *zerolog.Logger) cache.Cache {
500501
// Determine effective cache type with auto mode logic:
501502
// auto + servers provided -> memcached -> local -> noop
502503
// auto + no servers -> local -> noop
@@ -522,7 +523,7 @@ func setupCache(cacheType cache.Kind, memcachedServers []string, localCapacity i
522523
Timeout: 100 * time.Millisecond,
523524
}
524525

525-
cacheClient, err := cache.NewMemcachedClient(cacheConfig)
526+
cacheClient, err := cache.NewMemcachedClient(ctx, cacheConfig)
526527
if err != nil {
527528
logger.Warn().
528529
Err(err).

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
module github.com/grafana/synthetic-monitoring-agent
22

3-
go 1.24.0
3+
go 1.25.0
44

5-
toolchain go1.24.6
5+
toolchain go1.25.4
66

77
require (
88
github.com/go-kit/kit v0.13.0

internal/cache/memcached.go

Lines changed: 185 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,21 @@ const (
1818
// DefaultMaxIdleConns is the default maximum number of idle connections per server
1919
DefaultMaxIdleConns = 2
2020

21+
// DefaultRefreshInterval is the default interval for re-resolving server addresses
22+
DefaultRefreshInterval = 60 * time.Second
23+
24+
// DNSResolveTimeout is the timeout for DNS resolution operations
25+
DNSResolveTimeout = 5 * time.Second
26+
2127
// MaxKeyLength is the maximum key length supported by memcached
2228
MaxKeyLength = 250
2329
)
2430

31+
// Resolver is an interface for DNS resolution to allow injection in tests.
32+
type Resolver interface {
33+
LookupIP(ctx context.Context, network, host string) ([]net.IP, error)
34+
}
35+
2536
// MemcachedConfig holds configuration for the cache client.
2637
type MemcachedConfig struct {
2738
// Servers is the list of memcached server addresses (host:port)
@@ -32,12 +43,21 @@ type MemcachedConfig struct {
3243
Timeout time.Duration
3344
// MaxIdleConns is the maximum number of idle connections per server (optional, defaults to 2)
3445
MaxIdleConns int
46+
// RefreshInterval is the interval for re-resolving server addresses (optional, defaults to 60s)
47+
RefreshInterval time.Duration
48+
// Resolver is the DNS resolver to use (optional, defaults to net.DefaultResolver)
49+
// This is primarily for testing purposes.
50+
Resolver Resolver
3551
}
3652

3753
// MemcachedClient wraps the memcached client with additional functionality.
3854
type MemcachedClient struct {
39-
mc *memcache.Client
40-
logger zerolog.Logger
55+
mc *memcache.Client
56+
logger zerolog.Logger
57+
serverList *memcache.ServerList
58+
originalServers []string
59+
cancel context.CancelFunc
60+
resolver Resolver
4161
}
4262

4363
const KindMemcached Kind = "memcached"
@@ -48,7 +68,7 @@ var _ Cache = (*MemcachedClient)(nil)
4868
// NewMemcachedClient creates a new memcached cache client with the provided configuration.
4969
// It validates the configuration and returns an error if invalid.
5070
// Returns a Cache interface that can be used for all cache operations.
51-
func NewMemcachedClient(config MemcachedConfig) (*MemcachedClient, error) {
71+
func NewMemcachedClient(ctx context.Context, config MemcachedConfig) (*MemcachedClient, error) {
5272
// Validate that at least one server is provided
5373
if len(config.Servers) == 0 {
5474
return nil, fmt.Errorf("at least one memcached server must be provided")
@@ -71,25 +91,65 @@ func NewMemcachedClient(config MemcachedConfig) (*MemcachedClient, error) {
7191
if config.Timeout == 0 {
7292
config.Timeout = DefaultTimeout
7393
}
94+
7495
if config.MaxIdleConns == 0 {
7596
config.MaxIdleConns = DefaultMaxIdleConns
7697
}
7798

78-
// Create memcache client
79-
mc := memcache.New(config.Servers...)
99+
if config.RefreshInterval == 0 {
100+
config.RefreshInterval = DefaultRefreshInterval
101+
}
102+
103+
// Use default resolver if none provided
104+
resolver := config.Resolver
105+
if resolver == nil {
106+
resolver = net.DefaultResolver
107+
}
108+
109+
// Create a ServerList selector for dynamic address resolution
110+
serverList := &memcache.ServerList{}
111+
112+
// Resolve initial server addresses
113+
logger := config.Logger.With().Str("component", "cache").Logger()
114+
resolved := resolveServers(ctx, config.Servers, resolver, logger)
115+
116+
// Ensure we have at least one resolved server
117+
if len(resolved) == 0 {
118+
return nil, fmt.Errorf("no servers could be resolved from the provided list: %v", config.Servers)
119+
}
120+
121+
// Set initial servers in the selector
122+
if err := serverList.SetServers(resolved...); err != nil {
123+
return nil, fmt.Errorf("failed to set initial servers: %w", err)
124+
}
125+
126+
// Create memcache client with the selector
127+
mc := memcache.NewFromSelector(serverList)
80128
mc.Timeout = config.Timeout
81129
mc.MaxIdleConns = config.MaxIdleConns
82130

131+
// Create context for the refresh goroutine
132+
refreshCtx, cancel := context.WithCancel(ctx)
133+
83134
client := &MemcachedClient{
84-
mc: mc,
85-
logger: config.Logger.With().Str("component", "cache").Logger(),
135+
mc: mc,
136+
logger: logger,
137+
serverList: serverList,
138+
originalServers: config.Servers,
139+
cancel: cancel,
140+
resolver: resolver,
86141
}
87142

143+
// Start background goroutine to periodically refresh server addresses
144+
go client.refreshServers(refreshCtx, config.RefreshInterval)
145+
88146
client.logger.Info().
89147
Strs("servers", config.Servers).
148+
Int("resolved_count", len(resolved)).
90149
Dur("timeout", config.Timeout).
91150
Int("max_idle_conns", config.MaxIdleConns).
92-
Msg("cache client initialized")
151+
Dur("refresh_interval", config.RefreshInterval).
152+
Msg("cache client initialized with dynamic address resolution")
93153

94154
return client, nil
95155
}
@@ -222,6 +282,26 @@ func (c *MemcachedClient) Flush(ctx context.Context) error {
222282
return nil
223283
}
224284

285+
// Close stops the background server address refresh goroutine by cancelling its context.
286+
//
287+
// This method is optional if the parent context passed to NewMemcachedClient is already
288+
// cancelled during application shutdown. The refresh goroutine will automatically stop
289+
// when the parent context is cancelled.
290+
//
291+
// Call this method explicitly only if you need to stop the refresh goroutine before
292+
// the application shuts down or if you're managing the client lifecycle independently.
293+
//
294+
// Note: This is not part of the Cache interface, so it must be called explicitly
295+
// if cleanup is needed.
296+
func (c *MemcachedClient) Close() error {
297+
if c.cancel != nil {
298+
c.cancel()
299+
c.logger.Debug().Msg("cache client closed")
300+
}
301+
302+
return nil
303+
}
304+
225305
// validateKey validates a memcached key.
226306
func validateKey(key string) error {
227307
if key == "" {
@@ -232,3 +312,100 @@ func validateKey(key string) error {
232312
}
233313
return nil
234314
}
315+
316+
// resolveServers resolves all hostnames in the server list to their IP addresses.
317+
// Each hostname is resolved to potentially multiple IP addresses.
318+
// Returns a slice of "ip:port" strings ready for use with memcache.ServerList.SetServers.
319+
// IP addresses are used as-is without resolution.
320+
// Hostnames that fail to resolve are skipped (not included in the result).
321+
func resolveServers(ctx context.Context, servers []string, resolver Resolver, logger zerolog.Logger) []string {
322+
var resolved []string
323+
324+
for _, server := range servers {
325+
host, port, err := net.SplitHostPort(server)
326+
if err != nil {
327+
logger.Warn().Err(err).Str("server", server).Msg("failed to parse server address, skipping")
328+
continue
329+
}
330+
331+
// Check if host is already an IP address
332+
if ip := net.ParseIP(host); ip != nil {
333+
// Already an IP address, use as-is
334+
resolved = append(resolved, server)
335+
logger.Debug().Str("server", server).Msg("using IP address directly")
336+
337+
continue
338+
}
339+
340+
// Try to resolve the hostname
341+
ips, err := resolver.LookupIP(ctx, "ip", host)
342+
if err != nil {
343+
logger.Warn().Err(err).Str("host", host).Msg("failed to resolve hostname, skipping server")
344+
345+
continue
346+
}
347+
348+
if len(ips) == 0 {
349+
logger.Warn().Str("host", host).Msg("no IPs resolved for hostname, skipping server")
350+
351+
continue
352+
}
353+
354+
// Add all resolved IPs with the port
355+
for _, ip := range ips {
356+
addr := net.JoinHostPort(ip.String(), port)
357+
resolved = append(resolved, addr)
358+
}
359+
360+
logger.Debug().
361+
Str("host", host).
362+
Int("ip_count", len(ips)).
363+
Strs("resolved", resolved[len(resolved)-len(ips):]).
364+
Msg("resolved server addresses")
365+
}
366+
367+
return resolved
368+
}
369+
370+
// refreshServers periodically re-resolves server addresses and updates the ServerList.
371+
func (c *MemcachedClient) refreshServers(ctx context.Context, interval time.Duration) {
372+
ticker := time.NewTicker(interval)
373+
defer ticker.Stop()
374+
375+
for {
376+
select {
377+
case <-ctx.Done():
378+
c.logger.Debug().Msg("stopping server address refresh")
379+
380+
return
381+
382+
case <-ticker.C:
383+
// Create a timeout context for DNS resolution to prevent hanging
384+
resolveCtx, cancel := context.WithTimeout(ctx, DNSResolveTimeout)
385+
resolved := resolveServers(resolveCtx, c.originalServers, c.resolver, c.logger)
386+
cancel()
387+
388+
if len(resolved) == 0 {
389+
c.logger.Error().
390+
Strs("servers", c.originalServers).
391+
Msg("failed to refresh server addresses")
392+
393+
continue
394+
}
395+
396+
if err := c.serverList.SetServers(resolved...); err != nil {
397+
c.logger.Error().
398+
Err(err).
399+
Strs("servers", resolved).
400+
Msg("failed to update server addresses")
401+
402+
continue
403+
}
404+
405+
c.logger.Debug().
406+
Int("server_count", len(resolved)).
407+
Strs("servers", resolved).
408+
Msg("refreshed server addresses")
409+
}
410+
}
411+
}

0 commit comments

Comments
 (0)