From 8b0a70a2094faa0a8fb3709068363c58f297ed81 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 7 Nov 2025 21:55:14 +0900 Subject: [PATCH 1/2] Add parquet shard cache TTL Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 4 + docs/configuration/config-file-reference.md | 4 + pkg/querier/parquet_queryable.go | 93 +++++++++++++++-- pkg/querier/parquet_queryable_test.go | 109 ++++++++++++++++++++ pkg/querier/querier.go | 10 +- schemas/cortex-config-schema.json | 7 ++ 7 files changed, 214 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b124f70faab..32bf7f47b14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 +* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098 * [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082 * [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinetly for WatchKey. #7088 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index be429ba38d6..61a71e9b06b 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -302,6 +302,10 @@ querier: # queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + + # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-queryable-shard-cache-ttl + [parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 200d23476de..a0c67fa5570 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4790,6 +4790,10 @@ thanos_engine: # need to make sure Parquet files are created before it is queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + +# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. +# CLI flag: -querier.parquet-queryable-shard-cache-ttl +[parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `query_frontend_config` diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 502a635534b..663e985ac53 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -51,6 +51,8 @@ const ( parquetBlockStore blockStoreType = "parquet" ) +const defaultMaintenanceInterval = time.Minute + var ( validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} ) @@ -97,6 +99,7 @@ type parquetQueryableWithFallback struct { fallbackDisabled bool queryStoreAfter time.Duration parquetQueryable storage.Queryable + cache cacheInterface[parquet_storage.ParquetShard] blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -132,7 +135,7 @@ func NewParquetQueryable( return nil, err } - cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg)) + cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg)) if err != nil { return nil, err } @@ -248,6 +251,7 @@ func NewParquetQueryable( subservices: manager, blockStorageQueryable: blockStorageQueryable, parquetQueryable: parquetQueryable, + cache: cache, queryStoreAfter: config.QueryStoreAfter, subservicesWatcher: services.NewFailureWatcher(), finder: blockStorageQueryable.finder, @@ -283,6 +287,10 @@ func (p *parquetQueryableWithFallback) running(ctx context.Context) error { } func (p *parquetQueryableWithFallback) stopping(_ error) error { + if p.cache != nil { + p.cache.Close() + } + return services.StopManagerAndAwaitStopped(context.Background(), p.subservices) } @@ -613,6 +621,7 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint type cacheInterface[T any] interface { Get(path string) T Set(path string, reader T) + Close() } type cacheMetrics struct { @@ -643,17 +652,24 @@ func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { } } +type cacheEntry[T any] struct { + value T + expiresAt time.Time +} + type Cache[T any] struct { - cache *lru.Cache[string, T] + cache *lru.Cache[string, *cacheEntry[T]] name string metrics *cacheMetrics + ttl time.Duration + stopCh chan struct{} } -func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterface[T], error) { +func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) { if size <= 0 { return &noopCache[T]{}, nil } - cache, err := lru.NewWithEvict(size, func(key string, value T) { + cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) { metrics.evictions.WithLabelValues(name).Inc() metrics.size.WithLabelValues(name).Dec() }) @@ -661,17 +677,56 @@ func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterfa return nil, err } - return &Cache[T]{ + c := &Cache[T]{ cache: cache, name: name, metrics: metrics, - }, nil + ttl: ttl, + stopCh: make(chan struct{}), + } + + if ttl > 0 { + go c.maintenanceLoop(maintenanceInterval) + } + + return c, nil +} + +func (c *Cache[T]) maintenanceLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + keys := c.cache.Keys() + for _, key := range keys { + if entry, ok := c.cache.Peek(key); ok { + // we use a Peek() because the Get() change LRU order. + if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { + c.cache.Remove(key) + } + } + } + case <-c.stopCh: + return + } + } } func (c *Cache[T]) Get(path string) (r T) { - if reader, ok := c.cache.Get(path); ok { + if entry, ok := c.cache.Get(path); ok { + isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) + + if isExpired { + c.cache.Remove(path) + c.metrics.misses.WithLabelValues(c.name).Inc() + return + } + c.metrics.hits.WithLabelValues(c.name).Inc() - return reader + return entry.value } c.metrics.misses.WithLabelValues(c.name).Inc() return @@ -681,8 +736,22 @@ func (c *Cache[T]) Set(path string, reader T) { if !c.cache.Contains(path) { c.metrics.size.WithLabelValues(c.name).Inc() } - c.metrics.misses.WithLabelValues(c.name).Inc() - c.cache.Add(path, reader) + + var expiresAt time.Time + if c.ttl > 0 { + expiresAt = time.Now().Add(c.ttl) + } + + entry := &cacheEntry[T]{ + value: reader, + expiresAt: expiresAt, + } + + c.cache.Add(path, entry) +} + +func (c *Cache[T]) Close() { + close(c.stopCh) } type noopCache[T any] struct { @@ -696,6 +765,10 @@ func (n noopCache[T]) Set(_ string, _ T) { } +func (n noopCache[T]) Close() { + +} + var ( shardInfoCtxKey contextKey = 1 ) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 263bb361490..9626a9324e2 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -1,6 +1,7 @@ package querier import ( + "bytes" "context" "fmt" "math/rand" @@ -14,6 +15,7 @@ import ( "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -881,3 +883,110 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { }) }) } + +func Test_Cache_LRUEviction(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + cache, err := newCache[string]("test", 2, 0, time.Minute, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + _ = cache.Get("key1") // hit + // "key2" deleted by LRU eviction + cache.Set("key3", "value3") + + val1 := cache.Get("key1") // hit + require.Equal(t, "value1", val1) + val3 := cache.Get("key3") // hit + require.Equal(t, "value3", val3) + val2 := cache.Get("key2") // miss + require.Equal(t, "", val2) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 2 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 +`)))) +} + +func Test_Cache_TTLEvictionByGet(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + + cache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + val = cache.Get("key1") + require.Equal(t, "", val) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 +`)))) +} + +func Test_Cache_TTLEvictionByLoop(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + + cache, err := newCache[string]("test", 10, 100*time.Millisecond, 100*time.Millisecond, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + if c, ok := cache.(*Cache[string]); ok { + // should delete by maintenance loop + _, ok := c.cache.Peek("key1") + require.False(t, ok) + } + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 +`)))) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f84f07b9674..8818f1266a1 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -92,10 +92,11 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -147,6 +148,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") + f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8b748f0c8d3..1e4a246cd27 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5801,6 +5801,13 @@ "type": "number", "x-cli-flag": "querier.parquet-queryable-shard-cache-size" }, + "parquet_queryable_shard_cache_ttl": { + "default": "24h0m0s", + "description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.", + "type": "string", + "x-cli-flag": "querier.parquet-queryable-shard-cache-ttl", + "x-format": "duration" + }, "per_step_stats_enabled": { "default": false, "description": "Enable returning samples stats per steps in query response.", From 9c55941cfb6cef97c64d047b30d29b50d65decfc Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sat, 8 Nov 2025 08:09:50 +0900 Subject: [PATCH 2/2] fix lint Signed-off-by: SungJin1212 --- pkg/querier/parquet_queryable_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 9626a9324e2..f8895eb550d 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -905,7 +905,7 @@ func Test_Cache_LRUEviction(t *testing.T) { val2 := cache.Get("key2") // miss require.Equal(t, "", val2) - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions # TYPE cortex_parquet_queryable_cache_evictions_total counter cortex_parquet_queryable_cache_evictions_total{name="test"} 1 @@ -918,7 +918,7 @@ func Test_Cache_LRUEviction(t *testing.T) { # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses # TYPE cortex_parquet_queryable_cache_misses_total counter cortex_parquet_queryable_cache_misses_total{name="test"} 1 -`)))) + `))) } func Test_Cache_TTLEvictionByGet(t *testing.T) { @@ -940,7 +940,7 @@ func Test_Cache_TTLEvictionByGet(t *testing.T) { val = cache.Get("key1") require.Equal(t, "", val) - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions # TYPE cortex_parquet_queryable_cache_evictions_total counter cortex_parquet_queryable_cache_evictions_total{name="test"} 1 @@ -953,7 +953,7 @@ func Test_Cache_TTLEvictionByGet(t *testing.T) { # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses # TYPE cortex_parquet_queryable_cache_misses_total counter cortex_parquet_queryable_cache_misses_total{name="test"} 1 -`)))) + `))) } func Test_Cache_TTLEvictionByLoop(t *testing.T) { @@ -978,7 +978,7 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { require.False(t, ok) } - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions # TYPE cortex_parquet_queryable_cache_evictions_total counter cortex_parquet_queryable_cache_evictions_total{name="test"} 1 @@ -988,5 +988,5 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items # TYPE cortex_parquet_queryable_cache_item_count gauge cortex_parquet_queryable_cache_item_count{name="test"} 0 -`)))) + `))) }