diff --git a/CHANGELOG.md b/CHANGELOG.md index b124f70faa..32bf7f47b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 +* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098 * [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082 * [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinetly for WatchKey. #7088 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index be429ba38d..61a71e9b06 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -302,6 +302,10 @@ querier: # queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + + # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-queryable-shard-cache-ttl + [parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 200d23476d..a0c67fa557 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4790,6 +4790,10 @@ thanos_engine: # need to make sure Parquet files are created before it is queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + +# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. 
+# CLI flag: -querier.parquet-queryable-shard-cache-ttl +[parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `query_frontend_config` diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 502a635534..663e985ac5 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -51,6 +51,8 @@ const ( parquetBlockStore blockStoreType = "parquet" ) +const defaultMaintenanceInterval = time.Minute + var ( validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} ) @@ -97,6 +99,7 @@ type parquetQueryableWithFallback struct { fallbackDisabled bool queryStoreAfter time.Duration parquetQueryable storage.Queryable + cache cacheInterface[parquet_storage.ParquetShard] blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -132,7 +135,7 @@ func NewParquetQueryable( return nil, err } - cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg)) + cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg)) if err != nil { return nil, err } @@ -248,6 +251,7 @@ func NewParquetQueryable( subservices: manager, blockStorageQueryable: blockStorageQueryable, parquetQueryable: parquetQueryable, + cache: cache, queryStoreAfter: config.QueryStoreAfter, subservicesWatcher: services.NewFailureWatcher(), finder: blockStorageQueryable.finder, @@ -283,6 +287,10 @@ func (p *parquetQueryableWithFallback) running(ctx context.Context) error { } func (p *parquetQueryableWithFallback) stopping(_ error) error { + if p.cache != nil { + p.cache.Close() + } + return services.StopManagerAndAwaitStopped(context.Background(), p.subservices) } @@ -613,6 +621,7 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint type cacheInterface[T any] interface { Get(path string) T Set(path 
string, reader T) + Close() } type cacheMetrics struct { @@ -643,17 +652,24 @@ func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { } } +type cacheEntry[T any] struct { + value T + expiresAt time.Time +} + type Cache[T any] struct { - cache *lru.Cache[string, T] + cache *lru.Cache[string, *cacheEntry[T]] name string metrics *cacheMetrics + ttl time.Duration + stopCh chan struct{} } -func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterface[T], error) { +func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) { if size <= 0 { return &noopCache[T]{}, nil } - cache, err := lru.NewWithEvict(size, func(key string, value T) { + cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) { metrics.evictions.WithLabelValues(name).Inc() metrics.size.WithLabelValues(name).Dec() }) @@ -661,17 +677,56 @@ func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterfa return nil, err } - return &Cache[T]{ + c := &Cache[T]{ cache: cache, name: name, metrics: metrics, - }, nil + ttl: ttl, + stopCh: make(chan struct{}), + } + + if ttl > 0 && maintenanceInterval > 0 { // time.NewTicker panics on a non-positive interval; Get still expires entries lazily + go c.maintenanceLoop(maintenanceInterval) + } + + return c, nil +} + +func (c *Cache[T]) maintenanceLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + keys := c.cache.Keys() + for _, key := range keys { + if entry, ok := c.cache.Peek(key); ok { + // Use Peek() rather than Get() so that the expiry scan does not change the LRU order. 
+ if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { + c.cache.Remove(key) + } + } + } + case <-c.stopCh: + return + } + } } func (c *Cache[T]) Get(path string) (r T) { - if reader, ok := c.cache.Get(path); ok { + if entry, ok := c.cache.Get(path); ok { + isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) + + if isExpired { + c.cache.Remove(path) + c.metrics.misses.WithLabelValues(c.name).Inc() + return + } + c.metrics.hits.WithLabelValues(c.name).Inc() - return reader + return entry.value } c.metrics.misses.WithLabelValues(c.name).Inc() return @@ -681,8 +736,22 @@ func (c *Cache[T]) Set(path string, reader T) { if !c.cache.Contains(path) { c.metrics.size.WithLabelValues(c.name).Inc() } - c.metrics.misses.WithLabelValues(c.name).Inc() - c.cache.Add(path, reader) + + var expiresAt time.Time + if c.ttl > 0 { + expiresAt = time.Now().Add(c.ttl) + } + + entry := &cacheEntry[T]{ + value: reader, + expiresAt: expiresAt, + } + + c.cache.Add(path, entry) +} + +func (c *Cache[T]) Close() { + close(c.stopCh) } type noopCache[T any] struct { @@ -696,6 +765,10 @@ func (n noopCache[T]) Set(_ string, _ T) { } +func (n noopCache[T]) Close() { + +} + var ( shardInfoCtxKey contextKey = 1 ) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 263bb36149..f8895eb550 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -1,6 +1,7 @@ package querier import ( + "bytes" "context" "fmt" "math/rand" @@ -14,6 +15,7 @@ import ( "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -881,3 +883,110 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { }) }) } + +func Test_Cache_LRUEviction(t 
*testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + cache, err := newCache[string]("test", 2, 0, time.Minute, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + _ = cache.Get("key1") // hit + // "key2" deleted by LRU eviction + cache.Set("key3", "value3") + + val1 := cache.Get("key1") // hit + require.Equal(t, "value1", val1) + val3 := cache.Get("key3") // hit + require.Equal(t, "value3", val3) + val2 := cache.Get("key2") // miss + require.Equal(t, "", val2) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 2 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByGet(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + + cache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + val = cache.Get("key1") + require.Equal(t, "", val) + + require.NoError(t, 
testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByLoop(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + + cache, err := newCache[string]("test", 10, 100*time.Millisecond, 10*time.Millisecond, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than the TTL plus several 10ms maintenance ticks, so the loop has run after expiry + time.Sleep(150 * time.Millisecond) + + if c, ok := cache.(*Cache[string]); ok { + // should have been deleted by the maintenance loop + _, ok := c.cache.Peek("key1") + require.False(t, ok) + } + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP 
cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + `))) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f84f07b967..8818f1266a 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -92,10 +92,11 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -147,6 +148,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 
0 to disable.") + f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8b748f0c8d..1e4a246cd2 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5801,6 +5801,13 @@ "type": "number", "x-cli-flag": "querier.parquet-queryable-shard-cache-size" }, + "parquet_queryable_shard_cache_ttl": { + "default": "24h0m0s", + "description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.", + "type": "string", + "x-cli-flag": "querier.parquet-queryable-shard-cache-ttl", + "x-format": "duration" + }, "per_step_stats_enabled": { "default": false, "description": "Enable returning samples stats per steps in query response.",