Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098
* [BUGFIX] Compactor: Avoid race condition which allows a grouper to not compact all partitions. #7082
* [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ querier:
# queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
```

### `blocks_storage_config`
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4790,6 +4790,10 @@ thanos_engine:
# need to make sure Parquet files are created before it is queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
```

### `query_frontend_config`
Expand Down
93 changes: 83 additions & 10 deletions pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
parquetBlockStore blockStoreType = "parquet"
)

// defaultMaintenanceInterval is the maintenance interval NewParquetQueryable
// passes to newCache for the parquet shard cache, i.e. how often the cache's
// background loop scans for entries whose TTL has elapsed.
const defaultMaintenanceInterval = time.Minute

var (
validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
)
Expand Down Expand Up @@ -97,6 +99,7 @@ type parquetQueryableWithFallback struct {
fallbackDisabled bool
queryStoreAfter time.Duration
parquetQueryable storage.Queryable
cache cacheInterface[parquet_storage.ParquetShard]
blockStorageQueryable *BlocksStoreQueryable

finder BlocksFinder
Expand Down Expand Up @@ -132,7 +135,7 @@ func NewParquetQueryable(
return nil, err
}

cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg))
cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,6 +251,7 @@ func NewParquetQueryable(
subservices: manager,
blockStorageQueryable: blockStorageQueryable,
parquetQueryable: parquetQueryable,
cache: cache,
queryStoreAfter: config.QueryStoreAfter,
subservicesWatcher: services.NewFailureWatcher(),
finder: blockStorageQueryable.finder,
Expand Down Expand Up @@ -283,6 +287,10 @@ func (p *parquetQueryableWithFallback) running(ctx context.Context) error {
}

// stopping is the service shutdown hook. It closes the parquet shard cache
// when one is set, then stops the subservices manager and waits until it has
// stopped, returning whatever error that wait produces.
func (p *parquetQueryableWithFallback) stopping(_ error) error {
	if shardCache := p.cache; shardCache != nil {
		shardCache.Close()
	}

	return services.StopManagerAndAwaitStopped(context.Background(), p.subservices)
}

Expand Down Expand Up @@ -613,6 +621,7 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint
// cacheInterface is the contract shared by the cache implementations in this
// file (Cache and noopCache), keyed by a path string.
type cacheInterface[T any] interface {
// Get returns the value cached under path. The Cache implementation
// returns the zero value of T when nothing usable is cached.
Get(path string) T
// Set stores reader under path.
Set(path string, reader T)
// Close releases whatever the implementation holds open; for Cache this
// signals its background maintenance loop to stop.
Close()
}

type cacheMetrics struct {
Expand Down Expand Up @@ -643,35 +652,81 @@ func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics {
}
}

// cacheEntry is what Cache stores in the underlying LRU: the cached value plus
// the instant after which it must no longer be served.
type cacheEntry[T any] struct {
// value is the cached item handed back by Cache.Get.
value T
// expiresAt is the expiry deadline. The zero time means the entry never
// expires (Cache.Set leaves it zero when the cache TTL is not positive).
expiresAt time.Time
}

type Cache[T any] struct {
cache *lru.Cache[string, T]
cache *lru.Cache[string, *cacheEntry[T]]
name string
metrics *cacheMetrics
ttl time.Duration
stopCh chan struct{}
}

func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterface[T], error) {
func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) {
if size <= 0 {
return &noopCache[T]{}, nil
}
cache, err := lru.NewWithEvict(size, func(key string, value T) {
cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) {
metrics.evictions.WithLabelValues(name).Inc()
metrics.size.WithLabelValues(name).Dec()
})
if err != nil {
return nil, err
}

return &Cache[T]{
c := &Cache[T]{
cache: cache,
name: name,
metrics: metrics,
}, nil
ttl: ttl,
stopCh: make(chan struct{}),
}

if ttl > 0 {
go c.maintenanceLoop(maintenanceInterval)
}

return c, nil
}

// maintenanceLoop runs until stopCh is closed. On every tick of interval it
// walks the keys currently in the cache and removes each entry whose
// expiresAt is set and already in the past.
func (c *Cache[T]) maintenanceLoop(interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		// Block until either shutdown is requested or the next sweep is due.
		select {
		case <-c.stopCh:
			return
		case <-tick.C:
		}

		// One timestamp per sweep: every entry is judged against the same instant.
		sweepTime := time.Now()
		for _, k := range c.cache.Keys() {
			// Peek rather than Get: Get would refresh the entry's LRU position,
			// and a maintenance scan must not count as a use.
			e, found := c.cache.Peek(k)
			if !found || e.expiresAt.IsZero() || !sweepTime.After(e.expiresAt) {
				continue
			}
			c.cache.Remove(k)
		}
	}
}

func (c *Cache[T]) Get(path string) (r T) {
if reader, ok := c.cache.Get(path); ok {
if entry, ok := c.cache.Get(path); ok {
isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt)

if isExpired {
c.cache.Remove(path)
c.metrics.misses.WithLabelValues(c.name).Inc()
return
}

c.metrics.hits.WithLabelValues(c.name).Inc()
return reader
return entry.value
}
c.metrics.misses.WithLabelValues(c.name).Inc()
return
Expand All @@ -681,8 +736,22 @@ func (c *Cache[T]) Set(path string, reader T) {
if !c.cache.Contains(path) {
c.metrics.size.WithLabelValues(c.name).Inc()
}
c.metrics.misses.WithLabelValues(c.name).Inc()
c.cache.Add(path, reader)

var expiresAt time.Time
if c.ttl > 0 {
expiresAt = time.Now().Add(c.ttl)
}

entry := &cacheEntry[T]{
value: reader,
expiresAt: expiresAt,
}

c.cache.Add(path, entry)
}

// Close signals the background maintenance loop (if one was started) to exit
// by closing stopCh.
//
// Calling Close again after an earlier call has returned is a no-op instead
// of a "close of closed channel" panic. The check is not synchronized, so
// Close must still not be called from several goroutines at the same time.
func (c *Cache[T]) Close() {
	select {
	case <-c.stopCh:
		// stopCh is never sent on, so a successful receive means it was
		// already closed by a previous Close.
	default:
		close(c.stopCh)
	}
}

type noopCache[T any] struct {
Expand All @@ -696,6 +765,10 @@ func (n noopCache[T]) Set(_ string, _ T) {

}

// Close does nothing; it exists so that noopCache satisfies cacheInterface.
func (n noopCache[T]) Close() {

}

var (
shardInfoCtxKey contextKey = 1
)
Expand Down
109 changes: 109 additions & 0 deletions pkg/querier/parquet_queryable_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package querier

import (
"bytes"
"context"
"fmt"
"math/rand"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/oklog/ulid/v2"
"github.com/prometheus-community/parquet-common/convert"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -881,3 +883,110 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) {
})
})
}

// Test_Cache_LRUEviction checks that, with TTL disabled (ttl = 0), a cache of
// capacity 2 evicts the least recently used key when a third key is added,
// and that hits, misses, evictions and item count are reported accordingly.
func Test_Cache_LRUEviction(t *testing.T) {
	registry := prometheus.NewRegistry()
	cacheMetrics := newCacheMetrics(registry)
	shardCache, err := newCache[string]("test", 2, 0, time.Minute, cacheMetrics)
	require.NoError(t, err)
	defer shardCache.Close()

	shardCache.Set("key1", "value1")
	shardCache.Set("key2", "value2")

	// First hit: touching "key1" leaves "key2" as the least recently used entry.
	_ = shardCache.Get("key1")
	// The cache is full, so adding "key3" pushes "key2" out.
	shardCache.Set("key3", "value3")

	require.Equal(t, "value1", shardCache.Get("key1")) // second hit
	require.Equal(t, "value3", shardCache.Get("key3")) // third hit
	require.Equal(t, "", shardCache.Get("key2"))       // the only miss

	expectedMetrics := `
# HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions
# TYPE cortex_parquet_queryable_cache_evictions_total counter
cortex_parquet_queryable_cache_evictions_total{name="test"} 1
# HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits
# TYPE cortex_parquet_queryable_cache_hits_total counter
cortex_parquet_queryable_cache_hits_total{name="test"} 3
# HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items
# TYPE cortex_parquet_queryable_cache_item_count gauge
cortex_parquet_queryable_cache_item_count{name="test"} 2
# HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses
# TYPE cortex_parquet_queryable_cache_misses_total counter
cortex_parquet_queryable_cache_misses_total{name="test"} 1
`
	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(expectedMetrics)))
}

// Test_Cache_TTLEvictionByGet checks the lazy expiry path: with a one-minute
// maintenance interval the background loop does not get to run during the
// test, so the expired entry has to be dropped by Get itself.
func Test_Cache_TTLEvictionByGet(t *testing.T) {
	registry := prometheus.NewRegistry()
	cacheMetrics := newCacheMetrics(registry)

	shardCache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, cacheMetrics)
	require.NoError(t, err)
	defer shardCache.Close()

	shardCache.Set("key1", "value1")

	// Still inside the TTL window: served from the cache.
	require.Equal(t, "value1", shardCache.Get("key1"))

	// Wait past the 100ms TTL.
	time.Sleep(150 * time.Millisecond)

	// Expired now: Get reports a miss and returns the zero value.
	require.Equal(t, "", shardCache.Get("key1"))

	expectedMetrics := `
# HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions
# TYPE cortex_parquet_queryable_cache_evictions_total counter
cortex_parquet_queryable_cache_evictions_total{name="test"} 1
# HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits
# TYPE cortex_parquet_queryable_cache_hits_total counter
cortex_parquet_queryable_cache_hits_total{name="test"} 1
# HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items
# TYPE cortex_parquet_queryable_cache_item_count gauge
cortex_parquet_queryable_cache_item_count{name="test"} 0
# HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses
# TYPE cortex_parquet_queryable_cache_misses_total counter
cortex_parquet_queryable_cache_misses_total{name="test"} 1
`
	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(expectedMetrics)))
}

// Test_Cache_TTLEvictionByLoop checks that the background maintenance loop
// removes an expired entry on its own, without a Get touching it after expiry.
func Test_Cache_TTLEvictionByLoop(t *testing.T) {
	reg := prometheus.NewRegistry()
	metrics := newCacheMetrics(reg)

	cache, err := newCache[string]("test", 10, 100*time.Millisecond, 100*time.Millisecond, metrics)
	require.NoError(t, err)
	defer cache.Close()

	// Fail loudly if newCache stops returning *Cache: a silent type-assertion
	// miss would skip the one assertion this test exists for.
	c, ok := cache.(*Cache[string])
	require.True(t, ok, "expected newCache to return *Cache[string]")

	cache.Set("key1", "value1")

	val := cache.Get("key1")
	require.Equal(t, "value1", val)

	// Poll rather than sleep for a fixed time. The TTL and the ticker period
	// are both 100ms, so whether the first tick already sees the entry as
	// expired depends on goroutine scheduling; a later tick will. Waiting for
	// the item-count gauge to return to 0 also ensures the eviction callback
	// has finished (the gauge is the last metric it updates) before the
	// metrics are compared below.
	require.Eventually(t, func() bool {
		_, present := c.cache.Peek("key1")
		return !present && testutil.ToFloat64(metrics.size.WithLabelValues("test")) == 0
	}, 2*time.Second, 10*time.Millisecond, "expired entry was not removed by the maintenance loop")

	require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions
# TYPE cortex_parquet_queryable_cache_evictions_total counter
cortex_parquet_queryable_cache_evictions_total{name="test"} 1
# HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits
# TYPE cortex_parquet_queryable_cache_hits_total counter
cortex_parquet_queryable_cache_hits_total{name="test"} 1
# HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items
# TYPE cortex_parquet_queryable_cache_item_count gauge
cortex_parquet_queryable_cache_item_count{name="test"} 0
`)))
}
10 changes: 6 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ type Config struct {
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`

// Query Parquet files if available
EnableParquetQueryable bool `yaml:"enable_parquet_queryable"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
EnableParquetQueryable bool `yaml:"enable_parquet_queryable"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"`

DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
}
Expand Down Expand Up @@ -147,6 +148,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.")
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.")
Expand Down
7 changes: 7 additions & 0 deletions schemas/cortex-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -5801,6 +5801,13 @@
"type": "number",
"x-cli-flag": "querier.parquet-queryable-shard-cache-size"
},
"parquet_queryable_shard_cache_ttl": {
"default": "24h0m0s",
"description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.",
"type": "string",
"x-cli-flag": "querier.parquet-queryable-shard-cache-ttl",
"x-format": "duration"
},
"per_step_stats_enabled": {
"default": false,
"description": "Enable returning samples stats per steps in query response.",
Expand Down
Loading