From 972d746a79354bbf077d75993fb88c6328746250 Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Fri, 7 Nov 2025 08:54:09 -0500
Subject: [PATCH] mma: more cleanup of pending changes

The existing PendingRangeChange is now used in all the exported
Allocator methods. This makes it clear that the set of changes is
treated as an atomic unit in the interface.

The MMA code also reduces the switching between pendingReplicaChange,
ChangeID and ReplicaChange, which was confusing. It now uses
pendingReplicaChange in more cases.

Since a PendingRangeChange first describes the change and is only later
annotated with the change IDs, start time etc., some refactoring was
needed: a change is now produced via MakePendingRangeChange, and its
state is later updated via clusterState.addPendingRangeChange.

Minor cleanup: the Allocator interface now includes integration methods
implemented by allocatorState, as originally intended.

Informs #157049

Epic: CRDB-55052

Release note: None
---
 .../allocator/mmaprototype/allocator.go       |  54 +++---
 .../allocator/mmaprototype/allocator_state.go | 128 ++++++-------
 .../allocator/mmaprototype/cluster_state.go   | 175 ++++++++++--------
 .../mmaprototype/cluster_state_test.go        |   7 +-
 pkg/kv/kvserver/mma_store_rebalancer.go       |   2 +-
 .../kvserver/mmaintegration/allocator_op.go   |  13 +-
 .../kvserver/mmaintegration/allocator_sync.go |  53 +++---
 .../kvserver/mmaintegration/mma_conversion.go |  12 +-
 8 files changed, 237 insertions(+), 207 deletions(-)

diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go
index adaf610613f1..a79a47c1f1cb 100644
--- a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go
+++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go
@@ -43,34 +43,29 @@ type Allocator interface {
 	// associated node in the cluster.
 	ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg)
 
-	// TODO(sumeer): only a subset of the fields in
-	// pendingReplicaChange/PendingRangeChange are relevant to the caller. Hide
-	// the remaining.
-
 	// Methods related to making changes.
 
-	// AdjustPendingChangesDisposition is optional feedback to inform the
-	// allocator of success or failure of proposed changes. For successful
-	// changes, this is a faster way to know about success than waiting for the
-	// next ProcessNodeLoadResponse from the local node. For failed changes, in
-	// the absence of this feedback, proposed changes that have not been enacted
-	// in N seconds will be garbage collected and assumed to have failed.
+	// AdjustPendingChangeDisposition is optional feedback to inform the
+	// allocator of success or failure of proposed changes to a range. For
+	// successful changes, this is a faster way to know about success than
+	// waiting for the next ProcessNodeLoadResponse from the local node. For
+	// failed changes, in the absence of this feedback, proposed changes that
+	// have not been enacted in N seconds will be garbage collected and assumed
+	// to have failed.
 	//
-	// Calls to AdjustPendingChangesDisposition must be correctly sequenced with
+	// Calls to AdjustPendingChangeDisposition must be correctly sequenced with
 	// full state updates from the local node provided in
 	// ProcessNodeLoadResponse.
-	//
-	// REQUIRES: len(changes) > 0 and all changes are to the same range.
-	AdjustPendingChangesDisposition(changes []ChangeID, success bool)
+	AdjustPendingChangeDisposition(change PendingRangeChange, success bool)
 
-	// RegisterExternalChanges informs this allocator about yet to complete
+	// RegisterExternalChange informs this allocator about yet to complete
 	// changes to the cluster which were not initiated by this allocator. The
-	// caller is returned a list of ChangeIDs, corresponding 1:1 to each replica
-	// change provided as an argument. The returned list of ChangeIDs should then
-	// be used to call AdjustPendingChangesDisposition when the changes are
-	// completed, either successfully or not. All changes should correspond to the
-	// same range.
-	RegisterExternalChanges(changes []ReplicaChange) []ChangeID
+	// ownership of all state inside change is handed off to the callee. If ok
+	// is true, the change was registered, and the caller should use the same
+	// change in a subsequent call to AdjustPendingChangeDisposition when the
+	// change completes, either successfully or not. If ok is false, the
+	// change was not registered.
+	RegisterExternalChange(change PendingRangeChange) (ok bool)
 
 	// ComputeChanges is called periodically and frequently, say every 10s.
 	//
@@ -152,6 +147,23 @@ type Allocator interface {
 	//
 	// TODO(sumeer): remove once the integration is properly done.
 	KnownStores() map[roachpb.StoreID]struct{}
+
+	// BuildMMARebalanceAdvisor is called by the allocator sync to build an
+	// MMARebalanceAdvisor for the given existing store and candidates. The
+	// advisor should later be passed to IsInConflictWithMMA to determine if a
+	// given candidate is in conflict with the existing store.
+	//
+	// TODO(sumeer): merge the above comment with the comment in the
+	// implementation.
+	BuildMMARebalanceAdvisor(existing roachpb.StoreID, cands []roachpb.StoreID) *MMARebalanceAdvisor
+
+	// IsInConflictWithMMA is called by the allocator sync to determine if the
+	// given candidate is in conflict with the existing store.
+	//
+	// TODO(sumeer): merge the above comment with the comment in the
+	// implementation.
+	IsInConflictWithMMA(
+		ctx context.Context, cand roachpb.StoreID, advisor *MMARebalanceAdvisor, cpuOnly bool) bool
 }
 
 // Avoid unused lint errors.
diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
index f644ddec8b9a..7f1fc42889c6 100644
--- a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
+++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
@@ -544,16 +544,14 @@ func (cs *clusterState) rebalanceStores(
 				NodeID:  ss.NodeID,
 				StoreID: ss.StoreID,
 			}
-			leaseChanges := MakeLeaseTransferChanges(
+			replicaChanges := MakeLeaseTransferChanges(
 				rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-			if err := cs.preCheckOnApplyReplicaChanges(leaseChanges[:]); err != nil {
-				panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChanges))
+			leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
+			if err := cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
+				panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
 			}
-			pendingChanges := cs.createPendingChanges(leaseChanges[:]...)
- changes = append(changes, PendingRangeChange{ - RangeID: rangeID, - pendingReplicaChanges: pendingChanges[:], - }) + cs.addPendingRangeChange(leaseChange) + changes = append(changes, leaseChange) leaseTransferCount++ if changes[len(changes)-1].IsChangeReplicas() || !changes[len(changes)-1].IsTransferLease() { panic(fmt.Sprintf("lease transfer is invalid: %v", changes[len(changes)-1])) @@ -763,15 +761,13 @@ func (cs *clusterState) rebalanceStores( } replicaChanges := makeRebalanceReplicaChanges( rangeID, rstate.replicas, rstate.load, addTarget, removeTarget) - if err = cs.preCheckOnApplyReplicaChanges(replicaChanges[:]); err != nil { + rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:]) + if err = cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil { panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v", replicaChanges, rangeID)) } - pendingChanges := cs.createPendingChanges(replicaChanges[:]...) - changes = append(changes, PendingRangeChange{ - RangeID: rangeID, - pendingReplicaChanges: pendingChanges[:], - }) + cs.addPendingRangeChange(rangeChange) + changes = append(changes, rangeChange) rangeMoveCount++ log.KvDistribution.VInfof(ctx, 2, "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", @@ -807,93 +803,73 @@ func (a *allocatorState) SetStore(store StoreAttributesAndLocality) { a.cs.setStore(store) } -// ProcessStoreLeaseholderMsg implements the Allocator interface. +// ProcessStoreLoadMsg implements the Allocator interface. func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg) { a.mu.Lock() defer a.mu.Unlock() a.cs.processStoreLoadMsg(ctx, msg) } -// AdjustPendingChangesDisposition implements the Allocator interface. -func (a *allocatorState) AdjustPendingChangesDisposition(changeIDs []ChangeID, success bool) { +// AdjustPendingChangeDisposition implements the Allocator interface. +func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) { a.mu.Lock() defer a.mu.Unlock() - // NB: It is possible that some of the changeIDs have already been enacted - // via StoreLeaseholderMsg, and even been garbage collected. So no - // assumption can be made about whether these changeIDs will be found in the - // allocator's state. - if !success { - // Gather the changes that are found and need to be undone. - replicaChanges := make([]ReplicaChange, 0, len(changeIDs)) - for _, changeID := range changeIDs { - change, ok := a.cs.pendingChanges[changeID] - if !ok { - continue - } - rs, ok := a.cs.ranges[change.rangeID] - if !ok { - panic(errors.AssertionFailedf("range %v not found in cluster state", change.rangeID)) - } - if rs.pendingChangeNoRollback { - // All the changes are to the same range, so return. - return - } - replicaChanges = append(replicaChanges, change.ReplicaChange) - } - if len(replicaChanges) == 0 { - return + rs, ok := a.cs.ranges[change.RangeID] + if !ok { + // Range no longer exists. This can happen if the StoreLeaseholderMsg + // which included the effect of the change that transferred the lease away + // was already processed, causing the range to no longer be tracked by the + // allocator. + return + } + if !success && rs.pendingChangeNoRollback { + // Not allowed to undo. + return + } + // NB: It is possible that some of the changes have already been enacted via + // StoreLeaseholderMsg, and even been garbage collected. 
So no assumption + // can be made about whether these changes will be found in the allocator's + // state. We gather the found changes. + var changes []*pendingReplicaChange + for _, c := range change.pendingReplicaChanges { + ch, ok := a.cs.pendingChanges[c.ChangeID] + if !ok { + continue } - // Check that we can undo these changes. If not, log and return. - if err := a.cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil { - // TODO(sumeer): we should be able to panic here, once the interface - // contract says that all the proposed changes must be included in - // changeIDs. Without that contract, there may be a pair of changes - // (remove replica and lease from s1), (add replica and lease to s2), - // and the caller can provide the first changeID only, and the undo - // would cause two leaseholders. The pre-check would catch that here. - log.KvDistribution.Infof(context.Background(), "did not undo change %v: due to %v", changeIDs, err) - return + changes = append(changes, ch) + } + if len(changes) == 0 { + return + } + if !success { + // Check that we can undo these changes. + if err := a.cs.preCheckOnUndoReplicaChanges(changes); err != nil { + panic(err) } } - - for _, changeID := range changeIDs { - // We set !requireFound, since some of these pending changes may no longer - // exist in the allocator's state. For example, a StoreLeaseholderMsg that - // happened after the pending change was created and before this call to - // AdjustPendingChangesDisposition may have already removed the pending - // change. + for _, c := range changes { if success { - // TODO(sumeer): this code is implicitly assuming that all the changes - // on the rangeState are being enacted. And that is true of the current - // callers. We should explicitly state the assumption in the interface. - // Because if only some are being enacted, we ought to set - // pendingChangeNoRollback, and we don't bother to. - a.cs.pendingChangeEnacted(changeID, a.cs.ts.Now(), false) + a.cs.pendingChangeEnacted(c.ChangeID, a.cs.ts.Now()) } else { - a.cs.undoPendingChange(changeID, false) + a.cs.undoPendingChange(c.ChangeID) } } } -// RegisterExternalChanges implements the Allocator interface. All changes should -// correspond to the same range, panic otherwise. -func (a *allocatorState) RegisterExternalChanges(changes []ReplicaChange) []ChangeID { +// RegisterExternalChange implements the Allocator interface. +func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) { a.mu.Lock() defer a.mu.Unlock() - if err := a.cs.preCheckOnApplyReplicaChanges(changes); err != nil { + if err := a.cs.preCheckOnApplyReplicaChanges(change.pendingReplicaChanges); err != nil { a.mmaMetrics.ExternalFailedToRegister.Inc(1) log.KvDistribution.Infof(context.Background(), "did not register external changes: due to %v", err) - return nil + return false } else { a.mmaMetrics.ExternaRegisterSuccess.Inc(1) } - pendingChanges := a.cs.createPendingChanges(changes...) - changeIDs := make([]ChangeID, len(pendingChanges)) - for i, pendingChange := range pendingChanges { - changeIDs[i] = pendingChange.ChangeID - } - return changeIDs + a.cs.addPendingRangeChange(change) + return true } // ComputeChanges implements the Allocator interface. 
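
Note for reviewers: end to end, the new calling pattern for external
callers is a minimal sketch like the following (not code in this patch;
`alloc`, `desc`, and `replicaChanges` stand in for whatever the caller
already has in hand):

    // Build the atomic unit; ChangeIDs and start time are deliberately
    // not yet assigned.
    change := mmaprototype.MakePendingRangeChange(desc.RangeID, replicaChanges)
    // Hand ownership to MMA; internally this runs
    // clusterState.addPendingRangeChange, which assigns the ChangeIDs.
    if ok := alloc.RegisterExternalChange(change); ok {
        // Report the outcome later with the same change value.
        alloc.AdjustPendingChangeDisposition(change, true /* success */)
    }
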
diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
index 915b60ca3922..fc4de85f7b5f 100644
--- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
+++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
@@ -125,6 +125,8 @@ type ReplicaState struct {
 // ChangeID is a unique ID, in the context of this data-structure and when
 // receiving updates about enactment having happened or having been rejected
 // (by the component responsible for change enactment).
+//
+// TODO(sumeer): make ChangeID private.
 type ChangeID uint64
 
 type ReplicaChangeType int
@@ -151,6 +153,9 @@ func (s ReplicaChangeType) String() string {
 	}
 }
 
+// ReplicaChange describes a change to a replica.
+//
+// TODO(sumeer): make ReplicaChange private.
 type ReplicaChange struct {
 	// The load this change adds to a store. The values will be negative if the
 	// load is being removed.
@@ -158,7 +163,8 @@ type ReplicaChange struct {
 	secondaryLoadDelta SecondaryLoadVector
 
 	// target is the target {store,node} for the change.
-	target  roachpb.ReplicationTarget
+	target roachpb.ReplicationTarget
+	// TODO(sumeer): remove rangeID.
 	rangeID roachpb.RangeID
 
 	// NB: 0 is not a valid ReplicaID, but this component does not care about
@@ -379,11 +385,53 @@ func makeRebalanceReplicaChanges(
 // PendingRangeChange is a proposed set of change(s) to a range. It can consist
 // of multiple pending replica changes, such as adding or removing replicas, or
 // transferring the lease.
+//
+// NB: pendingReplicaChanges is not visible outside the package, so we can be
+// certain that callers outside this package that hold a PendingRangeChange
+// cannot mutate the internals other than clearing the state.
+//
+// Additionally, for a PendingRangeChange returned outside the package, we
+// ensure that the pendingReplicaChanges slice itself is not shared with the
+// rangeState.pendingChanges slice, since the rangeState.pendingChanges slice
+// can have entries removed from it (and swapped around as part of removal).
+//
+// Some of the state inside each *pendingReplicaChange is mutable at arbitrary
+// points in time by the code inside this package (with the relevant locking,
+// of course). Currently, this state is revisedGCTime and enactedAtTime.
+// Neither is read by the public methods on PendingRangeChange.
+//
+// TODO(sumeer): when we expand the set of mutable fields, make a deep copy.
 type PendingRangeChange struct {
 	RangeID roachpb.RangeID
 
 	pendingReplicaChanges []*pendingReplicaChange
 }
 
+// MakePendingRangeChange creates a PendingRangeChange for the given rangeID
+// and changes. Certain internal aspects of the change, like the change IDs,
+// start time etc., are not yet initialized, since those use internal state of
+// MMA. Those will be initialized by MMA when this change is later handed to
+// MMA for tracking, in clusterState.addPendingRangeChange. For external
+// callers of MakePendingRangeChange, this happens transitively when
+// Allocator.RegisterExternalChange is called.
+func MakePendingRangeChange(rangeID roachpb.RangeID, changes []ReplicaChange) PendingRangeChange { + for _, c := range changes { + if c.rangeID != rangeID { + panic(errors.AssertionFailedf("all changes must be to the same range %d != %d", + c.rangeID, rangeID)) + } + } + prcs := make([]*pendingReplicaChange, len(changes)) + for i, c := range changes { + prcs[i] = &pendingReplicaChange{ + ReplicaChange: c, + } + } + return PendingRangeChange{ + RangeID: rangeID, + pendingReplicaChanges: prcs, + } +} + func (prc PendingRangeChange) String() string { return redact.StringWithoutMarkers(prc) } @@ -416,16 +464,6 @@ func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { w.Print("]") } -// ChangeIDs returns the list of ChangeIDs associated with the pending range -// change. -func (prc PendingRangeChange) ChangeIDs() []ChangeID { - cids := make([]ChangeID, len(prc.pendingReplicaChanges)) - for i, c := range prc.pendingReplicaChanges { - cids[i] = c.ChangeID - } - return cids -} - // IsChangeReplicas returns true if the pending range change is a change // replicas operation. func (prc PendingRangeChange) IsChangeReplicas() bool { @@ -527,12 +565,6 @@ func (prc PendingRangeChange) LeaseTransferFrom() roachpb.StoreID { panic("unreachable") } -// TODO(sumeer): we have various methods that take slices of either ChangeIDs -// or pendingReplicaChanges or ReplicaChange, and have callers that already -// have or could first construct a slice of pendingReplicaChanges, and avoid -// various temporary slice construction and repeated map lookups. Clean this -// up. - // pendingReplicaChange is a proposed change to a single replica. Some // external entity (the leaseholder of the range) may choose to enact this // change. It may not be enacted if it will cause some invariant (like the @@ -1003,6 +1035,8 @@ type rangeState struct { // Life-cycle matches clusterState.pendingChanges. The consolidated // rangeState.pendingChanges across all ranges in clusterState.ranges will // be identical to clusterState.pendingChanges. + // + // TODO(sumeer): replace by PendingRangeChange. pendingChanges []*pendingReplicaChange // When set, the pendingChanges can not be rolled back anymore. They have // to be enacted, or discarded wholesale in favor of the latest RangeMsg @@ -1035,12 +1069,12 @@ type rangeState struct { // that the change we did not know about has been enacted. // // One may wonder how such unknown changes can happen, given that other - // components call mmaprototype.Allocator.RegisterExternalChanges. One example is + // components call mmaprototype.Allocator.RegisterExternalChange. One example is // when MMA does not currently know about a range. Say the lease gets // transferred to the local store, but MMA has not yet been called with a // StoreLeaseholderMsg, but replicateQueue already knows about this lease, // and decides to initiate a transfer of replicas between two remote stores - // (to equalize replica counts). When mmaprototype.Allocator.RegisterExternalChanges + // (to equalize replica counts). When mmaprototype.Allocator.RegisterExternalChange // is called, there is no record of this range in MMA (since it wasn't the // leaseholder), and the change is ignored. When the next // StoreLeaseholderMsg is provided to MMA it now knows about the range, and @@ -1324,7 +1358,6 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal( // leaseholder. Note that this is the only code path where a subset of // pending changes for a replica can be considered enacted. 
var remainingChanges, enactedChanges []*pendingReplicaChange - var remainingReplicaChanges []ReplicaChange for _, change := range rs.pendingChanges { ss := cs.stores[change.target.StoreID] adjustedReplica, ok := ss.adjusted.replicas[rangeMsg.RangeID] @@ -1336,7 +1369,6 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal( enactedChanges = append(enactedChanges, change) } else { remainingChanges = append(remainingChanges, change) - remainingReplicaChanges = append(remainingReplicaChanges, change.ReplicaChange) } } gcRemainingChanges := false @@ -1380,7 +1412,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal( // example_skewed_cpu_even_ranges_mma_and_queues. I suspect the latter // is because MMA is acting faster to undo the effects of the changes // made by the replicate and lease queues. - cs.pendingChangeEnacted(change.ChangeID, now, true) + cs.pendingChangeEnacted(change.ChangeID, now) } // INVARIANT: remainingChanges and rs.pendingChanges contain the same set // of changes, though possibly in different order. @@ -1404,7 +1436,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal( // remainingChanges, but they are not the same slice. rc := rs.pendingChanges rs.pendingChanges = nil - err := cs.preCheckOnApplyReplicaChanges(remainingReplicaChanges) + err := cs.preCheckOnApplyReplicaChanges(remainingChanges) valid = err == nil if err != nil { reason = redact.Sprint(err) @@ -1479,12 +1511,15 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal( // Since this range is going away, mark all the pending changes as // enacted. This will allow the load adjustments to also be garbage // collected in the future. + // + // Gather the changeIDs, since calls to pendingChangeEnacted modify the + // rs.pendingChanges slice. changeIDs := make([]ChangeID, len(rs.pendingChanges)) for i, change := range rs.pendingChanges { changeIDs[i] = change.ChangeID } for _, changeID := range changeIDs { - cs.pendingChangeEnacted(changeID, now, true) + cs.pendingChangeEnacted(changeID, now) } // Remove from the storeStates. for _, replica := range rs.replicas { @@ -1687,29 +1722,25 @@ func (cs *clusterState) gcPendingChanges(now time.Time) { if !startTime.Add(pendingChangeGCDuration).Before(now) { continue } - var replicaChanges []ReplicaChange + if err := cs.preCheckOnUndoReplicaChanges(rs.pendingChanges); err != nil { + panic(err) + } + // Gather the changeIDs, since calls to undoPendingChange modify the + // rs.pendingChanges slice. 
var changeIDs []ChangeID for _, pendingChange := range rs.pendingChanges { - replicaChanges = append(replicaChanges, pendingChange.ReplicaChange) changeIDs = append(changeIDs, pendingChange.ChangeID) } - if err := cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil { - panic(err) - } for _, changeID := range changeIDs { - cs.undoPendingChange(changeID, true) + cs.undoPendingChange(changeID) } } } -func (cs *clusterState) pendingChangeEnacted(cid ChangeID, enactedAt time.Time, requireFound bool) { +func (cs *clusterState) pendingChangeEnacted(cid ChangeID, enactedAt time.Time) { change, ok := cs.pendingChanges[cid] if !ok { - if requireFound { - panic(fmt.Sprintf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges))) - } else { - return - } + panic(fmt.Sprintf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges))) } change.enactedAtTime = enactedAt rs, ok := cs.ranges[change.rangeID] @@ -1721,18 +1752,13 @@ func (cs *clusterState) pendingChangeEnacted(cid ChangeID, enactedAt time.Time, delete(cs.pendingChanges, change.ChangeID) } -// undoPendingChange reverses the change with ID cid, if it exists. +// undoPendingChange reverses the change with ID cid. // -// REQUIRES: if requireFound, the change exists; the change is not marked as -// no-rollback. -func (cs *clusterState) undoPendingChange(cid ChangeID, requireFound bool) { +// REQUIRES: the change is not marked as no-rollback. +func (cs *clusterState) undoPendingChange(cid ChangeID) { change, ok := cs.pendingChanges[cid] if !ok { - if requireFound { - panic(errors.AssertionFailedf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges))) - } else { - return - } + panic(errors.AssertionFailedf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges))) } rs, ok := cs.ranges[change.rangeID] if !ok { @@ -1787,21 +1813,22 @@ func printPendingChanges(changes []*pendingReplicaChange) string { return buf.String() } -// createPendingChanges takes a set of changes applies the changes as pending. -// The application updates the adjusted load, tracked pending changes and -// changeID to reflect the pending application. +// addPendingRangeChange takes a range change containing a set of replica +// changes, and applies the changes as pending. The application updates the +// adjusted load, tracked pending changes and changeIDs to reflect the pending +// application. It updates the *pendingReplicaChanges inside the change. // -// REQUIRES: all the changes are to the same range, and that the range has no -// pending changes. -func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendingReplicaChange { - if len(changes) == 0 { - return nil - } - rangeID := changes[0].rangeID - for i := 1; i < len(changes); i++ { - if changes[i].rangeID != rangeID { +// REQUIRES: all the replica changes are to the same range, and that the range +// has no pending changes. +func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) { + if len(change.pendingReplicaChanges) == 0 { + return + } + rangeID := change.RangeID + for _, c := range change.pendingReplicaChanges { + if c.rangeID != rangeID { panic(errors.AssertionFailedf("all changes must be to the same range %d != %d", - changes[i].rangeID, rangeID)) + c.rangeID, rangeID)) } } rs := cs.ranges[rangeID] @@ -1812,28 +1839,26 @@ func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendin // NB: rs != nil is also required, but we also check that in a method called // below. 
- var pendingChanges []*pendingReplicaChange + pendingChanges := change.pendingReplicaChanges now := cs.ts.Now() - for _, change := range changes { - cs.applyReplicaChange(change, true) + for _, pendingChange := range pendingChanges { + cs.applyReplicaChange(pendingChange.ReplicaChange, true) cs.changeSeqGen++ cid := cs.changeSeqGen - pendingChange := &pendingReplicaChange{ - ChangeID: cid, - ReplicaChange: change, - startTime: now, - enactedAtTime: time.Time{}, - } - storeState := cs.stores[change.target.StoreID] - rangeState := cs.ranges[change.rangeID] + pendingChange.ChangeID = cid + pendingChange.startTime = now + pendingChange.enactedAtTime = time.Time{} + storeState := cs.stores[pendingChange.target.StoreID] + rangeState := cs.ranges[rangeID] cs.pendingChanges[cid] = pendingChange storeState.adjusted.loadPendingChanges[cid] = pendingChange rangeState.pendingChanges = append(rangeState.pendingChanges, pendingChange) rangeState.pendingChangeNoRollback = false - log.KvDistribution.VInfof(context.Background(), 3, "createPendingChanges: change_id=%v, range_id=%v, change=%v", cid, change.rangeID, change) + log.KvDistribution.VInfof(context.Background(), 3, + "addPendingRangeChange: change_id=%v, range_id=%v, change=%v", + cid, rangeID, pendingChange.ReplicaChange) pendingChanges = append(pendingChanges, pendingChange) } - return pendingChanges } // preCheckOnApplyReplicaChanges does some validation of the changes being @@ -1869,7 +1894,9 @@ func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendin // // TODO(sumeer): allow arbitrary number of changes, but validate that at most // one change per store. -func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []ReplicaChange) error { +// +// TODO(sumeer): change to take PendingRangeChange as parameter +func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []*pendingReplicaChange) error { // preApplyReplicaChange is called before applying a change to the cluster // state. if len(changes) != 1 && len(changes) != 2 && len(changes) != 4 { @@ -1929,7 +1956,9 @@ func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []ReplicaChange) e // before allowing a change to be added (including re-addition after a // StoreLeaseholderMsg), we should never have invalidity during an undo, if // all the changes are being undone. -func (cs *clusterState) preCheckOnUndoReplicaChanges(changes []ReplicaChange) error { +// +// TODO(sumeer): change to take PendingRangeChange as parameter +func (cs *clusterState) preCheckOnUndoReplicaChanges(changes []*pendingReplicaChange) error { if len(changes) == 0 { panic(errors.AssertionFailedf("no changes to undo")) } diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go index 377880594b89..4ef24cb893b0 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go @@ -487,7 +487,8 @@ func TestClusterState(t *testing.T) { changes = append(changes, rebalanceChanges[:]...) } } - cs.createPendingChanges(changes...) 
+			rangeChange := MakePendingRangeChange(rangeID, changes)
+			cs.addPendingRangeChange(rangeChange)
 			return printPendingChangesTest(testingGetPendingChanges(t, cs))
 
 		case "gc-pending-changes":
@@ -503,10 +504,10 @@ func TestClusterState(t *testing.T) {
 			for _, id := range changeIDsInt {
 				if expectPanic {
 					require.Panics(t, func() {
-						cs.undoPendingChange(id, true)
+						cs.undoPendingChange(id)
 					})
 				} else {
-					cs.undoPendingChange(id, true)
+					cs.undoPendingChange(id)
 				}
 			}
 			return printPendingChangesTest(testingGetPendingChanges(t, cs))
diff --git a/pkg/kv/kvserver/mma_store_rebalancer.go b/pkg/kv/kvserver/mma_store_rebalancer.go
index 2d60bbfc0e02..4f7474e73c38 100644
--- a/pkg/kv/kvserver/mma_store_rebalancer.go
+++ b/pkg/kv/kvserver/mma_store_rebalancer.go
@@ -154,7 +154,7 @@ func (m *mmaStoreRebalancer) applyChange(
 ) error {
 	repl := m.store.GetReplicaIfExists(change.RangeID)
 	if repl == nil {
-		m.as.MarkChangesAsFailed(change.ChangeIDs())
+		m.as.MarkChangeAsFailed(change)
 		return errors.Errorf("replica not found for range %d", change.RangeID)
 	}
 	changeID := m.as.MMAPreApply(ctx, repl.RangeUsageInfo(), change)
diff --git a/pkg/kv/kvserver/mmaintegration/allocator_op.go b/pkg/kv/kvserver/mmaintegration/allocator_op.go
index 3e2189358ff0..baf0e23028e3 100644
--- a/pkg/kv/kvserver/mmaintegration/allocator_op.go
+++ b/pkg/kv/kvserver/mmaintegration/allocator_op.go
@@ -15,11 +15,14 @@ import (
 // trackedAllocatorChange represents a change registered with AllocatorSync
 // (e.g. lease transfer or change replicas).
 type trackedAllocatorChange struct {
-	// changeIDs are the change IDs that are registered with mma. Nil if mma is
-	// disabled or the change cannot be registered with mma. If changeIDs is
-	// nil, PostApply does not need to inform mma. Otherwise, PostApply should
-	// inform mma by passing changeIDs to AdjustPendingChangesDisposition.
-	changeIDs []mmaprototype.ChangeID
+	// isMMARegistered is true if the change has been successfully registered
+	// with mma. When true, mmaChange is the registered change, and PostApply
+	// should inform mma by passing mmaChange to
+	// AdjustPendingChangeDisposition. It is false if mma is disabled or the
+	// change could not be registered with mma, in which case PostApply must
+	// not inform mma.
+	isMMARegistered bool
+	mmaChange       mmaprototype.PendingRangeChange
 	// Usage is range load usage.
 	usage allocator.RangeUsageInfo
 	// Exactly one of the following two fields will be set.
diff --git a/pkg/kv/kvserver/mmaintegration/allocator_sync.go b/pkg/kv/kvserver/mmaintegration/allocator_sync.go
index e458beff63f6..9de1743ecc86 100644
--- a/pkg/kv/kvserver/mmaintegration/allocator_sync.go
+++ b/pkg/kv/kvserver/mmaintegration/allocator_sync.go
@@ -42,12 +42,12 @@ type storePool interface {
 // mmaState is an interface that defines the methods that the allocator sync
 // needs to call on the mma. Using an interface to simplify testing.
 type mmaState interface {
-	// RegisterExternalChanges is called by the allocator sync to register
+	// RegisterExternalChange is called by the allocator sync to register
 	// external changes with the mma.
-	RegisterExternalChanges(changes []mmaprototype.ReplicaChange) []mmaprototype.ChangeID
-	// AdjustPendingChangesDisposition is called by the allocator sync to adjust
-	// the disposition of pending changes.
- AdjustPendingChangesDisposition(changeIDs []mmaprototype.ChangeID, success bool) + RegisterExternalChange(change mmaprototype.PendingRangeChange) (ok bool) + // AdjustPendingChangeDisposition is called by the allocator sync to adjust + // the disposition of pending changes to a range. + AdjustPendingChangeDisposition(change mmaprototype.PendingRangeChange, success bool) // BuildMMARebalanceAdvisor is called by the allocator sync to build a // MMARebalanceAdvisor for the given existing store and candidates. The // advisor should be later passed to IsInConflictWithMMA to determine if a @@ -58,6 +58,8 @@ type mmaState interface { IsInConflictWithMMA(ctx context.Context, cand roachpb.StoreID, advisor *mmaprototype.MMARebalanceAdvisor, cpuOnly bool) bool } +var _ mmaState = mmaprototype.Allocator(nil) + // TODO(wenyihu6): make sure allocator sync can tolerate cluster setting // changes not happening consistently or atomically across components. (For // example, replicate queue may call into allocator sync when mma is enabled but @@ -148,13 +150,16 @@ func (as *AllocatorSync) NonMMAPreTransferLease( usage allocator.RangeUsageInfo, transferFrom, transferTo roachpb.ReplicationTarget, ) SyncChangeID { - var changeIDs []mmaprototype.ChangeID + var isMMARegistered bool + var mmaChange mmaprototype.PendingRangeChange if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) { - changeIDs = as.mmaAllocator.RegisterExternalChanges(convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo)) + mmaChange = convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo) + isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange) } trackedChange := trackedAllocatorChange{ - changeIDs: changeIDs, - usage: usage, + isMMARegistered: isMMARegistered, + mmaChange: mmaChange, + usage: usage, leaseTransferOp: &leaseTransferOp{ transferFrom: transferFrom.StoreID, transferTo: transferTo.StoreID, @@ -175,13 +180,16 @@ func (as *AllocatorSync) NonMMAPreChangeReplicas( changes kvpb.ReplicationChanges, leaseholderStoreID roachpb.StoreID, ) SyncChangeID { - var changeIDs []mmaprototype.ChangeID + var isMMARegistered bool + var mmaChange mmaprototype.PendingRangeChange if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) { - changeIDs = as.mmaAllocator.RegisterExternalChanges(convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID)) + mmaChange = convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID) + isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange) } trackedChange := trackedAllocatorChange{ - changeIDs: changeIDs, - usage: usage, + isMMARegistered: isMMARegistered, + mmaChange: mmaChange, + usage: usage, changeReplicasOp: &changeReplicasOp{ chgs: changes, }, @@ -202,8 +210,9 @@ func (as *AllocatorSync) MMAPreApply( pendingChange mmaprototype.PendingRangeChange, ) SyncChangeID { trackedChange := trackedAllocatorChange{ - changeIDs: pendingChange.ChangeIDs(), - usage: usage, + isMMARegistered: true, + mmaChange: pendingChange, + usage: usage, } switch { case pendingChange.IsTransferLease(): @@ -225,11 +234,11 @@ func (as *AllocatorSync) MMAPreApply( return as.addTrackedChange(trackedChange) } -// MarkChangesAsFailed marks the given change IDs as failed without going -// through allocator sync. This is used when mma changes fail before even -// registering with mma via MMAPreApply. 
-func (as *AllocatorSync) MarkChangesAsFailed(changeIDs []mmaprototype.ChangeID) {
-	as.mmaAllocator.AdjustPendingChangesDisposition(changeIDs, false /* success */)
+// MarkChangeAsFailed marks the given range change as failed without going
+// through allocator sync. This is used when mma changes fail before even
+// registering with mma via MMAPreApply.
+func (as *AllocatorSync) MarkChangeAsFailed(change mmaprototype.PendingRangeChange) {
+	as.mmaAllocator.AdjustPendingChangeDisposition(change, false /* success */)
 }
 
 // PostApply is called by the lease/replicate queue to apply a change to the
 // cluster. The syncChangeID should have been returned by
 // NonMMAPreTransferLease or NonMMAPreChangeReplicas.
 func (as *AllocatorSync) PostApply(syncChangeID SyncChangeID, success bool) {
 	trackedChange := as.getTrackedChange(syncChangeID)
-	if changeIDs := trackedChange.changeIDs; changeIDs != nil {
+	if trackedChange.isMMARegistered {
 		// Call into without checking cluster setting.
-		as.mmaAllocator.AdjustPendingChangesDisposition(changeIDs, success)
+		as.mmaAllocator.AdjustPendingChangeDisposition(trackedChange.mmaChange, success)
 	}
 	if !success {
 		return
diff --git a/pkg/kv/kvserver/mmaintegration/mma_conversion.go b/pkg/kv/kvserver/mmaintegration/mma_conversion.go
index 1b16025915c3..bbb562a49c45 100644
--- a/pkg/kv/kvserver/mmaintegration/mma_conversion.go
+++ b/pkg/kv/kvserver/mmaintegration/mma_conversion.go
@@ -21,7 +21,7 @@ func convertLeaseTransferToMMA(
 	desc *roachpb.RangeDescriptor,
 	usage allocator.RangeUsageInfo,
 	transferFrom, transferTo roachpb.ReplicationTarget,
-) []mmaprototype.ReplicaChange {
+) mmaprototype.PendingRangeChange {
 	// TODO(wenyihu6): we are passing existing replicas to
 	// mmaprototype.MakeLeaseTransferChanges just to get the add and remove
 	// replica state. See if things could be cleaned up.
@@ -47,17 +47,17 @@ func convertLeaseTransferToMMA(
 		transferTo, transferFrom,
 	)
-	return replicaChanges[:]
+	return mmaprototype.MakePendingRangeChange(desc.RangeID, replicaChanges[:])
 }
 
-// convertReplicaChangeToMMA converts a replica change to mma replica changes.
-// It will be passed to mma.RegisterExternalChanges.
+// convertReplicaChangeToMMA converts a replica change to an mma range change.
+// It will be passed to mma.RegisterExternalChange.
 func convertReplicaChangeToMMA(
 	desc *roachpb.RangeDescriptor,
 	usage allocator.RangeUsageInfo,
 	changes kvpb.ReplicationChanges,
 	leaseholderStoreID roachpb.StoreID,
-) []mmaprototype.ReplicaChange {
+) mmaprototype.PendingRangeChange {
 	rLoad := mmaRangeLoad(usage)
 	replicaChanges := make([]mmaprototype.ReplicaChange, 0, len(changes))
 	replicaSet := desc.Replicas()
@@ -122,5 +122,5 @@ func convertReplicaChangeToMMA(
 			panic("unimplemented change type")
 		}
 	}
-	return replicaChanges
+	return mmaprototype.MakePendingRangeChange(desc.RangeID, replicaChanges)
 }
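
Note for reviewers: condensing the hunks above, the integration flow for
a queue-initiated (non-MMA) lease transfer now looks roughly like this
sketch (`as` is an *AllocatorSync; `trackedChange` is the entry recorded
by addTrackedChange; the arguments are placeholders):

    // NonMMAPreTransferLease: convert, then try to register with MMA.
    mmaChange := convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo)
    isMMARegistered := as.mmaAllocator.RegisterExternalChange(mmaChange)
    // ...both values are stored in a trackedAllocatorChange...

    // PostApply: report the disposition only if registration succeeded.
    if trackedChange.isMMARegistered {
        as.mmaAllocator.AdjustPendingChangeDisposition(trackedChange.mmaChange, success)
    }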