@@ -13,6 +13,7 @@ import (
1313 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip"
1414 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/history"
1515 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics"
16+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/mmaintegration"
1617 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op"
1718 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/queue"
1819 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/scheduled"
@@ -29,8 +30,8 @@ type Simulator struct {
2930 curr time.Time
3031 end time.Time
3132 // interval is the step between ticks for active simulaton components, such
32- // as the queues, store rebalancer and state changers. It should be set
33- // lower than the bgInterval, as updated occur more frequently.
33+ // as the queues, store rebalancer and state changers. It should be set lower
34+ // than the bgInterval, as updates occur more frequently.
3435 interval time.Duration
3536
3637 // The simulator can run multiple workload Generators in parallel.
@@ -47,6 +48,8 @@ type Simulator struct {
4748 sqs map [state.StoreID ]queue.RangeQueue
4849 // Store rebalancers.
4950 srs map [state.StoreID ]storerebalancer.StoreRebalancer
51+ // Multi-metric store rebalancers.
52+ mmSRs map [state.StoreID ]* mmaintegration.MMAStoreRebalancer
5053 // Store operation controllers.
5154 controllers map [state.StoreID ]op.Controller
5255
@@ -87,6 +90,7 @@ func NewSimulator(
8790 lqs := make (map [state.StoreID ]queue.RangeQueue )
8891 sqs := make (map [state.StoreID ]queue.RangeQueue )
8992 srs := make (map [state.StoreID ]storerebalancer.StoreRebalancer )
93+ mmSRs := make (map [state.StoreID ]* mmaintegration.MMAStoreRebalancer )
9094 changer := state .NewReplicaChanger ()
9195 controllers := make (map [state.StoreID ]op.Controller )
9296
@@ -103,6 +107,7 @@ func NewSimulator(
103107 sqs : sqs ,
104108 controllers : controllers ,
105109 srs : srs ,
110+ mmSRs : mmSRs ,
106111 pacers : pacers ,
107112 gossip : gossip .NewGossip (initialState , settings ),
108113 metrics : m ,
@@ -133,19 +138,24 @@ func (s *Simulator) StoreAddNotify(storeID state.StoreID, _ state.State) {
133138func (s * Simulator ) addStore (storeID state.StoreID , tick time.Time ) {
134139 allocator := s .state .Allocator (storeID )
135140 storePool := s .state .StorePool (storeID )
141+ store , _ := s .state .Store (storeID )
136142 s .rqs [storeID ] = queue .NewReplicateQueue (
137143 storeID ,
144+ store .NodeID (),
138145 s .changer ,
139146 s .settings ,
140147 allocator ,
148+ s .state .Node (store .NodeID ()).AllocatorSync (),
141149 storePool ,
142150 tick ,
143151 )
144152 s .lqs [storeID ] = queue .NewLeaseQueue (
145153 storeID ,
154+ store .NodeID (),
146155 s .changer ,
147156 s .settings ,
148157 allocator ,
158+ s .state .Node (store .NodeID ()).AllocatorSync (),
149159 storePool ,
150160 tick ,
151161 )
@@ -175,6 +185,25 @@ func (s *Simulator) addStore(storeID state.StoreID, tick time.Time) {
175185 s .settings ,
176186 storerebalancer .GetStateRaftStatusFn (s .state ),
177187 )
188+ // TODO: We add the store to every node's allocator in the cluster
189+ // immediately. This is also updated via gossip, however there is a delay
190+ // after startup. When calling `mma.ProcessStoreLeaseholderMsg` via
191+ // tickMMStoreRebalancers, there may be not be a store state for some
192+ // replicas. Setting it here ensures that the store is always present and
193+ // initiated in each node's allocator. We should instead handle this in mma,
194+ // or integration component.
195+ for _ , node := range s .state .Nodes () {
196+ node .MMAllocator ().SetStore (state .StoreAttrAndLocFromDesc (
197+ s .state .StoreDescriptors (false , storeID )[0 ]))
198+ }
199+ s .mmSRs [storeID ] = mmaintegration .NewMMAStoreRebalancer (
200+ storeID ,
201+ store .NodeID (),
202+ s .state .Node (store .NodeID ()).MMAllocator (),
203+ s .state .Node (store .NodeID ()).AllocatorSync (),
204+ s .controllers [storeID ],
205+ s .settings ,
206+ )
178207}
179208
180209// GetNextTickTime returns a simulated tick time, or an indication that the
@@ -242,6 +271,9 @@ func (s *Simulator) RunSim(ctx context.Context) {
242271 // Simulate the store rebalancer logic.
243272 s .tickStoreRebalancers (ctx , tick , stateForAlloc )
244273
274+ // Simulate the multi-metric store rebalancer logic.
275+ s .tickMMStoreRebalancers (ctx , tick , stateForAlloc )
276+
245277 // Print tick metrics.
246278 s .tickMetrics (ctx , tick )
247279 }
@@ -342,6 +374,16 @@ func (s *Simulator) tickStoreRebalancers(ctx context.Context, tick time.Time, st
342374 }
343375}
344376
377+ // tickStoreRebalancers iterates over the multi-metric store rebalancers in the
378+ // cluster and ticks their control loop.
379+ func (s * Simulator ) tickMMStoreRebalancers (ctx context.Context , tick time.Time , state state.State ) {
380+ stores := s .state .Stores ()
381+ s .shuffler (len (stores ), func (i , j int ) { stores [i ], stores [j ] = stores [j ], stores [i ] })
382+ for _ , store := range stores {
383+ s .mmSRs [store .StoreID ()].Tick (ctx , tick , state )
384+ }
385+ }
386+
345387// tickMetrics prints the metrics up to the given tick.
346388func (s * Simulator ) tickMetrics (ctx context.Context , tick time.Time ) {
347389 s .metrics .Tick (ctx , tick , s .state )
0 commit comments