From 23f83374be67301c74ad71017a64d9626c53fdc9 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 30 Oct 2025 15:06:25 +0900 Subject: [PATCH 1/5] Add per tenant flag to add unit and type labels for rw2 request Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 + integration/e2e/util.go | 6 +- integration/remote_write_v2_test.go | 71 +++++++ pkg/api/api.go | 10 +- pkg/cortex/modules.go | 2 +- pkg/cortexpb/compatv2.go | 24 +++ pkg/util/push/push.go | 31 ++- pkg/util/push/push_test.go | 219 +++++++++++++++++++- pkg/util/validation/exporter_test.go | 1 + pkg/util/validation/limits.go | 6 + schemas/cortex-config-schema.json | 6 + 12 files changed, 360 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b124f70faab..03c15d6a326 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978 +* [FEATURE] Distributor: Add a per-tenant flag `-distributor.rw2-enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 requests. #7077 * [ENHANCEMENT] Upgrade the Prometheus version to 3.6.0 and add a `-name-validation-scheme` flag to support UTF-8. #7040 #7056 * [ENHANCEMENT] Distributor: Emit an error with a 400 status code when empty labels are found before the relabelling or label dropping process. #7052 * [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 200d23476de..fc7a7833a8d 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3998,6 +3998,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.promote-resource-attributes [promote_resource_attributes: | default = ] +# EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. +# This only applies to remote write v2 requests. +# CLI flag: -distributor.rw2-enable-type-and-unit-labels +[rw_2_enable_type_and_unit_labels: | default = false] + # The maximum number of active series per user, per ingester. 0 to disable. # CLI flag: -ingester.max-series-per-user [max_series_per_user: | default = 5000000] diff --git a/integration/e2e/util.go b/integration/e2e/util.go index c7af4141574..dd8f112ac0d 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -465,10 +465,10 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe st := writev2.NewSymbolTable() lb := labels.NewScratchBuilder(0) lb.Add("__name__", name) - for _, label := range additionalLabels { lb.Add(label.Name, label.Value) } + series = append(series, writev2.TimeSeries{ // Generate the series LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil), @@ -476,7 +476,9 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe {Value: value, Timestamp: tsMillis}, }, Metadata: writev2.Metadata{ - Type: writev2.Metadata_METRIC_TYPE_GAUGE, + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + HelpRef: 2, // equal to name + UnitRef: 2, // equal to name }, }) symbols = st.Symbols() diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index c295b6a62ab..64b68aa9601 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -205,6 +205,77 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) { require.Empty(t, result) } +func TestIngest_EnableTypeAndUnitLabels(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + "-distributor.remote-writev2-enabled": "true", + "-distributor.rw2-enable-type-and-unit-labels": "true", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + // Start Cortex replicas. + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + + // series push + symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) + writeStats, err := c.PushV2(symbols1, series) + require.NoError(t, err) + testPushHeader(t, writeStats, 1, 0, 0) + + value, err := c.Query("test_series", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, value.Type()) + vec := value.(model.Vector) + require.True(t, vec[0].Metric["__unit__"] != "") + require.True(t, vec[0].Metric["__type__"] != "") +} + func TestIngest(t *testing.T) { const blockRangePeriod = 5 * time.Second diff --git a/pkg/api/api.go b/pkg/api/api.go index e124fec3e68..e77d082d436 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -283,7 +283,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") @@ -295,7 +295,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -313,7 +313,7 @@ type Ingester interface { } // RegisterIngester registers the ingesters HTTP and GRPC service -func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { +func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overrides *validation.Overrides) { client.RegisterIngesterServer(a.server.GRPC, i) a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics") @@ -328,12 +328,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. } func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 35511562150..67e0e65438f 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -492,7 +492,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { } func (t *Cortex) initIngester() (serv services.Service, err error) { - t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor) + t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor, t.Overrides) return nil, nil } diff --git a/pkg/cortexpb/compatv2.go b/pkg/cortexpb/compatv2.go index e70c82b88f5..00b4ae478f7 100644 --- a/pkg/cortexpb/compatv2.go +++ b/pkg/cortexpb/compatv2.go @@ -3,6 +3,7 @@ package cortexpb import ( "fmt" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" ) @@ -32,3 +33,26 @@ func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []s b.Sort() return b.Labels(), nil } + +func MetadataV2MetricTypeToMetricType(mt MetadataV2_MetricType) model.MetricType { + switch mt { + case METRIC_TYPE_UNSPECIFIED: + return model.MetricTypeUnknown + case METRIC_TYPE_COUNTER: + return model.MetricTypeCounter + case METRIC_TYPE_GAUGE: + return model.MetricTypeGauge + case METRIC_TYPE_HISTOGRAM: + return model.MetricTypeHistogram + case METRIC_TYPE_GAUGEHISTOGRAM: + return model.MetricTypeGaugeHistogram + case METRIC_TYPE_SUMMARY: + return model.MetricTypeSummary + case METRIC_TYPE_INFO: + return model.MetricTypeInfo + case METRIC_TYPE_STATESET: + return model.MetricTypeStateset + default: + return model.MetricTypeUnknown + } +} diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index f2bd306bb81..c9fbf36ab0a 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -9,14 +9,17 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/exp/api/remote" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/schema" "github.com/prometheus/prometheus/util/compression" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/validation" ) const ( @@ -36,7 +39,7 @@ const ( type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) // Handler is a http.Handler which accepts WriteRequests. -func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -78,8 +81,13 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware } handlePRW2 := func() { + userID, err := tenant.TenantID(ctx) + if err != nil { + return + } + var req cortexpb.PreallocWriteRequestV2 - err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) + err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -91,7 +99,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware req.Source = cortexpb.API } - v1Req, err := convertV2RequestToV1(&req) + v1Req, err := convertV2RequestToV1(&req, overrides.RW2EnableTypeAndUnitLabels(userID)) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -169,7 +177,7 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10)) } -func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.PreallocWriteRequest, error) { +func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool) (cortexpb.PreallocWriteRequest, error) { var v1Req cortexpb.PreallocWriteRequest v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) var v1Metadata []*cortexpb.MetricMetadata @@ -181,10 +189,25 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.Preall if err != nil { return v1Req, err } + + unit := symbols[v2Ts.Metadata.UnitRef] + metricType := v2Ts.Metadata.Type + shouldAttachTypeAndUnitLabels := enableTypeAndUnitLabels && (metricType != cortexpb.METRIC_TYPE_UNSPECIFIED || unit != "") + if shouldAttachTypeAndUnitLabels { + slb := labels.NewScratchBuilder(lbs.Len() + 2) // for __type__ and __unit__ + lbs.Range(func(l labels.Label) { + slb.Add(l.Name, l.Value) + }) + schema.Metadata{Type: cortexpb.MetadataV2MetricTypeToMetricType(metricType), Unit: unit}.AddToLabels(&slb) + slb.Sort() + lbs = slb.Labels() + } + exemplars, err := convertV2ToV1Exemplars(&b, symbols, v2Ts.Exemplars) if err != nil { return v1Req, err } + v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{ TimeSeries: &cortexpb.TimeSeries{ Labels: cortexpb.FromLabelsToLabelAdapters(lbs), diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 4c94acfdf98..df88e99c10f 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -11,14 +11,18 @@ import ( "github.com/golang/snappy" "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/validation" ) var ( @@ -68,7 +72,7 @@ func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 { func createPRW1HTTPRequest(seriesNum int) (*http.Request, error) { series := makeV2ReqWithSeries(seriesNum) - v1Req, err := convertV2RequestToV1(series) + v1Req, err := convertV2RequestToV1(series, false) if err != nil { return nil, err } @@ -111,6 +115,10 @@ func createPRW2HTTPRequest(seriesNum int) (*http.Request, error) { } func Benchmark_Handler(b *testing.B) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + mockHandler := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { // Nothing to do. return &cortexpb.WriteResponse{}, nil @@ -118,7 +126,7 @@ func Benchmark_Handler(b *testing.B) { testSeriesNums := []int{10, 100, 500, 1000} for _, seriesNum := range testSeriesNums { b.Run(fmt.Sprintf("PRW1 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler) req, err := createPRW1HTTPRequest(seriesNum) require.NoError(b, err) @@ -132,7 +140,7 @@ func Benchmark_Handler(b *testing.B) { } }) b.Run(fmt.Sprintf("PRW2 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler) req, err := createPRW2HTTPRequest(seriesNum) require.NoError(b, err) @@ -157,13 +165,176 @@ func Benchmark_convertV2RequestToV1(b *testing.B) { b.ReportAllocs() for b.Loop() { - _, err := convertV2RequestToV1(series) + _, err := convertV2RequestToV1(series, false) require.NoError(b, err) } }) } } +func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) { + symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"} + samples := []cortexpb.Sample{ + { + Value: 123, + TimestampMs: 123, + }, + } + + tests := []struct { + desc string + v2Req *cortexpb.PreallocWriteRequestV2 + expectedV1Req cortexpb.PreallocWriteRequest + enableTypeAndUnitLabels bool + }{ + { + desc: "should attach unit and type labels when the enableTypeAndUnitLabels is true", + v2Req: &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: symbols, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: samples, + Metadata: cortexpb.MetadataV2{Type: cortexpb.METRIC_TYPE_COUNTER, HelpRef: 15, UnitRef: 16}, + Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + }, + }, + }, + }, + expectedV1Req: cortexpb.PreallocWriteRequest{ + WriteRequest: cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "test_metric1", "__type__", "counter", "__unit__", "Maybe op/sec who knows (:", "b", "c")), + Samples: samples, + Exemplars: []cortexpb.Exemplar{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("f", "g")), + Value: 1, + TimestampMs: 1, + }, + }, + }, + }, + }, + Metadata: []*cortexpb.MetricMetadata{ + { + Type: cortexpb.COUNTER, + MetricFamilyName: "test_metric1", + Help: "Test gauge for test purposes", + Unit: "Maybe op/sec who knows (:", + }, + }, + }, + }, + enableTypeAndUnitLabels: true, + }, + { + desc: "should not attach unit and type labels when the enableTypeAndUnitLabels is false", + v2Req: &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: symbols, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: samples, + Metadata: cortexpb.MetadataV2{Type: cortexpb.METRIC_TYPE_COUNTER, HelpRef: 15, UnitRef: 16}, + Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + }, + }, + }, + }, + expectedV1Req: cortexpb.PreallocWriteRequest{ + WriteRequest: cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "test_metric1", "b", "c")), + Samples: samples, + Exemplars: []cortexpb.Exemplar{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("f", "g")), + Value: 1, + TimestampMs: 1, + }, + }, + }, + }, + }, + Metadata: []*cortexpb.MetricMetadata{ + { + Type: cortexpb.COUNTER, + MetricFamilyName: "test_metric1", + Help: "Test gauge for test purposes", + Unit: "Maybe op/sec who knows (:", + }, + }, + }, + }, + enableTypeAndUnitLabels: false, + }, + { + desc: "should not attach when type is unknown and unit is empty although the enableTypeAndUnitLabels is true", + v2Req: &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: symbols, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: samples, + Metadata: cortexpb.MetadataV2{Type: cortexpb.METRIC_TYPE_UNSPECIFIED, HelpRef: 15, UnitRef: 0}, + Exemplars: []cortexpb.ExemplarV2{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + }, + }, + }, + }, + }, + expectedV1Req: cortexpb.PreallocWriteRequest{ + WriteRequest: cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("__name__", "test_metric1", "b", "c")), + Samples: samples, + Exemplars: []cortexpb.Exemplar{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("f", "g")), + Value: 1, + TimestampMs: 1, + }, + }, + }, + }, + }, + Metadata: []*cortexpb.MetricMetadata{ + { + Type: cortexpb.UNKNOWN, + MetricFamilyName: "test_metric1", + Help: "Test gauge for test purposes", + }, + }, + }, + }, + enableTypeAndUnitLabels: false, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + v1Req, err := convertV2RequestToV1(test.v2Req, test.enableTypeAndUnitLabels) + require.NoError(t, err) + require.Equal(t, test.expectedV1Req, v1Req) + }) + } +} + func Test_convertV2RequestToV1(t *testing.T) { var v2Req cortexpb.PreallocWriteRequestV2 @@ -208,7 +379,7 @@ func Test_convertV2RequestToV1(t *testing.T) { v2Req.Symbols = symbols v2Req.Timeseries = timeseries - v1Req, err := convertV2RequestToV1(&v2Req) + v1Req, err := convertV2RequestToV1(&v2Req, false) assert.NoError(t, err) expectedSamples := 3 expectedExemplars := 2 @@ -231,16 +402,24 @@ func Test_convertV2RequestToV1(t *testing.T) { } func TestHandler_remoteWrite(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + t.Run("remote write v1", func(t *testing.T) { - handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusOK, resp.Code) }) t.Run("remote write v2", func(t *testing.T) { - handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API)) + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) + req = req.WithContext(ctx) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusNoContent, resp.Code) @@ -254,8 +433,12 @@ func TestHandler_remoteWrite(t *testing.T) { } func TestHandler_ContentTypeAndEncoding(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) tests := []struct { description string @@ -359,7 +542,11 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { if test.isV2 { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + req := createRequestWithHeaders(t, test.reqHeaders, createCortexRemoteWriteV2Protobuf(t, false, cortexpb.API)) + req = req.WithContext(ctx) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, test.expectedCode, resp.Code) @@ -374,8 +561,12 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { } func TestHandler_cortexWriteRequest(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) t.Run("remote write v1", func(t *testing.T) { req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false) @@ -384,7 +575,11 @@ func TestHandler_cortexWriteRequest(t *testing.T) { assert.Equal(t, 200, resp.Code) }) t.Run("remote write v2", func(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + req := createRequest(t, createCortexRemoteWriteV2Protobuf(t, false, cortexpb.API), true) + req = req.WithContext(ctx) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusNoContent, resp.Code) @@ -392,12 +587,16 @@ func TestHandler_cortexWriteRequest(t *testing.T) { } func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + for _, req := range []*http.Request{ createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), } { resp := httptest.NewRecorder() - handler := Handler(true, 100000, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 3c2ca56e59b..0f387855be0 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -108,6 +108,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="ruler_query_offset",user="tenant-a"} 0 cortex_overrides{limit_name="ruler_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="rules_partial_data",user="tenant-a"} 0 + cortex_overrides{limit_name="rw_2_enable_type_and_unit_labels",user="tenant-a"} 0 cortex_overrides{limit_name="store_gateway_tenant_shard_size",user="tenant-a"} 0 `), "cortex_overrides")) } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 16b956bef1b..6fb069d7ed2 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -152,6 +152,7 @@ type Limits struct { MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` + RW2EnableTypeAndUnitLabels bool `yaml:"rw_2_enable_type_and_unit_labels" json:"rw_2_enable_type_and_unit_labels"` // Ingester enforced limits. // Series @@ -264,6 +265,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.") f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.") f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") + f.BoolVar(&l.RW2EnableTypeAndUnitLabels, "distributor.rw2-enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This only applies to remote write v2 requests.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -1092,6 +1094,10 @@ func (o *Overrides) AlertmanagerMaxSilenceSizeBytes(userID string) int { return o.GetOverridesForUser(userID).AlertmanagerMaxSilencesSizeBytes } +func (o *Overrides) RW2EnableTypeAndUnitLabels(userID string) bool { + return o.GetOverridesForUser(userID).RW2EnableTypeAndUnitLabels +} + func (o *Overrides) DisabledRuleGroups(userID string) DisabledRuleGroups { if o.tenantLimits != nil { l := o.tenantLimits.ByUserID(userID) diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8b748f0c8d3..5eaeae0f883 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5375,6 +5375,12 @@ "description": "Enable to allow rules to be evaluated with data from a single zone, if other zones are not available.", "type": "boolean" }, + "rw_2_enable_type_and_unit_labels": { + "default": false, + "description": "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This only applies to remote write v2 requests.", + "type": "boolean", + "x-cli-flag": "distributor.rw2-enable-type-and-unit-labels" + }, "s3_sse_kms_encryption_context": { "description": "S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set.", "type": "string" From 20f5752a902850651d68a4773c406eb3a63a8838 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sun, 2 Nov 2025 08:26:25 +0900 Subject: [PATCH 2/5] Add -distributor.enable-type-and-unit-labels per tenant flag Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- docs/configuration/config-file-reference.md | 11 +++-------- integration/remote_write_v2_test.go | 6 +++--- pkg/distributor/distributor.go | 8 +++----- pkg/util/push/otlp.go | 2 +- pkg/util/push/otlp_test.go | 4 ++-- pkg/util/push/push.go | 2 +- pkg/util/validation/limits.go | 8 ++++---- schemas/cortex-config-schema.json | 18 ++++++------------ 9 files changed, 24 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 03c15d6a326..853db5dab50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 +* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 and OTLP requests. #7077 * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 @@ -36,7 +37,6 @@ * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978 -* [FEATURE] Distributor: Add a per-tenant flag `-distributor.rw2-enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 requests. #7077 * [ENHANCEMENT] Upgrade the Prometheus version to 3.6.0 and add a `-name-validation-scheme` flag to support UTF-8. #7040 #7056 * [ENHANCEMENT] Distributor: Emit an error with a 400 status code when empty labels are found before the relabelling or label dropping process. #7052 * [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index fc7a7833a8d..20a55cf10fd 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3274,11 +3274,6 @@ otlp: # EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested. # CLI flag: -distributor.otlp.allow-delta-temporality [allow_delta_temporality: | default = false] - - # EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for - # the OTLP metrics. - # CLI flag: -distributor.otlp.enable-type-and-unit-labels - [enable_type_and_unit_labels: | default = false] ``` ### `etcd_config` @@ -3999,9 +3994,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s [promote_resource_attributes: | default = ] # EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. -# This only applies to remote write v2 requests. -# CLI flag: -distributor.rw2-enable-type-and-unit-labels -[rw_2_enable_type_and_unit_labels: | default = false] +# This applies to remote write v2 and OTLP requests. +# CLI flag: -distributor.enable-type-and-unit-labels +[enable_type_and_unit_labels: | default = false] # The maximum number of active series per user, per ingester. 0 to disable. # CLI flag: -ingester.max-series-per-user diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index 64b68aa9601..9029d3e115b 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -234,9 +234,9 @@ func TestIngest_EnableTypeAndUnitLabels(t *testing.T) { "-ring.store": "consul", "-consul.hostname": consul.NetworkHTTPEndpoint(), // Distributor. - "-distributor.replication-factor": "1", - "-distributor.remote-writev2-enabled": "true", - "-distributor.rw2-enable-type-and-unit-labels": "true", + "-distributor.replication-factor": "1", + "-distributor.remote-writev2-enabled": "true", + "-distributor.enable-type-and-unit-labels": "true", // Store-gateway. "-store-gateway.sharding-enabled": "false", // alert manager diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2065d0eea7b..b3f0d37c6f9 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -197,10 +197,9 @@ type InstanceLimits struct { } type OTLPConfig struct { - ConvertAllAttributes bool `yaml:"convert_all_attributes"` - DisableTargetInfo bool `yaml:"disable_target_info"` - AllowDeltaTemporality bool `yaml:"allow_delta_temporality"` - EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels"` + ConvertAllAttributes bool `yaml:"convert_all_attributes"` + DisableTargetInfo bool `yaml:"disable_target_info"` + AllowDeltaTemporality bool `yaml:"allow_delta_temporality"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -229,7 +228,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.") f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)") f.BoolVar(&cfg.OTLPConfig.AllowDeltaTemporality, "distributor.otlp.allow-delta-temporality", false, "EXPERIMENTAL: If true, delta temporality otlp metrics to be ingested.") - f.BoolVar(&cfg.OTLPConfig.EnableTypeAndUnitLabels, "distributor.otlp.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.") } // Validate config and returns error on failure diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index cdf1259d122..b7479359d4a 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -181,7 +181,7 @@ func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distribu AddMetricSuffixes: true, DisableTargetInfo: cfg.DisableTargetInfo, AllowDeltaTemporality: cfg.AllowDeltaTemporality, - EnableTypeAndUnitLabels: cfg.EnableTypeAndUnitLabels, + EnableTypeAndUnitLabels: overrides.EnableTypeAndUnitLabels(userID), } var annots annotations.Annotations diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index 117f98e1782..67dc70f8b0e 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -80,8 +80,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { cfg := distributor.OTLPConfig{ - EnableTypeAndUnitLabels: test.enableTypeAndUnitLabels, - AllowDeltaTemporality: test.allowDeltaTemporality, + AllowDeltaTemporality: test.allowDeltaTemporality, } metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() @@ -90,6 +89,7 @@ func TestOTLP_EnableTypeAndUnitLabels(t *testing.T) { test.otlpSeries.CopyTo(sm.Metrics().AppendEmpty()) limits := validation.Limits{} + limits.EnableTypeAndUnitLabels = test.enableTypeAndUnitLabels overrides := validation.NewOverrides(limits, nil) promSeries, metadata, err := convertToPromTS(ctx, metrics, cfg, overrides, "user-1", logger) require.NoError(t, err) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index c9fbf36ab0a..39e4ceb54ff 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -99,7 +99,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation req.Source = cortexpb.API } - v1Req, err := convertV2RequestToV1(&req, overrides.RW2EnableTypeAndUnitLabels(userID)) + v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID)) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 6fb069d7ed2..15129e96834 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -152,7 +152,7 @@ type Limits struct { MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` - RW2EnableTypeAndUnitLabels bool `yaml:"rw_2_enable_type_and_unit_labels" json:"rw_2_enable_type_and_unit_labels"` + EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels" json:"enable_type_and_unit_labels"` // Ingester enforced limits. // Series @@ -265,7 +265,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.") f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.") f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") - f.BoolVar(&l.RW2EnableTypeAndUnitLabels, "distributor.rw2-enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This only applies to remote write v2 requests.") + f.BoolVar(&l.EnableTypeAndUnitLabels, "distributor.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -1094,8 +1094,8 @@ func (o *Overrides) AlertmanagerMaxSilenceSizeBytes(userID string) int { return o.GetOverridesForUser(userID).AlertmanagerMaxSilencesSizeBytes } -func (o *Overrides) RW2EnableTypeAndUnitLabels(userID string) bool { - return o.GetOverridesForUser(userID).RW2EnableTypeAndUnitLabels +func (o *Overrides) EnableTypeAndUnitLabels(userID string) bool { + return o.GetOverridesForUser(userID).EnableTypeAndUnitLabels } func (o *Overrides) DisabledRuleGroups(userID string) DisabledRuleGroups { diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 5eaeae0f883..78dda6a83da 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3822,12 +3822,6 @@ "description": "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)", "type": "boolean", "x-cli-flag": "distributor.otlp.disable-target-info" - }, - "enable_type_and_unit_labels": { - "default": false, - "description": "EXPERIMENTAL: If true, the '__type__' and '__unit__' labels are added for the OTLP metrics.", - "type": "boolean", - "x-cli-flag": "distributor.otlp.enable-type-and-unit-labels" } }, "type": "object" @@ -4946,6 +4940,12 @@ "type": "boolean", "x-cli-flag": "blocks-storage.tsdb.enable-native-histograms" }, + "enable_type_and_unit_labels": { + "default": false, + "description": "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.", + "type": "boolean", + "x-cli-flag": "distributor.enable-type-and-unit-labels" + }, "enforce_metadata_metric_name": { "default": true, "description": "Enforce every metadata has a metric name.", @@ -5375,12 +5375,6 @@ "description": "Enable to allow rules to be evaluated with data from a single zone, if other zones are not available.", "type": "boolean" }, - "rw_2_enable_type_and_unit_labels": { - "default": false, - "description": "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This only applies to remote write v2 requests.", - "type": "boolean", - "x-cli-flag": "distributor.rw2-enable-type-and-unit-labels" - }, "s3_sse_kms_encryption_context": { "description": "S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set.", "type": "string" From 1c1011038b9d7b16b653c1ecda015292c9a31653 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sun, 2 Nov 2025 10:13:04 +0900 Subject: [PATCH 3/5] fix test Signed-off-by: SungJin1212 --- pkg/util/validation/exporter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 0f387855be0..fc3fee57319 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -54,6 +54,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="compactor_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="creation_grace_period",user="tenant-a"} 600 cortex_overrides{limit_name="enable_native_histograms",user="tenant-a"} 0 + cortex_overrides{limit_name="enable_type_and_unit_labels",user="tenant-a"} 0 cortex_overrides{limit_name="enforce_metadata_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="enforce_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="ha_max_clusters",user="tenant-a"} 0 @@ -108,7 +109,6 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="ruler_query_offset",user="tenant-a"} 0 cortex_overrides{limit_name="ruler_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="rules_partial_data",user="tenant-a"} 0 - cortex_overrides{limit_name="rw_2_enable_type_and_unit_labels",user="tenant-a"} 0 cortex_overrides{limit_name="store_gateway_tenant_shard_size",user="tenant-a"} 0 `), "cortex_overrides")) } From 28db4d27b2d87ea5384ca1f36c455eff37046f8f Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 3 Nov 2025 21:24:24 +0900 Subject: [PATCH 4/5] Edit change log Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 853db5dab50..3fd87ade43a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## master / unreleased * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 -* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 and OTLP requests. #7077 +* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 and OTLP requests. The `-distributor.otlp.enable-type-and-unit-labels` flag has been consolidated into this flag. #7077 * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 From 28e61e7e61ab2e55d48b634526c8b5f386ffb98b Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 3 Nov 2025 21:50:52 +0900 Subject: [PATCH 5/5] fix test Signed-off-by: SungJin1212 --- integration/otlp_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/otlp_test.go b/integration/otlp_test.go index fe83c1852fa..2963db8d34b 100644 --- a/integration/otlp_test.go +++ b/integration/otlp_test.go @@ -282,7 +282,7 @@ func TestOTLPEnableTypeAndUnitLabels(t *testing.T) { "-auth.enabled": "true", // OTLP - "-distributor.otlp.enable-type-and-unit-labels": "true", + "-distributor.enable-type-and-unit-labels": "true", // alert manager "-alertmanager.web.external-url": "http://localhost/alertmanager",