Skip to content

Commit 12bc203

Browse files
committed
Add parquet shard cache TTL
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent f593e69 commit 12bc203

File tree

7 files changed

+214
-14
lines changed

7 files changed

+214
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
77
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
88
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
9+
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7097
910
* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082
1011
* [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086
1112
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinetly for WatchKey. #7088

docs/blocks-storage/querier.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ querier:
302302
# queryable.
303303
# CLI flag: -querier.parquet-queryable-fallback-disabled
304304
[parquet_queryable_fallback_disabled: <boolean> | default = false]
305+
306+
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
307+
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
308+
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
305309
```
306310
307311
### `blocks_storage_config`

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4790,6 +4790,10 @@ thanos_engine:
47904790
# need to make sure Parquet files are created before it is queryable.
47914791
# CLI flag: -querier.parquet-queryable-fallback-disabled
47924792
[parquet_queryable_fallback_disabled: <boolean> | default = false]
4793+
4794+
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
4795+
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
4796+
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
47934797
```
47944798
47954799
### `query_frontend_config`

pkg/querier/parquet_queryable.go

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ const (
5151
parquetBlockStore blockStoreType = "parquet"
5252
)
5353

54+
const defaultMaintenanceInterval = time.Minute
55+
5456
var (
5557
validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
5658
)
@@ -97,6 +99,7 @@ type parquetQueryableWithFallback struct {
9799
fallbackDisabled bool
98100
queryStoreAfter time.Duration
99101
parquetQueryable storage.Queryable
102+
cache cacheInterface[parquet_storage.ParquetShard]
100103
blockStorageQueryable *BlocksStoreQueryable
101104

102105
finder BlocksFinder
@@ -132,7 +135,7 @@ func NewParquetQueryable(
132135
return nil, err
133136
}
134137

135-
cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg))
138+
cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg))
136139
if err != nil {
137140
return nil, err
138141
}
@@ -248,6 +251,7 @@ func NewParquetQueryable(
248251
subservices: manager,
249252
blockStorageQueryable: blockStorageQueryable,
250253
parquetQueryable: parquetQueryable,
254+
cache: cache,
251255
queryStoreAfter: config.QueryStoreAfter,
252256
subservicesWatcher: services.NewFailureWatcher(),
253257
finder: blockStorageQueryable.finder,
@@ -283,6 +287,10 @@ func (p *parquetQueryableWithFallback) running(ctx context.Context) error {
283287
}
284288

285289
func (p *parquetQueryableWithFallback) stopping(_ error) error {
290+
if p.cache != nil {
291+
p.cache.Close()
292+
}
293+
286294
return services.StopManagerAndAwaitStopped(context.Background(), p.subservices)
287295
}
288296

@@ -613,6 +621,7 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint
613621
type cacheInterface[T any] interface {
614622
Get(path string) T
615623
Set(path string, reader T)
624+
Close()
616625
}
617626

618627
type cacheMetrics struct {
@@ -643,35 +652,81 @@ func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics {
643652
}
644653
}
645654

655+
type cacheEntry[T any] struct {
656+
value T
657+
expiresAt time.Time
658+
}
659+
646660
type Cache[T any] struct {
647-
cache *lru.Cache[string, T]
661+
cache *lru.Cache[string, *cacheEntry[T]]
648662
name string
649663
metrics *cacheMetrics
664+
ttl time.Duration
665+
stopCh chan struct{}
650666
}
651667

652-
func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterface[T], error) {
668+
func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) {
653669
if size <= 0 {
654670
return &noopCache[T]{}, nil
655671
}
656-
cache, err := lru.NewWithEvict(size, func(key string, value T) {
672+
cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) {
657673
metrics.evictions.WithLabelValues(name).Inc()
658674
metrics.size.WithLabelValues(name).Dec()
659675
})
660676
if err != nil {
661677
return nil, err
662678
}
663679

664-
return &Cache[T]{
680+
c := &Cache[T]{
665681
cache: cache,
666682
name: name,
667683
metrics: metrics,
668-
}, nil
684+
ttl: ttl,
685+
stopCh: make(chan struct{}),
686+
}
687+
688+
if ttl > 0 {
689+
go c.maintenanceLoop(maintenanceInterval)
690+
}
691+
692+
return c, nil
693+
}
694+
695+
func (c *Cache[T]) maintenanceLoop(interval time.Duration) {
696+
ticker := time.NewTicker(interval)
697+
defer ticker.Stop()
698+
699+
for {
700+
select {
701+
case <-ticker.C:
702+
now := time.Now()
703+
keys := c.cache.Keys()
704+
for _, key := range keys {
705+
if entry, ok := c.cache.Peek(key); ok {
706+
// we use a Peek() because the Get() change LRU order.
707+
if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) {
708+
c.cache.Remove(key)
709+
}
710+
}
711+
}
712+
case <-c.stopCh:
713+
return
714+
}
715+
}
669716
}
670717

671718
func (c *Cache[T]) Get(path string) (r T) {
672-
if reader, ok := c.cache.Get(path); ok {
719+
if entry, ok := c.cache.Get(path); ok {
720+
isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt)
721+
722+
if isExpired {
723+
c.cache.Remove(path)
724+
c.metrics.misses.WithLabelValues(c.name).Inc()
725+
return
726+
}
727+
673728
c.metrics.hits.WithLabelValues(c.name).Inc()
674-
return reader
729+
return entry.value
675730
}
676731
c.metrics.misses.WithLabelValues(c.name).Inc()
677732
return
@@ -681,8 +736,22 @@ func (c *Cache[T]) Set(path string, reader T) {
681736
if !c.cache.Contains(path) {
682737
c.metrics.size.WithLabelValues(c.name).Inc()
683738
}
684-
c.metrics.misses.WithLabelValues(c.name).Inc()
685-
c.cache.Add(path, reader)
739+
740+
var expiresAt time.Time
741+
if c.ttl > 0 {
742+
expiresAt = time.Now().Add(c.ttl)
743+
}
744+
745+
entry := &cacheEntry[T]{
746+
value: reader,
747+
expiresAt: expiresAt,
748+
}
749+
750+
c.cache.Add(path, entry)
751+
}
752+
753+
func (c *Cache[T]) Close() {
754+
close(c.stopCh)
686755
}
687756

688757
type noopCache[T any] struct {
@@ -696,6 +765,10 @@ func (n noopCache[T]) Set(_ string, _ T) {
696765

697766
}
698767

768+
func (n noopCache[T]) Close() {
769+
770+
}
771+
699772
var (
700773
shardInfoCtxKey contextKey = 1
701774
)

pkg/querier/parquet_queryable_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package querier
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"math/rand"
@@ -14,6 +15,7 @@ import (
1415
"github.com/oklog/ulid/v2"
1516
"github.com/prometheus-community/parquet-common/convert"
1617
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/testutil"
1719
"github.com/prometheus/prometheus/model/labels"
1820
"github.com/prometheus/prometheus/storage"
1921
"github.com/prometheus/prometheus/tsdb"
@@ -881,3 +883,110 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) {
881883
})
882884
})
883885
}
886+
887+
func Test_Cache_LRUEviction(t *testing.T) {
888+
reg := prometheus.NewRegistry()
889+
metrics := newCacheMetrics(reg)
890+
cache, err := newCache[string]("test", 2, 0, time.Minute, metrics)
891+
require.NoError(t, err)
892+
defer cache.Close()
893+
894+
cache.Set("key1", "value1")
895+
cache.Set("key2", "value2")
896+
897+
_ = cache.Get("key1") // hit
898+
// "key2" deleted by LRU eviction
899+
cache.Set("key3", "value3")
900+
901+
val1 := cache.Get("key1") // hit
902+
require.Equal(t, "value1", val1)
903+
val3 := cache.Get("key3") // hit
904+
require.Equal(t, "value3", val3)
905+
val2 := cache.Get("key2") // miss
906+
require.Equal(t, "", val2)
907+
908+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(`
909+
# HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions
910+
# TYPE cortex_parquet_queryable_cache_evictions_total counter
911+
cortex_parquet_queryable_cache_evictions_total{name="test"} 1
912+
# HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits
913+
# TYPE cortex_parquet_queryable_cache_hits_total counter
914+
cortex_parquet_queryable_cache_hits_total{name="test"} 3
915+
# HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items
916+
# TYPE cortex_parquet_queryable_cache_item_count gauge
917+
cortex_parquet_queryable_cache_item_count{name="test"} 2
918+
# HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses
919+
# TYPE cortex_parquet_queryable_cache_misses_total counter
920+
cortex_parquet_queryable_cache_misses_total{name="test"} 1
921+
`))))
922+
}
923+
924+
func Test_Cache_TTLEvictionByGet(t *testing.T) {
925+
reg := prometheus.NewRegistry()
926+
metrics := newCacheMetrics(reg)
927+
928+
cache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, metrics)
929+
require.NoError(t, err)
930+
defer cache.Close()
931+
932+
cache.Set("key1", "value1")
933+
934+
val := cache.Get("key1")
935+
require.Equal(t, "value1", val)
936+
937+
// sleep longer than TTL
938+
time.Sleep(150 * time.Millisecond)
939+
940+
val = cache.Get("key1")
941+
require.Equal(t, "", val)
942+
943+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(`
944+
# HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions
945+
# TYPE cortex_parquet_queryable_cache_evictions_total counter
946+
cortex_parquet_queryable_cache_evictions_total{name="test"} 1
947+
# HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits
948+
# TYPE cortex_parquet_queryable_cache_hits_total counter
949+
cortex_parquet_queryable_cache_hits_total{name="test"} 1
950+
# HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items
951+
# TYPE cortex_parquet_queryable_cache_item_count gauge
952+
cortex_parquet_queryable_cache_item_count{name="test"} 0
953+
# HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses
954+
# TYPE cortex_parquet_queryable_cache_misses_total counter
955+
cortex_parquet_queryable_cache_misses_total{name="test"} 1
956+
`))))
957+
}
958+
959+
func Test_Cache_TTLEvictionByLoop(t *testing.T) {
960+
reg := prometheus.NewRegistry()
961+
metrics := newCacheMetrics(reg)
962+
963+
cache, err := newCache[string]("test", 10, 100*time.Millisecond, 100*time.Millisecond, metrics)
964+
require.NoError(t, err)
965+
defer cache.Close()
966+
967+
cache.Set("key1", "value1")
968+
969+
val := cache.Get("key1")
970+
require.Equal(t, "value1", val)
971+
972+
// sleep longer than TTL
973+
time.Sleep(150 * time.Millisecond)
974+
975+
if c, ok := cache.(*Cache[string]); ok {
976+
// should delete by maintenance loop
977+
_, ok := c.cache.Peek("key1")
978+
require.False(t, ok)
979+
}
980+
981+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(`
982+
# HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions
983+
# TYPE cortex_parquet_queryable_cache_evictions_total counter
984+
cortex_parquet_queryable_cache_evictions_total{name="test"} 1
985+
# HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits
986+
# TYPE cortex_parquet_queryable_cache_hits_total counter
987+
cortex_parquet_queryable_cache_hits_total{name="test"} 1
988+
# HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items
989+
# TYPE cortex_parquet_queryable_cache_item_count gauge
990+
cortex_parquet_queryable_cache_item_count{name="test"} 0
991+
`))))
992+
}

pkg/querier/querier.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,11 @@ type Config struct {
9292
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`
9393

9494
// Query Parquet files if available
95-
EnableParquetQueryable bool `yaml:"enable_parquet_queryable"`
96-
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
97-
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
98-
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
95+
EnableParquetQueryable bool `yaml:"enable_parquet_queryable"`
96+
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
97+
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
98+
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
99+
ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"`
99100

100101
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
101102
}
@@ -147,6 +148,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
147148
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
148149
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
149150
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
151+
f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.")
150152
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
151153
f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
152154
f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.")

schemas/cortex-config-schema.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5801,6 +5801,13 @@
58015801
"type": "number",
58025802
"x-cli-flag": "querier.parquet-queryable-shard-cache-size"
58035803
},
5804+
"parquet_queryable_shard_cache_ttl": {
5805+
"default": "24h0m0s",
5806+
"description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.",
5807+
"type": "string",
5808+
"x-cli-flag": "querier.parquet-queryable-shard-cache-ttl",
5809+
"x-format": "duration"
5810+
},
58045811
"per_step_stats_enabled": {
58055812
"default": false,
58065813
"description": "Enable returning samples stats per steps in query response.",

0 commit comments

Comments
 (0)