@@ -95,7 +95,8 @@ type Distributor struct {
9595 HATracker * ha.HATracker
9696
9797 // Per-user rate limiter.
98- ingestionRateLimiter * limiter.RateLimiter
98+ ingestionRateLimiter * limiter.RateLimiter
99+ nativeHistogramIngestionRateLimiter * limiter.RateLimiter
99100
100101 // Manager for subservices (HA Tracker, distributor ring and client pool)
101102 subservices * services.Manager
@@ -267,11 +268,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
267268 // it's an internal dependency and can't join the distributors ring, we skip rate
268269 // limiting.
269270 var ingestionRateStrategy limiter.RateLimiterStrategy
271+ var nativeHistogramIngestionRateStrategy limiter.RateLimiterStrategy
270272 var distributorsLifeCycler * ring.Lifecycler
271273 var distributorsRing * ring.Ring
272274
273275 if ! canJoinDistributorsRing {
274276 ingestionRateStrategy = newInfiniteIngestionRateStrategy ()
277+ nativeHistogramIngestionRateStrategy = newInfiniteIngestionRateStrategy ()
275278 } else if limits .IngestionRateStrategy () == validation .GlobalIngestionRateStrategy {
276279 distributorsLifeCycler , err = ring .NewLifecycler (cfg .DistributorRing .ToLifecyclerConfig (), nil , "distributor" , ringKey , true , true , log , prometheus .WrapRegistererWithPrefix ("cortex_" , reg ))
277280 if err != nil {
@@ -285,21 +288,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
285288 subservices = append (subservices , distributorsLifeCycler , distributorsRing )
286289
287290 ingestionRateStrategy = newGlobalIngestionRateStrategy (limits , distributorsLifeCycler )
291+ nativeHistogramIngestionRateStrategy = newGlobalNativeHistogramIngestionRateStrategy (limits , distributorsLifeCycler )
288292 } else {
289293 ingestionRateStrategy = newLocalIngestionRateStrategy (limits )
294+ nativeHistogramIngestionRateStrategy = newLocalNativeHistogramIngestionRateStrategy (limits )
290295 }
291296
292297 d := & Distributor {
293- cfg : cfg ,
294- log : log ,
295- ingestersRing : ingestersRing ,
296- ingesterPool : NewPool (cfg .PoolConfig , ingestersRing , cfg .IngesterClientFactory , log ),
297- distributorsLifeCycler : distributorsLifeCycler ,
298- distributorsRing : distributorsRing ,
299- limits : limits ,
300- ingestionRateLimiter : limiter .NewRateLimiter (ingestionRateStrategy , 10 * time .Second ),
301- HATracker : haTracker ,
302- ingestionRate : util_math .NewEWMARate (0.2 , instanceIngestionRateTickInterval ),
298+ cfg : cfg ,
299+ log : log ,
300+ ingestersRing : ingestersRing ,
301+ ingesterPool : NewPool (cfg .PoolConfig , ingestersRing , cfg .IngesterClientFactory , log ),
302+ distributorsLifeCycler : distributorsLifeCycler ,
303+ distributorsRing : distributorsRing ,
304+ limits : limits ,
305+ ingestionRateLimiter : limiter .NewRateLimiter (ingestionRateStrategy , 10 * time .Second ),
306+ nativeHistogramIngestionRateLimiter : limiter .NewRateLimiter (nativeHistogramIngestionRateStrategy , 10 * time .Second ),
307+ HATracker : haTracker ,
308+ ingestionRate : util_math .NewEWMARate (0.2 , instanceIngestionRateTickInterval ),
303309
304310 queryDuration : instrument .NewHistogramCollector (promauto .With (reg ).NewHistogramVec (prometheus.HistogramOpts {
305311 Namespace : "cortex" ,
@@ -754,7 +760,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
754760 }
755761
756762 // A WriteRequest can only contain series or metadata but not both. This might change in the future.
757- seriesKeys , validatedTimeseries , validatedFloatSamples , validatedHistogramSamples , validatedExemplars , firstPartialErr , err := d .prepareSeriesKeys (ctx , req , userID , limits , removeReplica )
763+ seriesKeys , nhSeriesKeys , validatedTimeseries , nhValidatedTimeseries , validatedFloatSamples , validatedHistogramSamples , validatedExemplars , firstPartialErr , err := d .prepareSeriesKeys (ctx , req , userID , limits , removeReplica )
758764 if err != nil {
759765 return nil , err
760766 }
@@ -765,6 +771,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
765771 d .receivedExemplars .WithLabelValues (userID ).Add (float64 (validatedExemplars ))
766772 d .receivedMetadata .WithLabelValues (userID ).Add (float64 (len (validatedMetadata )))
767773
774+ if ! d .nativeHistogramIngestionRateLimiter .AllowN (now , userID , validatedHistogramSamples ) {
775+ level .Warn (d .log ).Log ("msg" , "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples" , d .nativeHistogramIngestionRateLimiter .Limit (now , userID ), validatedHistogramSamples )
776+ d .validateMetrics .DiscardedSamples .WithLabelValues (validation .NativeHistogramRateLimited , userID ).Add (float64 (validatedHistogramSamples ))
777+ validatedHistogramSamples = 0
778+ } else {
779+ seriesKeys = append (seriesKeys , nhSeriesKeys ... )
780+ validatedTimeseries = append (validatedTimeseries , nhValidatedTimeseries ... )
781+ }
782+
768783 if len (seriesKeys ) == 0 && len (metadataKeys ) == 0 {
769784 // Ensure the request slice is reused if there's no series or metadata passing the validation.
770785 cortexpb .ReuseSlice (req .Timeseries )
@@ -936,14 +951,16 @@ type samplesLabelSetEntry struct {
936951 labels labels.Labels
937952}
938953
939- func (d * Distributor ) prepareSeriesKeys (ctx context.Context , req * cortexpb.WriteRequest , userID string , limits * validation.Limits , removeReplica bool ) ([]uint32 , []cortexpb.PreallocTimeseries , int , int , int , error , error ) {
954+ func (d * Distributor ) prepareSeriesKeys (ctx context.Context , req * cortexpb.WriteRequest , userID string , limits * validation.Limits , removeReplica bool ) ([]uint32 , []uint32 , []cortexpb. PreallocTimeseries , [] cortexpb.PreallocTimeseries , int , int , int , error , error ) {
940955 pSpan , _ := opentracing .StartSpanFromContext (ctx , "prepareSeriesKeys" )
941956 defer pSpan .Finish ()
942957
943958 // For each timeseries or samples, we compute a hash to distribute across ingesters;
944- // check each sample/metadata and discard if outside limits.
959+ // check each sample/metadata and discard if outside limits.
945960 validatedTimeseries := make ([]cortexpb.PreallocTimeseries , 0 , len (req .Timeseries ))
961+ nhValidatedTimeseries := make ([]cortexpb.PreallocTimeseries , 0 , len (req .Timeseries ))
946962 seriesKeys := make ([]uint32 , 0 , len (req .Timeseries ))
963+ nhSeriesKeys := make ([]uint32 , 0 , len (req .Timeseries ))
947964 validatedFloatSamples := 0
948965 validatedHistogramSamples := 0
949966 validatedExemplars := 0
@@ -1051,7 +1068,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
10511068 // label and dropped labels (if any)
10521069 key , err := d .tokenForLabels (userID , ts .Labels )
10531070 if err != nil {
1054- return nil , nil , 0 , 0 , 0 , nil , err
1071+ return nil , nil , nil , nil , 0 , 0 , 0 , nil , err
10551072 }
10561073 validatedSeries , validationErr := d .validateSeries (ts , userID , skipLabelNameValidation , limits )
10571074
@@ -1086,8 +1103,13 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
10861103 }
10871104 }
10881105
1089- seriesKeys = append (seriesKeys , key )
1090- validatedTimeseries = append (validatedTimeseries , validatedSeries )
1106+ if len (ts .Histograms ) > 0 {
1107+ nhSeriesKeys = append (nhSeriesKeys , key )
1108+ nhValidatedTimeseries = append (nhValidatedTimeseries , validatedSeries )
1109+ } else {
1110+ seriesKeys = append (seriesKeys , key )
1111+ validatedTimeseries = append (validatedTimeseries , validatedSeries )
1112+ }
10911113 validatedFloatSamples += len (ts .Samples )
10921114 validatedHistogramSamples += len (ts .Histograms )
10931115 validatedExemplars += len (ts .Exemplars )
@@ -1103,7 +1125,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
11031125 }
11041126 }
11051127
1106- return seriesKeys , validatedTimeseries , validatedFloatSamples , validatedHistogramSamples , validatedExemplars , firstPartialErr , nil
1128+ return seriesKeys , nhSeriesKeys , validatedTimeseries , nhValidatedTimeseries , validatedFloatSamples , validatedHistogramSamples , validatedExemplars , firstPartialErr , nil
11071129}
11081130
11091131func sortLabelsIfNeeded (labels []cortexpb.LabelAdapter ) {
0 commit comments