@@ -28,25 +28,255 @@ type querierTenantFederationConfig struct {
2828
2929func TestQuerierTenantFederation (t * testing.T ) {
3030 runQuerierTenantFederationTest (t , querierTenantFederationConfig {})
31+ runQuerierTenantFederationTest_UseRegexResolver (t , querierTenantFederationConfig {})
3132}
3233
3334func TestQuerierTenantFederationWithQueryScheduler (t * testing.T ) {
3435 runQuerierTenantFederationTest (t , querierTenantFederationConfig {
3536 querySchedulerEnabled : true ,
3637 })
38+ runQuerierTenantFederationTest_UseRegexResolver (t , querierTenantFederationConfig {
39+ querySchedulerEnabled : true ,
40+ })
3741}
3842
3943func TestQuerierTenantFederationWithShuffleSharding (t * testing.T ) {
4044 runQuerierTenantFederationTest (t , querierTenantFederationConfig {
4145 shuffleShardingEnabled : true ,
4246 })
47+ runQuerierTenantFederationTest_UseRegexResolver (t , querierTenantFederationConfig {
48+ shuffleShardingEnabled : true ,
49+ })
4350}
4451
4552func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding (t * testing.T ) {
4653 runQuerierTenantFederationTest (t , querierTenantFederationConfig {
4754 querySchedulerEnabled : true ,
4855 shuffleShardingEnabled : true ,
4956 })
57+ runQuerierTenantFederationTest_UseRegexResolver (t , querierTenantFederationConfig {
58+ querySchedulerEnabled : true ,
59+ shuffleShardingEnabled : true ,
60+ })
61+ }
62+
63+ func TestRegexResolver_NewlyCreatedTenant (t * testing.T ) {
64+ const blockRangePeriod = 5 * time .Second
65+
66+ s , err := e2e .NewScenario (networkName )
67+ require .NoError (t , err )
68+ defer s .Close ()
69+
70+ consul := e2edb .NewConsulWithName ("consul" )
71+ require .NoError (t , s .StartAndWaitReady (consul ))
72+
73+ flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
74+ "-querier.cache-results" : "true" ,
75+ "-querier.split-queries-by-interval" : "24h" ,
76+ "-querier.query-ingesters-within" : "12h" , // Required by the test on query /series out of ingesters time range
77+ "-tenant-federation.enabled" : "true" ,
78+ "-tenant-federation.regex-matcher-enabled" : "true" ,
79+
80+ // to upload block quickly
81+ "-blocks-storage.tsdb.block-ranges-period" : blockRangePeriod .String (),
82+ "-blocks-storage.tsdb.ship-interval" : "1s" ,
83+ "-blocks-storage.tsdb.retention-period" : ((blockRangePeriod * 2 ) - 1 ).String (),
84+
85+ // store gateway
86+ "-blocks-storage.bucket-store.sync-interval" : blockRangePeriod .String (),
87+ "-querier.max-fetched-series-per-query" : "1" ,
88+ })
89+
90+ minio := e2edb .NewMinio (9000 , flags ["-blocks-storage.s3.bucket-name" ])
91+ require .NoError (t , s .StartAndWaitReady (minio ))
92+
93+ // Start ingester and distributor.
94+ ingester := e2ecortex .NewIngester ("ingester" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
95+ distributor := e2ecortex .NewDistributor ("distributor" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
96+ require .NoError (t , s .StartAndWaitReady (ingester , distributor ))
97+
98+ // Wait until distributor have updated the ring.
99+ require .NoError (t , distributor .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
100+
101+ // Start the query-frontend.
102+ queryFrontend := e2ecortex .NewQueryFrontend ("query-frontend" , flags , "" )
103+ require .NoError (t , s .Start (queryFrontend ))
104+
105+ // Start the querier
106+ querier := e2ecortex .NewQuerier ("querier" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), mergeFlags (flags , map [string ]string {
107+ "-querier.frontend-address" : queryFrontend .NetworkGRPCEndpoint (),
108+ }), "" )
109+
110+ // Start queriers.
111+ require .NoError (t , s .StartAndWaitReady (querier ))
112+ require .NoError (t , s .WaitReady (queryFrontend ))
113+
114+ now := time .Now ()
115+ series , expectedVector := generateSeries ("series_1" , now )
116+
117+ c , err := e2ecortex .NewClient (distributor .HTTPEndpoint (), queryFrontend .HTTPEndpoint (), "" , "" , "user-1" )
118+ require .NoError (t , err )
119+
120+ res , err := c .Push (series )
121+ require .NoError (t , err )
122+ require .Equal (t , 200 , res .StatusCode )
123+
124+ result , err := c .Query ("series_1" , now )
125+ require .NoError (t , err )
126+ require .Equal (t , model .ValVector , result .Type ())
127+ require .Equal (t , expectedVector , result .(model.Vector ))
128+ }
129+
130+ func runQuerierTenantFederationTest_UseRegexResolver (t * testing.T , cfg querierTenantFederationConfig ) {
131+ const numUsers = 10
132+ const blockRangePeriod = 5 * time .Second
133+
134+ s , err := e2e .NewScenario (networkName )
135+ require .NoError (t , err )
136+ defer s .Close ()
137+
138+ memcached := e2ecache .NewMemcached ()
139+ consul := e2edb .NewConsul ()
140+ require .NoError (t , s .StartAndWaitReady (consul , memcached ))
141+
142+ flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
143+ "-querier.cache-results" : "true" ,
144+ "-querier.split-queries-by-interval" : "24h" ,
145+ "-querier.query-ingesters-within" : "12h" , // Required by the test on query /series out of ingesters time range
146+ "-frontend.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
147+ "-tenant-federation.enabled" : "true" ,
148+ "-tenant-federation.regex-matcher-enabled" : "true" ,
149+ "-tenant-federation.user-sync-interval" : "1s" ,
150+
151+ // to upload block quickly
152+ "-blocks-storage.tsdb.block-ranges-period" : blockRangePeriod .String (),
153+ "-blocks-storage.tsdb.ship-interval" : "1s" ,
154+ "-blocks-storage.tsdb.retention-period" : ((blockRangePeriod * 2 ) - 1 ).String (),
155+
156+ // store gateway
157+ "-blocks-storage.bucket-store.sync-interval" : blockRangePeriod .String (),
158+ "-querier.max-fetched-series-per-query" : "1" ,
159+ })
160+
161+ // Start the query-scheduler if enabled.
162+ var queryScheduler * e2ecortex.CortexService
163+ if cfg .querySchedulerEnabled {
164+ queryScheduler = e2ecortex .NewQueryScheduler ("query-scheduler" , flags , "" )
165+ require .NoError (t , s .StartAndWaitReady (queryScheduler ))
166+ flags ["-frontend.scheduler-address" ] = queryScheduler .NetworkGRPCEndpoint ()
167+ flags ["-querier.scheduler-address" ] = queryScheduler .NetworkGRPCEndpoint ()
168+ }
169+
170+ if cfg .shuffleShardingEnabled {
171+ // Use only single querier for each user.
172+ flags ["-frontend.max-queriers-per-tenant" ] = "1"
173+ }
174+
175+ minio := e2edb .NewMinio (9000 , flags ["-blocks-storage.s3.bucket-name" ])
176+ require .NoError (t , s .StartAndWaitReady (minio ))
177+
178+ // Start ingester and distributor.
179+ ingester := e2ecortex .NewIngester ("ingester" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
180+ distributor := e2ecortex .NewDistributor ("distributor" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
181+ require .NoError (t , s .StartAndWaitReady (ingester , distributor ))
182+
183+ // Wait until distributor have updated the ring.
184+ require .NoError (t , distributor .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
185+
186+ // Push a series for each user to Cortex.
187+ now := time .Now ()
188+ expectedVectors := make ([]model.Vector , numUsers )
189+ tenantIDs := make ([]string , numUsers )
190+
191+ for u := 0 ; u < numUsers ; u ++ {
192+ tenantIDs [u ] = fmt .Sprintf ("user-%d" , u )
193+ c , err := e2ecortex .NewClient (distributor .HTTPEndpoint (), "" , "" , "" , tenantIDs [u ])
194+ require .NoError (t , err )
195+
196+ var series []prompb.TimeSeries
197+ series , expectedVectors [u ] = generateSeries ("series_1" , now )
198+ // To ship series_1 block
199+ series2 , _ := generateSeries ("series_2" , now .Add (blockRangePeriod * 2 ))
200+
201+ res , err := c .Push (series )
202+ require .NoError (t , err )
203+ require .Equal (t , 200 , res .StatusCode )
204+
205+ res , err = c .Push (series2 )
206+ require .NoError (t , err )
207+ require .Equal (t , 200 , res .StatusCode )
208+ }
209+
210+ // Start the query-frontend.
211+ queryFrontend := e2ecortex .NewQueryFrontend ("query-frontend" , flags , "" )
212+ require .NoError (t , s .Start (queryFrontend ))
213+
214+ if ! cfg .querySchedulerEnabled {
215+ flags ["-querier.frontend-address" ] = queryFrontend .NetworkGRPCEndpoint ()
216+ }
217+
218+ // Start the querier and store-gateway
219+ storeGateway := e2ecortex .NewStoreGateway ("store-gateway" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
220+ querier := e2ecortex .NewQuerier ("querier" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
221+
222+ var querier2 * e2ecortex.CortexService
223+ if cfg .shuffleShardingEnabled {
224+ querier2 = e2ecortex .NewQuerier ("querier-2" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
225+ }
226+
227+ // Start queriers.
228+ require .NoError (t , s .StartAndWaitReady (querier , storeGateway ))
229+ require .NoError (t , s .WaitReady (queryFrontend ))
230+ if cfg .shuffleShardingEnabled {
231+ require .NoError (t , s .StartAndWaitReady (querier2 ))
232+ }
233+
234+ // Wait until the querier and store-gateway have updated ring
235+ require .NoError (t , storeGateway .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
236+ require .NoError (t , querier .WaitSumMetrics (e2e .Equals (512 * 2 ), "cortex_ring_tokens_total" ))
237+ if cfg .shuffleShardingEnabled {
238+ require .NoError (t , querier2 .WaitSumMetrics (e2e .Equals (512 * 2 ), "cortex_ring_tokens_total" ))
239+ }
240+
241+ // wait to upload blocks
242+ require .NoError (t , ingester .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_ingester_shipper_uploads_total" }, e2e .WaitMissingMetrics ))
243+
244+ // wait to update knownUsers
245+ require .NoError (t , querier .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_regex_resolver_last_update_run_timestamp_seconds" }), e2e .WaitMissingMetrics )
246+ if cfg .shuffleShardingEnabled {
247+ require .NoError (t , querier2 .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_regex_resolver_last_update_run_timestamp_seconds" }), e2e .WaitMissingMetrics )
248+ }
249+
250+ // query all tenants
251+ c , err := e2ecortex .NewClient (distributor .HTTPEndpoint (), queryFrontend .HTTPEndpoint (), "" , "" , "user-.+" )
252+ require .NoError (t , err )
253+
254+ result , err := c .Query ("series_1" , now )
255+ require .NoError (t , err )
256+
257+ assert .Equal (t , mergeResults (tenantIDs , expectedVectors ), result .(model.Vector ))
258+
259+ // ensure a push to multiple tenants is failing
260+ series , _ := generateSeries ("series_1" , now )
261+ res , err := c .Push (series )
262+ require .NoError (t , err )
263+
264+ require .Equal (t , 500 , res .StatusCode )
265+
266+ // check metric label values for total queries in the query frontend
267+ require .NoError (t , queryFrontend .WaitSumMetricsWithOptions (e2e .Equals (1 ), []string {"cortex_query_frontend_queries_total" }, e2e .WithLabelMatchers (
268+ labels .MustNewMatcher (labels .MatchEqual , "user" , "user-.+" ),
269+ labels .MustNewMatcher (labels .MatchEqual , "op" , "query" ))))
270+
271+ // check metric label values for query queue length in either query frontend or query scheduler
272+ queueComponent := queryFrontend
273+ queueMetricName := "cortex_query_frontend_queue_length"
274+ if cfg .querySchedulerEnabled {
275+ queueComponent = queryScheduler
276+ queueMetricName = "cortex_query_scheduler_queue_length"
277+ }
278+ require .NoError (t , queueComponent .WaitSumMetricsWithOptions (e2e .Equals (0 ), []string {queueMetricName }, e2e .WithLabelMatchers (
279+ labels .MustNewMatcher (labels .MatchEqual , "user" , "user-.+" ))))
50280}
51281
52282func runQuerierTenantFederationTest (t * testing.T , cfg querierTenantFederationConfig ) {
0 commit comments