@@ -15,6 +15,10 @@ import (
1515 "testing"
1616 "time"
1717
18+ "go.uber.org/atomic"
19+
20+ "google.golang.org/grpc"
21+
1822 "github.com/go-kit/kit/log"
1923 "github.com/go-kit/kit/log/level"
2024 "github.com/gorilla/mux"
@@ -135,7 +139,45 @@ func newManager(t *testing.T, cfg Config) (*DefaultMultiTenantManager, func()) {
 	return manager, cleanup
 }
 
-func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
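+// mockRulerClientsPool embeds the real ClientsPool but hands out mockRulerClient
+// instances that call the target Ruler in-process and count the Rules() calls,
+// so tests can assert how many rulers were queried.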
+type mockRulerClientsPool struct {
+	ClientsPool
+	cfg           Config
+	rulerAddrMap  map[string]*Ruler
+	numberOfCalls atomic.Int32
+}
+
+type mockRulerClient struct {
+	ruler         *Ruler
+	numberOfCalls *atomic.Int32
+}
+
+func (c *mockRulerClient) Rules(ctx context.Context, in *RulesRequest, _ ...grpc.CallOption) (*RulesResponse, error) {
+	c.numberOfCalls.Inc()
+	return c.ruler.Rules(ctx, in)
+}
+
+func (p *mockRulerClientsPool) GetClientFor(addr string) (RulerClient, error) {
+	for _, r := range p.rulerAddrMap {
+		if r.lifecycler.GetInstanceAddr() == addr {
+			return &mockRulerClient{
+				ruler:         r,
+				numberOfCalls: &p.numberOfCalls,
+			}, nil
+		}
+	}
+
+	return nil, fmt.Errorf("unable to find ruler for addr %s", addr)
+}
+
+func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer, rulerAddrMap map[string]*Ruler) *mockRulerClientsPool {
+	return &mockRulerClientsPool{
+		ClientsPool:  newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+		cfg:          cfg,
+		rulerAddrMap: rulerAddrMap,
+	}
+}
+
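+// buildRuler creates a Ruler wired to the mock clients pool; rulerAddrMap lets the
+// pool resolve instance addresses to the in-process rulers created by a test.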
+func buildRuler(t *testing.T, cfg Config, rulerAddrMap map[string]*Ruler) (*Ruler, func()) {
 	engine, noopQueryable, pusher, logger, overrides, cleanup := testSetup(t, cfg)
 	storage, err := NewLegacyRuleStore(cfg.StoreConfig, promRules.FileLoader{}, log.NewNopLogger())
 	require.NoError(t, err)
@@ -145,21 +187,21 @@ func newRuler(t *testing.T, cfg Config) (*Ruler, func()) {
 	manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, reg, log.NewNopLogger())
 	require.NoError(t, err)
 
-	ruler, err := NewRuler(
+	ruler, err := newRuler(
 		cfg,
 		manager,
 		reg,
 		logger,
 		storage,
 		overrides,
+		newMockClientsPool(cfg, logger, reg, rulerAddrMap),
 	)
 	require.NoError(t, err)
-
 	return ruler, cleanup
 }
 
 func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) {
-	ruler, cleanup := newRuler(t, cfg)
+	ruler, cleanup := buildRuler(t, cfg, nil)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))
 
 	// Ensure all rules are loaded before usage
@@ -252,6 +294,171 @@ func compareRuleGroupDescToStateDesc(t *testing.T, expected *rulespb.RuleGroupDe
 	}
 }
 
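+// TestGetRules starts three rulers sharing an in-memory KV store and checks that
+// GetRules returns all of a tenant's groups from any ruler, with sharding disabled,
+// with default sharding, and with shuffle sharding (shard size 2).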
+func TestGetRules(t *testing.T) {
+	// ruler ID -> (user ID -> list of groups).
+	type expectedRulesMap map[string]map[string]rulespb.RuleGroupList
+
+	type testCase struct {
+		sharding         bool
+		shardingStrategy string
+		shuffleShardSize int
+	}
+
+	expectedRules := expectedRulesMap{
+		"ruler1": map[string]rulespb.RuleGroupList{
+			"user1": {
+				&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "first", Interval: 10 * time.Second},
+				&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "second", Interval: 10 * time.Second},
+			},
+			"user2": {
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
+			},
+		},
+		"ruler2": map[string]rulespb.RuleGroupList{
+			"user1": {
+				&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
+			},
+			"user2": {
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "first", Interval: 10 * time.Second},
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "second", Interval: 10 * time.Second},
+			},
+		},
+		"ruler3": map[string]rulespb.RuleGroupList{
+			"user3": {
+				&rulespb.RuleGroupDesc{User: "user3", Namespace: "namespace", Name: "third", Interval: 10 * time.Second},
+			},
+			"user2": {
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "forth", Interval: 10 * time.Second},
+				&rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "fifty", Interval: 10 * time.Second},
+			},
+		},
+	}
+
+	testCases := map[string]testCase{
+		"No Sharding": {
+			sharding: false,
+		},
+		"Default Sharding": {
+			sharding:         true,
+			shardingStrategy: util.ShardingStrategyDefault,
+		},
+		"Shuffle Sharding and ShardSize = 2": {
+			sharding:         true,
+			shuffleShardSize: 2,
+			shardingStrategy: util.ShardingStrategyShuffle,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
+			t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) })
+			allRulesByUser := map[string]rulespb.RuleGroupList{}
+			allRulesByRuler := map[string]rulespb.RuleGroupList{}
+			allTokensByRuler := map[string][]uint32{}
+			rulerAddrMap := map[string]*Ruler{}
+
+			createRuler := func(id string) *Ruler {
+				cfg, cleanUp := defaultRulerConfig(t, newMockRuleStore(allRulesByUser))
+				t.Cleanup(cleanUp)
+
+				cfg.ShardingStrategy = tc.shardingStrategy
+				cfg.EnableSharding = tc.sharding
+
+				cfg.Ring = RingConfig{
+					InstanceID:   id,
+					InstanceAddr: id,
+					KVStore: kv.Config{
+						Mock: kvStore,
+					},
+				}
+
+				r, cleanUp := buildRuler(t, cfg, rulerAddrMap)
+				r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
+				t.Cleanup(cleanUp)
+				rulerAddrMap[id] = r
+				if r.ring != nil {
+					require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring))
+					t.Cleanup(r.ring.StopAsync)
+				}
+				return r
+			}
+
+			for rID, r := range expectedRules {
+				createRuler(rID)
+				for user, rules := range r {
+					allRulesByUser[user] = append(allRulesByUser[user], rules...)
+					allRulesByRuler[rID] = append(allRulesByRuler[rID], rules...)
+					allTokensByRuler[rID] = generateTokenForGroups(rules, 1)
+				}
+			}
+
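+			// When sharding is enabled, register each ruler in the ring with the tokens
+			// derived from its expected groups, so group ownership is deterministic.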
+			if tc.sharding {
+				err := kvStore.CAS(context.Background(), ring.RulerRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
+					d, _ := in.(*ring.Desc)
+					if d == nil {
+						d = ring.NewDesc()
+					}
+					for rID, tokens := range allTokensByRuler {
+						d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now())
+					}
+					return d, true, nil
+				})
+				require.NoError(t, err)
+				// Wait a bit to make sure ruler's ring is updated.
+				time.Sleep(100 * time.Millisecond)
+			}
+
+			forEachRuler := func(f func(rID string, r *Ruler)) {
+				for rID, r := range rulerAddrMap {
+					f(rID, r)
+				}
+			}
+
+			// Sync Rules
+			forEachRuler(func(_ string, r *Ruler) {
+				r.syncRules(context.Background(), rulerSyncReasonInitial)
+			})
+
+			for u := range allRulesByUser {
+				ctx := user.InjectOrgID(context.Background(), u)
+				forEachRuler(func(_ string, r *Ruler) {
+					rules, err := r.GetRules(ctx)
+					require.NoError(t, err)
+					require.Equal(t, len(allRulesByUser[u]), len(rules))
+					if tc.sharding {
+						mockPoolClient := r.clientsPool.(*mockRulerClientsPool)
+
+						// Right now we are calling all rulers, even with shuffle sharding.
+						require.Equal(t, int32(len(rulerAddrMap)), mockPoolClient.numberOfCalls.Load())
+						mockPoolClient.numberOfCalls.Store(0)
+					}
+				})
+			}
+
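+			// Finally, check the rules loaded locally by each ruler: with sharding each
+			// ruler loads only its own share, without sharding every ruler loads everything.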
+			totalLoadedRules := 0
+			totalConfiguredRules := 0
+
+			forEachRuler(func(rID string, r *Ruler) {
+				localRules, err := r.listRules(context.Background())
+				require.NoError(t, err)
+				for _, rules := range localRules {
+					totalLoadedRules += len(rules)
+				}
+				totalConfiguredRules += len(allRulesByRuler[rID])
+			})
+
+			if tc.sharding {
+				require.Equal(t, totalConfiguredRules, totalLoadedRules)
+			} else {
+				// Not sharding means that all rules will be loaded on all rulers
+				numberOfRulers := len(rulerAddrMap)
+				require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules)
+			}
+		})
+	}
+}
+
 func TestSharding(t *testing.T) {
 	const (
 		user1 = "user1"
@@ -666,7 +873,7 @@ func TestSharding(t *testing.T) {
 					DisabledTenants: tc.disabledUsers,
 				}
 
-				r, cleanup := newRuler(t, cfg)
+				r, cleanup := buildRuler(t, cfg, nil)
 				r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize}
 				t.Cleanup(cleanup)
 
@@ -814,6 +1021,16 @@ func TestDeleteTenantRuleGroups(t *testing.T) {
 	}
 }
 
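+// generateTokenForGroups returns one ring token per rule group, shifted by offset
+// so that a ruler registered with these tokens owns exactly these groups.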
+func generateTokenForGroups(groups []*rulespb.RuleGroupDesc, offset uint32) []uint32 {
+	var tokens []uint32
+
+	for _, g := range groups {
+		tokens = append(tokens, tokenForGroup(g)+offset)
+	}
+
+	return tokens
+}
+
 func callDeleteTenantAPI(t *testing.T, api *Ruler, userID string) {
 	ctx := user.InjectOrgID(context.Background(), userID)
 