diff --git a/CHANGELOG.md b/CHANGELOG.md index b124f70faab..3fd87ade43a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 +* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 and OTLP requests. The `-distributor.otlp.enable-type-and-unit-labels` flag has been consolidated into this flag. #7077 * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 200d23476de..20a55cf10fd 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3274,11 +3274,6 @@ otlp: # EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested. # CLI flag: -distributor.otlp.allow-delta-temporality [allow_delta_temporality: | default = false] - - # EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for - # the OTLP metrics. - # CLI flag: -distributor.otlp.enable-type-and-unit-labels - [enable_type_and_unit_labels: | default = false] ``` ### `etcd_config` @@ -3998,6 +3993,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.promote-resource-attributes [promote_resource_attributes: | default = ] +# EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. +# This applies to remote write v2 and OTLP requests. +# CLI flag: -distributor.enable-type-and-unit-labels +[enable_type_and_unit_labels: | default = false] + # The maximum number of active series per user, per ingester. 0 to disable. # CLI flag: -ingester.max-series-per-user [max_series_per_user: | default = 5000000] diff --git a/integration/e2e/util.go b/integration/e2e/util.go index c7af4141574..dd8f112ac0d 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -465,10 +465,10 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe st := writev2.NewSymbolTable() lb := labels.NewScratchBuilder(0) lb.Add("__name__", name) - for _, label := range additionalLabels { lb.Add(label.Name, label.Value) } + series = append(series, writev2.TimeSeries{ // Generate the series LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil), @@ -476,7 +476,9 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe {Value: value, Timestamp: tsMillis}, }, Metadata: writev2.Metadata{ - Type: writev2.Metadata_METRIC_TYPE_GAUGE, + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + HelpRef: 2, // equal to name + UnitRef: 2, // equal to name }, }) symbols = st.Symbols() diff --git a/integration/otlp_test.go b/integration/otlp_test.go index fe83c1852fa..2963db8d34b 100644 --- a/integration/otlp_test.go +++ b/integration/otlp_test.go @@ -282,7 +282,7 @@ func TestOTLPEnableTypeAndUnitLabels(t *testing.T) { "-auth.enabled": "true", // OTLP - "-distributor.otlp.enable-type-and-unit-labels": "true", + "-distributor.enable-type-and-unit-labels": "true", // alert manager "-alertmanager.web.external-url": "http://localhost/alertmanager", diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index c295b6a62ab..9029d3e115b 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -205,6 +205,77 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) { require.Empty(t, result) } +func TestIngest_EnableTypeAndUnitLabels(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + "-distributor.remote-writev2-enabled": "true", + "-distributor.enable-type-and-unit-labels": "true", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + // Start Cortex replicas. + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + + // series push + symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) + writeStats, err := c.PushV2(symbols1, series) + require.NoError(t, err) + testPushHeader(t, writeStats, 1, 0, 0) + + value, err := c.Query("test_series", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, value.Type()) + vec := value.(model.Vector) + require.True(t, vec[0].Metric["__unit__"] != "") + require.True(t, vec[0].Metric["__type__"] != "") +} + func TestIngest(t *testing.T) { const blockRangePeriod = 5 * time.Second diff --git a/pkg/api/api.go b/pkg/api/api.go index e124fec3e68..e77d082d436 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -283,7 +283,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") @@ -295,7 +295,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -313,7 +313,7 @@ type Ingester interface { } // RegisterIngester registers the ingesters HTTP and GRPC service -func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { +func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overrides *validation.Overrides) { client.RegisterIngesterServer(a.server.GRPC, i) a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics") @@ -328,12 +328,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. } func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 35511562150..67e0e65438f 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -492,7 +492,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { } func (t *Cortex) initIngester() (serv services.Service, err error) { - t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor) + t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor, t.Overrides) return nil, nil } diff --git a/pkg/cortexpb/compatv2.go b/pkg/cortexpb/compatv2.go index e70c82b88f5..00b4ae478f7 100644 --- a/pkg/cortexpb/compatv2.go +++ b/pkg/cortexpb/compatv2.go @@ -3,6 +3,7 @@ package cortexpb import ( "fmt" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" ) @@ -32,3 +33,26 @@ func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []s b.Sort() return b.Labels(), nil } + +func MetadataV2MetricTypeToMetricType(mt MetadataV2_MetricType) model.MetricType { + switch mt { + case METRIC_TYPE_UNSPECIFIED: + return model.MetricTypeUnknown + case METRIC_TYPE_COUNTER: + return model.MetricTypeCounter + case METRIC_TYPE_GAUGE: + return model.MetricTypeGauge + case METRIC_TYPE_HISTOGRAM: + return model.MetricTypeHistogram + case METRIC_TYPE_GAUGEHISTOGRAM: + return model.MetricTypeGaugeHistogram + case METRIC_TYPE_SUMMARY: + return model.MetricTypeSummary + case METRIC_TYPE_INFO: + return model.MetricTypeInfo + case METRIC_TYPE_STATESET: + return model.MetricTypeStateset + default: + return model.MetricTypeUnknown + } +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2065d0eea7b..b3f0d37c6f9 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -197,10 +197,9 @@ type InstanceLimits struct { } type OTLPConfig struct { - ConvertAllAttributes bool `yaml:"convert_all_attributes"` - DisableTargetInfo bool `yaml:"disable_target_info"` - AllowDeltaTemporality bool `yaml:"allow_delta_temporality"` - EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels"` + ConvertAllAttributes bool `yaml:"convert_all_attributes"` + DisableTargetInfo bool `yaml:"disable_target_info"` + AllowDeltaTemporality bool `yaml:"allow_delta_temporality"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -229,7 +228,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.") f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)") f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.") - f.BoolVar(&cfg.OTLPConfig.EnableTypeAndUnitLabels, "distributor.otlp.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.") } // Validate config and returns error on failure diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index cdf1259d122..b7479359d4a 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -181,7 +181,7 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu AddMetricSuffixes: true, DisableTargetInfo: cfg.DisableTargetInfo, AllowDeltaTemporality: cfg.AllowDeltaTemporality, - EnableTypeAndUnitLabels: cfg.EnableTypeAndUnitLabels, + EnableTypeAndUnitLabels: overrides.EnableTypeAndUnitLabels(userID), } var annots annotations.Annotations diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index 117f98e1782..67dc70f8b0e 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -80,8 +80,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { cfg := distributor.OTLPConfig{ - EnableTypeAndUnitLabels: test.enableTypeAndUnitLabels, - AllowDeltaTemporality: test.allowDeltaTemporality, + AllowDeltaTemporality: test.allowDeltaTemporality, } metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() @@ -90,6 +89,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { test.otlpSeries.CopyTo(sm.Metrics().AppendEmpty()) limits := validation.Limits{} + limits.EnableTypeAndUnitLabels = test.enableTypeAndUnitLabels overrides := validation.NewOverrides(limits, nil) promSeries, metadata, err := convertToPromTS(ctx, metrics, cfg, overrides, "user-1", logger) require.NoError(t, err) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index f2bd306bb81..39e4ceb54ff 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -9,14 +9,17 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/exp/api/remote" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/schema" "github.com/prometheus/prometheus/util/compression" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/validation" ) const ( @@ -36,7 +39,7 @@ const ( type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) // Handler is a http.Handler which accepts WriteRequests. -func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -78,8 +81,13 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware } handlePRW2 := func() { + userID, err := tenant.TenantID(ctx) + if err != nil { + return + } + var req cortexpb.PreallocWriteRequestV2 - err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) + err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -91,7 +99,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware req.Source = cortexpb.API } - v1Req, err := convertV2RequestToV1(&req) + v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID)) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -169,7 +177,7 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10)) } -func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.PreallocWriteRequest, error) { +func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool) (cortexpb.PreallocWriteRequest, error) { var v1Req cortexpb.PreallocWriteRequest v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) var v1Metadata []*cortexpb.MetricMetadata @@ -181,10 +189,25 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.Preall if err != nil { return v1Req, err } + + unit := symbols[v2Ts.Metadata.UnitRef] + metricType := v2Ts.Metadata.Type + shouldAttachTypeAndUnitLabels := enableTypeAndUnitLabels && (metricType != cortexpb.METRIC_TYPE_UNSPECIFIED || unit != "") + if shouldAttachTypeAndUnitLabels { + slb := labels.NewScratchBuilder(lbs.Len() + 2) // for __type__ and __unit__ + lbs.Range(func(l labels.Label) { + slb.Add(l.Name, l.Value) + }) + schema.Metadata{Type: cortexpb.MetadataV2MetricTypeToMetricType(metricType), Unit: unit}.AddToLabels(&slb) + slb.Sort() + lbs = slb.Labels() + } + exemplars, err := convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars) if err != nil { return v1Req, err } + v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{ TimeSeries: &cortexpb.TimeSeries{ Labels: cortexpb.FromLabelsToLabelAdapters(lbs), diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 4c94acfdf98..df88e99c10f 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -11,14 +11,18 @@ import ( "github.com/golang/snappy" "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/validation" ) var ( @@ -68,7 +72,7 @@ func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 { func createPRW1HTTPRequest(seriesNum int) (*http.Request, error) { series := makeV2ReqWithSeries(seriesNum) - v1Req, err := convertV2RequestToV1(series) + v1Req, err := convertV2RequestToV1(series, false) if err != nil { return nil, err } @@ -111,6 +115,10 @@ func createPRW2HTTPRequest(seriesNum int) (*http.Request, error) { } func Benchmark_Handler(b *testing.B) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + mockHandler := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { // Nothing to do. return &cortexpb.WriteResponse{}, nil @@ -118,7 +126,7 @@ func Benchmark_Handler(b *testing.B) { testSeriesNums := []int{10, 100, 500, 1000} for _, seriesNum := range testSeriesNums { b.Run(fmt.Sprintf("PRW1 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler) req, err := createPRW1HTTPRequest(seriesNum) require.NoError(b, err) @@ -132,7 +140,7 @@ func Benchmark_Handler(b *testing.B) { } }) b.Run(fmt.Sprintf("PRW2 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler) req, err := createPRW2HTTPRequest(seriesNum) require.NoError(b, err) @@ -157,13 +165,176 @@ func Benchmark_convertV2RequestToV1(b *testing.B) { b.ReportAllocs() for b.Loop() { - _, err := convertV2RequestToV1(series) + _, err := convertV2RequestToV1(series, false) require.NoError(b, err) } }) } } +func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) { + symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"} + samples := []cortexpb.Sample{ + { + Value: 123, + TimestampMs: 123, + }, + } + + tests := []struct { + desc string + v2Req *cortexpb.PreallocWriteRequestV2 + expectedV1Req cortexpb.PreallocWriteRequest + enableTypeAndUnitLabels bool + }{ + { + desc: "should attach unit and type labels when the enableTypeAndUnitLabels is true", + v2Req: &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: symbols, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: samples, + Metadata: cortexpb.MetadataV2{Type: cortexpb.METRIC_TYPE_COUNTER, HelpRef: 15, UnitRef: 16}, + Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + }, + }, + }, + }, + expectedV1Req: cortexpb.PreallocWriteRequest{ + WriteRequest: cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "test_metric1", "__type__", "counter", "__unit__", "Maybe op/sec who knows (:", "b", "c")), + Samples: samples, + Exemplars: []cortexpb.Exemplar{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("f", "g")), + Value: 1, + TimestampMs: 1, + }, + }, + }, + }, + }, + Metadata: []*cortexpb.MetricMetadata{ + { + Type: cortexpb.COUNTER, + MetricFamilyName: "test_metric1", + Help: "Test gauge for test purposes", + Unit: "Maybe op/sec who knows (:", + }, + }, + }, + }, + enableTypeAndUnitLabels: true, + }, + { + desc: "should not attach unit and type labels when the enableTypeAndUnitLabels is false", + v2Req: &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: symbols, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: samples, + Metadata: cortexpb.MetadataV2{Type: cortexpb.METRIC_TYPE_COUNTER, HelpRef: 15, UnitRef: 16}, + Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + }, + }, + }, + }, + expectedV1Req: cortexpb.PreallocWriteRequest{ + WriteRequest: cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "test_metric1", "b", "c")), + Samples: samples, + Exemplars: []cortexpb.Exemplar{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("f", "g")), + Value: 1, + TimestampMs: 1, + }, + }, + }, + }, + }, + Metadata: []*cortexpb.MetricMetadata{ + { + Type: cortexpb.COUNTER, + MetricFamilyName: "test_metric1", + Help: "Test gauge for test purposes", + Unit: "Maybe op/sec who knows (:", + }, + }, + }, + }, + enableTypeAndUnitLabels: false, + }, + { + desc: "should not attach when type is unknown and unit is empty although the enableTypeAndUnitLabels is true", + v2Req: &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: symbols, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: samples, + Metadata: cortexpb.MetadataV2{Type: cortexpb.METRIC_TYPE_UNSPECIFIED, HelpRef: 15, UnitRef: 0}, + Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + }, + }, + }, + }, + expectedV1Req: cortexpb.PreallocWriteRequest{ + WriteRequest: cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "test_metric1", "b", "c")), + Samples: samples, + Exemplars: []cortexpb.Exemplar{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("f", "g")), + Value: 1, + TimestampMs: 1, + }, + }, + }, + }, + }, + Metadata: []*cortexpb.MetricMetadata{ + { + Type: cortexpb.UNKNOWN, + MetricFamilyName: "test_metric1", + Help: "Test gauge for test purposes", + }, + }, + }, + }, + enableTypeAndUnitLabels: false, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + v1Req, err := convertV2RequestToV1(test.v2Req, test.enableTypeAndUnitLabels) + require.NoError(t, err) + require.Equal(t, test.expectedV1Req, v1Req) + }) + } +} + func Test_convertV2RequestToV1(t *testing.T) { var v2Req cortexpb.PreallocWriteRequestV2 @@ -208,7 +379,7 @@ func Test_convertV2RequestToV1(t *testing.T) { v2Req.Symbols = symbols v2Req.Timeseries = timeseries - v1Req, err := convertV2RequestToV1(&v2Req) + v1Req, err := convertV2RequestToV1(&v2Req, false) assert.NoError(t, err) expectedSamples := 3 expectedExemplars := 2 @@ -231,16 +402,24 @@ func Test_convertV2RequestToV1(t *testing.T) { } func TestHandler_remoteWrite(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + t.Run("remote write v1", func(t *testing.T) { - handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusOK, resp.Code) }) t.Run("remote write v2", func(t *testing.T) { - handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) + req = req.WithContext(ctx) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusNoContent, resp.Code) @@ -254,8 +433,12 @@ func TestHandler_remoteWrite(t *testing.T) { } func TestHandler_ContentTypeAndEncoding(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) tests := []struct { description string @@ -359,7 +542,11 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { if test.isV2 { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + req := createRequestWithHeaders(t, test.reqHeaders, createCortexRemoteWriteV2Protobuf(t, false, cortexpb.API)) + req = req.WithContext(ctx) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, test.expectedCode, resp.Code) @@ -374,8 +561,12 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { } func TestHandler_cortexWriteRequest(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) t.Run("remote write v1", func(t *testing.T) { req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false) @@ -384,7 +575,11 @@ func TestHandler_cortexWriteRequest(t *testing.T) { assert.Equal(t, 200, resp.Code) }) t.Run("remote write v2", func(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + req := createRequest(t, createCortexRemoteWriteV2Protobuf(t, false, cortexpb.API), true) + req = req.WithContext(ctx) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusNoContent, resp.Code) @@ -392,12 +587,16 @@ func TestHandler_cortexWriteRequest(t *testing.T) { } func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + for _, req := range []*http.Request{ createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), } { resp := httptest.NewRecorder() - handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 3c2ca56e59b..fc3fee57319 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -54,6 +54,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="compactor_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="creation_grace_period",user="tenant-a"} 600 cortex_overrides{limit_name="enable_native_histograms",user="tenant-a"} 0 + cortex_overrides{limit_name="enable_type_and_unit_labels",user="tenant-a"} 0 cortex_overrides{limit_name="enforce_metadata_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="enforce_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="ha_max_clusters",user="tenant-a"} 0 diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 16b956bef1b..15129e96834 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -152,6 +152,7 @@ type Limits struct { MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` + EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels" json:"enable_type_and_unit_labels"` // Ingester enforced limits. // Series @@ -264,6 +265,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.") f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.") f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") + f.BoolVar(&l.EnableTypeAndUnitLabels, "distributor.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -1092,6 +1094,10 @@ func (o *Overrides) AlertmanagerMaxSilenceSizeBytes(userID string) int { return o.GetOverridesForUser(userID).AlertmanagerMaxSilencesSizeBytes } +func (o *Overrides) EnableTypeAndUnitLabels(userID string) bool { + return o.GetOverridesForUser(userID).EnableTypeAndUnitLabels +} + func (o *Overrides) DisabledRuleGroups(userID string) DisabledRuleGroups { if o.tenantLimits != nil { l := o.tenantLimits.ByUserID(userID) diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8b748f0c8d3..78dda6a83da 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3822,12 +3822,6 @@ "description": "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)", "type": "boolean", "x-cli-flag": "distributor.otlp.disable-target-info" - }, - "enable_type_and_unit_labels": { - "default": false, - "description": "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.", - "type": "boolean", - "x-cli-flag": "distributor.otlp.enable-type-and-unit-labels" } }, "type": "object" @@ -4946,6 +4940,12 @@ "type": "boolean", "x-cli-flag": "blocks-storage.tsdb.enable-native-histograms" }, + "enable_type_and_unit_labels": { + "default": false, + "description": "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.", + "type": "boolean", + "x-cli-flag": "distributor.enable-type-and-unit-labels" + }, "enforce_metadata_metric_name": { "default": true, "description": "Enforce every metadata has a metric name.",