Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/synthetic-monitoring-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func run(args []string, stdout io.Writer) error {

// Initialize cache client (always non-nil, with fallback chain: memcached → local → noop)
cacheClient := setupCache(
ctx,
config.CacheType,
config.MemcachedServers,
config.CacheLocalCapacity,
Expand Down Expand Up @@ -496,7 +497,7 @@ func setupGoMemLimit(ratio float64) error {
return nil
}

func setupCache(cacheType cache.Kind, memcachedServers []string, localCapacity int, localTTL time.Duration, logger *zerolog.Logger) cache.Cache {
func setupCache(ctx context.Context, cacheType cache.Kind, memcachedServers []string, localCapacity int, localTTL time.Duration, logger *zerolog.Logger) cache.Cache {
// Determine effective cache type with auto mode logic:
// auto + servers provided -> memcached -> local -> noop
// auto + no servers -> local -> noop
Expand All @@ -522,7 +523,7 @@ func setupCache(cacheType cache.Kind, memcachedServers []string, localCapacity i
Timeout: 100 * time.Millisecond,
}

cacheClient, err := cache.NewMemcachedClient(cacheConfig)
cacheClient, err := cache.NewMemcachedClient(ctx, cacheConfig)
if err != nil {
logger.Warn().
Err(err).
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/grafana/synthetic-monitoring-agent

go 1.24.0
go 1.25.0

toolchain go1.24.6
toolchain go1.25.4

require (
github.com/go-kit/kit v0.13.0
Expand Down
193 changes: 185 additions & 8 deletions internal/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@ const (
// DefaultMaxIdleConns is the default maximum number of idle connections per server
DefaultMaxIdleConns = 2

// DefaultRefreshInterval is the default interval for re-resolving server addresses
DefaultRefreshInterval = 60 * time.Second

// DNSResolveTimeout is the timeout for DNS resolution operations
DNSResolveTimeout = 5 * time.Second

// MaxKeyLength is the maximum key length supported by memcached
MaxKeyLength = 250
)

// Resolver is an interface for DNS resolution to allow injection in tests.
type Resolver interface {
LookupIP(ctx context.Context, network, host string) ([]net.IP, error)
}

// MemcachedConfig holds configuration for the cache client.
type MemcachedConfig struct {
// Servers is the list of memcached server addresses (host:port)
Expand All @@ -32,12 +43,21 @@ type MemcachedConfig struct {
Timeout time.Duration
// MaxIdleConns is the maximum number of idle connections per server (optional, defaults to 2)
MaxIdleConns int
// RefreshInterval is the interval for re-resolving server addresses (optional, defaults to 60s)
RefreshInterval time.Duration
// Resolver is the DNS resolver to use (optional, defaults to net.DefaultResolver)
// This is primarily for testing purposes.
Resolver Resolver
}

// MemcachedClient wraps the memcached client with additional functionality.
type MemcachedClient struct {
mc *memcache.Client
logger zerolog.Logger
mc *memcache.Client
logger zerolog.Logger
serverList *memcache.ServerList
originalServers []string
cancel context.CancelFunc
resolver Resolver
}

const KindMemcached Kind = "memcached"
Expand All @@ -48,7 +68,7 @@ var _ Cache = (*MemcachedClient)(nil)
// NewMemcachedClient creates a new memcached cache client with the provided configuration.
// It validates the configuration and returns an error if invalid.
// Returns a Cache interface that can be used for all cache operations.
func NewMemcachedClient(config MemcachedConfig) (*MemcachedClient, error) {
func NewMemcachedClient(ctx context.Context, config MemcachedConfig) (*MemcachedClient, error) {
// Validate that at least one server is provided
if len(config.Servers) == 0 {
return nil, fmt.Errorf("at least one memcached server must be provided")
Expand All @@ -71,25 +91,65 @@ func NewMemcachedClient(config MemcachedConfig) (*MemcachedClient, error) {
if config.Timeout == 0 {
config.Timeout = DefaultTimeout
}

if config.MaxIdleConns == 0 {
config.MaxIdleConns = DefaultMaxIdleConns
}

// Create memcache client
mc := memcache.New(config.Servers...)
if config.RefreshInterval == 0 {
config.RefreshInterval = DefaultRefreshInterval
}

// Use default resolver if none provided
resolver := config.Resolver
if resolver == nil {
resolver = net.DefaultResolver
}

// Create a ServerList selector for dynamic address resolution
serverList := &memcache.ServerList{}

// Resolve initial server addresses
logger := config.Logger.With().Str("component", "cache").Logger()
resolved := resolveServers(ctx, config.Servers, resolver, logger)

// Ensure we have at least one resolved server
if len(resolved) == 0 {
return nil, fmt.Errorf("no servers could be resolved from the provided list: %v", config.Servers)
}

// Set initial servers in the selector
if err := serverList.SetServers(resolved...); err != nil {
return nil, fmt.Errorf("failed to set initial servers: %w", err)
}

// Create memcache client with the selector
mc := memcache.NewFromSelector(serverList)
mc.Timeout = config.Timeout
mc.MaxIdleConns = config.MaxIdleConns

// Create context for the refresh goroutine
refreshCtx, cancel := context.WithCancel(ctx)

client := &MemcachedClient{
mc: mc,
logger: config.Logger.With().Str("component", "cache").Logger(),
mc: mc,
logger: logger,
serverList: serverList,
originalServers: config.Servers,
cancel: cancel,
resolver: resolver,
}

// Start background goroutine to periodically refresh server addresses
go client.refreshServers(refreshCtx, config.RefreshInterval)

client.logger.Info().
Strs("servers", config.Servers).
Int("resolved_count", len(resolved)).
Dur("timeout", config.Timeout).
Int("max_idle_conns", config.MaxIdleConns).
Msg("cache client initialized")
Dur("refresh_interval", config.RefreshInterval).
Msg("cache client initialized with dynamic address resolution")

return client, nil
}
Expand Down Expand Up @@ -222,6 +282,26 @@ func (c *MemcachedClient) Flush(ctx context.Context) error {
return nil
}

// Close stops the background server address refresh goroutine by cancelling its context.
//
// This method is optional if the parent context passed to NewMemcachedClient is already
// cancelled during application shutdown. The refresh goroutine will automatically stop
// when the parent context is cancelled.
//
// Call this method explicitly only if you need to stop the refresh goroutine before
// the application shuts down or if you're managing the client lifecycle independently.
//
// Note: This is not part of the Cache interface, so it must be called explicitly
// if cleanup is needed.
func (c *MemcachedClient) Close() error {
if c.cancel != nil {
c.cancel()
c.logger.Debug().Msg("cache client closed")
}

return nil
}

// validateKey validates a memcached key.
func validateKey(key string) error {
if key == "" {
Expand All @@ -232,3 +312,100 @@ func validateKey(key string) error {
}
return nil
}

// resolveServers resolves all hostnames in the server list to their IP addresses.
// Each hostname is resolved to potentially multiple IP addresses.
// Returns a slice of "ip:port" strings ready for use with memcache.ServerList.SetServers.
// IP addresses are used as-is without resolution.
// Hostnames that fail to resolve are skipped (not included in the result).
func resolveServers(ctx context.Context, servers []string, resolver Resolver, logger zerolog.Logger) []string {
var resolved []string

for _, server := range servers {
host, port, err := net.SplitHostPort(server)
if err != nil {
logger.Warn().Err(err).Str("server", server).Msg("failed to parse server address, skipping")
continue
}

// Check if host is already an IP address
if ip := net.ParseIP(host); ip != nil {
// Already an IP address, use as-is
resolved = append(resolved, server)
logger.Debug().Str("server", server).Msg("using IP address directly")

continue
}

// Try to resolve the hostname
ips, err := resolver.LookupIP(ctx, "ip", host)
if err != nil {
logger.Warn().Err(err).Str("host", host).Msg("failed to resolve hostname, skipping server")

continue
}

if len(ips) == 0 {
logger.Warn().Str("host", host).Msg("no IPs resolved for hostname, skipping server")

continue
}

// Add all resolved IPs with the port
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), port)
resolved = append(resolved, addr)
}

logger.Debug().
Str("host", host).
Int("ip_count", len(ips)).
Strs("resolved", resolved[len(resolved)-len(ips):]).
Msg("resolved server addresses")
}

return resolved
}

// refreshServers periodically re-resolves server addresses and updates the ServerList.
func (c *MemcachedClient) refreshServers(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
c.logger.Debug().Msg("stopping server address refresh")

return

case <-ticker.C:
// Create a timeout context for DNS resolution to prevent hanging
resolveCtx, cancel := context.WithTimeout(ctx, DNSResolveTimeout)
resolved := resolveServers(resolveCtx, c.originalServers, c.resolver, c.logger)
cancel()

if len(resolved) == 0 {
c.logger.Error().
Strs("servers", c.originalServers).
Msg("failed to refresh server addresses")

continue
}

if err := c.serverList.SetServers(resolved...); err != nil {
c.logger.Error().
Err(err).
Strs("servers", resolved).
Msg("failed to update server addresses")

continue
}

c.logger.Debug().
Int("server_count", len(resolved)).
Strs("servers", resolved).
Msg("refreshed server addresses")
}
}
}
Loading
Loading