diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go
index adaf610613f1..a79a47c1f1cb 100644
--- a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go
+++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go
@@ -43,34 +43,29 @@ type Allocator interface {
 	// associated node in the cluster.
 	ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg)
 
-	// TODO(sumeer): only a subset of the fields in
-	// pendingReplicaChange/PendingRangeChange are relevant to the caller. Hide
-	// the remaining.
-
 	// Methods related to making changes.
 
-	// AdjustPendingChangesDisposition is optional feedback to inform the
-	// allocator of success or failure of proposed changes. For successful
-	// changes, this is a faster way to know about success than waiting for the
-	// next ProcessNodeLoadResponse from the local node. For failed changes, in
-	// the absence of this feedback, proposed changes that have not been enacted
-	// in N seconds will be garbage collected and assumed to have failed.
+	// AdjustPendingChangeDisposition is optional feedback to inform the
+	// allocator of success or failure of proposed changes to a range. For
+	// successful changes, this is a faster way to know about success than
+	// waiting for the next ProcessNodeLoadResponse from the local node. For
+	// failed changes, in the absence of this feedback, proposed changes that
+	// have not been enacted in N seconds will be garbage collected and assumed
+	// to have failed.
 	//
-	// Calls to AdjustPendingChangesDisposition must be correctly sequenced with
+	// Calls to AdjustPendingChangeDisposition must be correctly sequenced with
 	// full state updates from the local node provided in
 	// ProcessNodeLoadResponse.
-	//
-	// REQUIRES: len(changes) > 0 and all changes are to the same range.
-	AdjustPendingChangesDisposition(changes []ChangeID, success bool)
+	AdjustPendingChangeDisposition(change PendingRangeChange, success bool)
 
-	// RegisterExternalChanges informs this allocator about yet to complete
+	// RegisterExternalChange informs this allocator about yet-to-complete
 	// changes to the cluster which were not initiated by this allocator. The
-	// caller is returned a list of ChangeIDs, corresponding 1:1 to each replica
-	// change provided as an argument. The returned list of ChangeIDs should then
-	// be used to call AdjustPendingChangesDisposition when the changes are
-	// completed, either successfully or not. All changes should correspond to the
-	// same range.
-	RegisterExternalChanges(changes []ReplicaChange) []ChangeID
+	// ownership of all state inside change is handed off to the callee. If ok
+	// is true, the change was registered, and the caller should use the same
+	// change in a subsequent call to AdjustPendingChangeDisposition when the
+	// change is completed, either successfully or not. If ok is false, the
+	// change was not registered.
+	RegisterExternalChange(change PendingRangeChange) (ok bool)
 
 	// ComputeChanges is called periodically and frequently, say every 10s.
 	//
@@ -152,6 +147,23 @@ type Allocator interface {
 	//
 	// TODO(sumeer): remove once the integration is properly done.
 	KnownStores() map[roachpb.StoreID]struct{}
+
+	// BuildMMARebalanceAdvisor is called by the allocator sync to build a
+	// MMARebalanceAdvisor for the given existing store and candidates. The
+	// advisor should later be passed to IsInConflictWithMMA to determine
+	// whether a given candidate is in conflict with the existing store.
+	//
+	// TODO(sumeer): merge the above comment with the comment in the
+	// implementation.
+	BuildMMARebalanceAdvisor(existing roachpb.StoreID, cands []roachpb.StoreID) *MMARebalanceAdvisor
+
+	// IsInConflictWithMMA is called by the allocator sync to determine if the
+	// given candidate is in conflict with the existing store.
+	//
+	// TODO(sumeer): merge the above comment with the comment in the
+	// implementation.
+	IsInConflictWithMMA(
+		ctx context.Context, cand roachpb.StoreID, advisor *MMARebalanceAdvisor, cpuOnly bool) bool
 }
 
 // Avoid unused lint errors.
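With this change, the unit a caller holds is a PendingRangeChange value rather than a slice of ChangeIDs. A minimal sketch of the intended round trip, assuming the []ReplicaChange comes from one of the package's constructors, and with enact standing in (hypothetically) for whatever component performs the change:

```go
// registerAndTrack sketches the revised caller contract: hand the
// PendingRangeChange to the allocator, enact it externally, then report the
// outcome using the same value.
func registerAndTrack(
	ctx context.Context,
	alloc mmaprototype.Allocator,
	rangeID roachpb.RangeID,
	replicaChanges []mmaprototype.ReplicaChange,
	enact func(context.Context) error, // hypothetical stand-in
) {
	change := mmaprototype.MakePendingRangeChange(rangeID, replicaChanges)
	if !alloc.RegisterExternalChange(change) {
		// Registration failed: the allocator is not tracking the change, so
		// no disposition feedback should follow.
		return
	}
	err := enact(ctx)
	// Optional but faster than waiting for the next load response; changes
	// not enacted within N seconds are GC'd and assumed to have failed.
	alloc.AdjustPendingChangeDisposition(change, err == nil)
}
```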
diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
index f644ddec8b9a..7f1fc42889c6 100644
--- a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
+++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
@@ -544,16 +544,14 @@ func (cs *clusterState) rebalanceStores(
 				NodeID:  ss.NodeID,
 				StoreID: ss.StoreID,
 			}
-			leaseChanges := MakeLeaseTransferChanges(
+			replicaChanges := MakeLeaseTransferChanges(
 				rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-			if err := cs.preCheckOnApplyReplicaChanges(leaseChanges[:]); err != nil {
-				panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChanges))
+			leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
+			if err := cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
+				panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
 			}
-			pendingChanges := cs.createPendingChanges(leaseChanges[:]...)
-			changes = append(changes, PendingRangeChange{
-				RangeID:               rangeID,
-				pendingReplicaChanges: pendingChanges[:],
-			})
+			cs.addPendingRangeChange(leaseChange)
+			changes = append(changes, leaseChange)
 			leaseTransferCount++
 			if changes[len(changes)-1].IsChangeReplicas() || !changes[len(changes)-1].IsTransferLease() {
 				panic(fmt.Sprintf("lease transfer is invalid: %v", changes[len(changes)-1]))
@@ -763,15 +761,13 @@ func (cs *clusterState) rebalanceStores(
 			}
 			replicaChanges := makeRebalanceReplicaChanges(
 				rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-			if err = cs.preCheckOnApplyReplicaChanges(replicaChanges[:]); err != nil {
+			rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
+			if err = cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
 				panic(errors.Wrapf(err,
 					"pre-check failed for replica changes: %v for %v", replicaChanges, rangeID))
 			}
-			pendingChanges := cs.createPendingChanges(replicaChanges[:]...)
-			changes = append(changes, PendingRangeChange{
-				RangeID:               rangeID,
-				pendingReplicaChanges: pendingChanges[:],
-			})
+			cs.addPendingRangeChange(rangeChange)
+			changes = append(changes, rangeChange)
 			rangeMoveCount++
 			log.KvDistribution.VInfof(ctx, 2,
 				"result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
@@ -807,93 +803,73 @@ func (a *allocatorState) SetStore(store StoreAttributesAndLocality) {
 	a.cs.setStore(store)
 }
 
-// ProcessStoreLeaseholderMsg implements the Allocator interface.
+// ProcessStoreLoadMsg implements the Allocator interface.
 func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg) {
 	a.mu.Lock()
 	defer a.mu.Unlock()
 	a.cs.processStoreLoadMsg(ctx, msg)
 }
 
-// AdjustPendingChangesDisposition implements the Allocator interface.
-func (a *allocatorState) AdjustPendingChangesDisposition(changeIDs []ChangeID, success bool) {
+// AdjustPendingChangeDisposition implements the Allocator interface.
+func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) {
 	a.mu.Lock()
 	defer a.mu.Unlock()
-	// NB: It is possible that some of the changeIDs have already been enacted
-	// via StoreLeaseholderMsg, and even been garbage collected. So no
-	// assumption can be made about whether these changeIDs will be found in the
-	// allocator's state.
-	if !success {
-		// Gather the changes that are found and need to be undone.
-		replicaChanges := make([]ReplicaChange, 0, len(changeIDs))
-		for _, changeID := range changeIDs {
-			change, ok := a.cs.pendingChanges[changeID]
-			if !ok {
-				continue
-			}
-			rs, ok := a.cs.ranges[change.rangeID]
-			if !ok {
-				panic(errors.AssertionFailedf("range %v not found in cluster state", change.rangeID))
-			}
-			if rs.pendingChangeNoRollback {
-				// All the changes are to the same range, so return.
-				return
-			}
-			replicaChanges = append(replicaChanges, change.ReplicaChange)
-		}
-		if len(replicaChanges) == 0 {
-			return
+	rs, ok := a.cs.ranges[change.RangeID]
+	if !ok {
+		// Range no longer exists. This can happen if the StoreLeaseholderMsg
+		// which included the effect of the change that transferred the lease away
+		// was already processed, causing the range to no longer be tracked by the
+		// allocator.
+		return
+	}
+	if !success && rs.pendingChangeNoRollback {
+		// Not allowed to undo.
+		return
+	}
+	// NB: It is possible that some of the changes have already been enacted via
+	// StoreLeaseholderMsg, and even been garbage collected. So no assumption
+	// can be made about whether these changes will be found in the allocator's
+	// state. We gather the found changes.
+	var changes []*pendingReplicaChange
+	for _, c := range change.pendingReplicaChanges {
+		ch, ok := a.cs.pendingChanges[c.ChangeID]
+		if !ok {
+			continue
 		}
-		// Check that we can undo these changes. If not, log and return.
-		if err := a.cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil {
-			// TODO(sumeer): we should be able to panic here, once the interface
-			// contract says that all the proposed changes must be included in
-			// changeIDs. Without that contract, there may be a pair of changes
-			// (remove replica and lease from s1), (add replica and lease to s2),
-			// and the caller can provide the first changeID only, and the undo
-			// would cause two leaseholders. The pre-check would catch that here.
-			log.KvDistribution.Infof(context.Background(), "did not undo change %v: due to %v", changeIDs, err)
-			return
+		changes = append(changes, ch)
+	}
+	if len(changes) == 0 {
+		return
+	}
+	if !success {
+		// Check that we can undo these changes.
+		if err := a.cs.preCheckOnUndoReplicaChanges(changes); err != nil {
+			panic(err)
 		}
 	}
-
-	for _, changeID := range changeIDs {
-		// We set !requireFound, since some of these pending changes may no longer
-		// exist in the allocator's state. For example, a StoreLeaseholderMsg that
-		// happened after the pending change was created and before this call to
-		// AdjustPendingChangesDisposition may have already removed the pending
-		// change.
+	for _, c := range changes {
 		if success {
-			// TODO(sumeer): this code is implicitly assuming that all the changes
-			// on the rangeState are being enacted. And that is true of the current
-			// callers. We should explicitly state the assumption in the interface.
-			// Because if only some are being enacted, we ought to set
-			// pendingChangeNoRollback, and we don't bother to.
-			a.cs.pendingChangeEnacted(changeID, a.cs.ts.Now(), false)
+			a.cs.pendingChangeEnacted(c.ChangeID, a.cs.ts.Now())
 		} else {
-			a.cs.undoPendingChange(changeID, false)
+			a.cs.undoPendingChange(c.ChangeID)
 		}
 	}
 }
 
-// RegisterExternalChanges implements the Allocator interface. All changes should
-// correspond to the same range, panic otherwise.
-func (a *allocatorState) RegisterExternalChanges(changes []ReplicaChange) []ChangeID {
+// RegisterExternalChange implements the Allocator interface.
+func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) {
 	a.mu.Lock()
 	defer a.mu.Unlock()
-	if err := a.cs.preCheckOnApplyReplicaChanges(changes); err != nil {
+	if err := a.cs.preCheckOnApplyReplicaChanges(change.pendingReplicaChanges); err != nil {
 		a.mmaMetrics.ExternalFailedToRegister.Inc(1)
 		log.KvDistribution.Infof(context.Background(), "did not register external changes: due to %v", err)
-		return nil
+		return false
 	} else {
 		a.mmaMetrics.ExternaRegisterSuccess.Inc(1)
 	}
-	pendingChanges := a.cs.createPendingChanges(changes...)
-	changeIDs := make([]ChangeID, len(pendingChanges))
-	for i, pendingChange := range pendingChanges {
-		changeIDs[i] = pendingChange.ChangeID
-	}
-	return changeIDs
+	a.cs.addPendingRangeChange(change)
+	return true
 }
 
 // ComputeChanges implements the Allocator interface.
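One property of the rewritten method worth noting: because it gathers only the changes still present in cs.pendingChanges and returns early when none are found, reporting a disposition twice is harmless. A sketch, using only names from this diff:

```go
// reportTwice illustrates that disposition reporting is idempotent once the
// change has been resolved. Assumes change was previously registered with
// alloc and has now been enacted.
func reportTwice(alloc mmaprototype.Allocator, change mmaprototype.PendingRangeChange) {
	// First report: the pending changes are found, marked enacted, and
	// removed from the allocator's pending set.
	alloc.AdjustPendingChangeDisposition(change, true /* success */)
	// Second report: the gather loop finds none of the ChangeIDs, so the
	// call returns without effect rather than panicking.
	alloc.AdjustPendingChangeDisposition(change, true /* success */)
}
```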
diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
index 915b60ca3922..fc4de85f7b5f 100644
--- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
+++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
@@ -125,6 +125,8 @@ type ReplicaState struct {
 // ChangeID is a unique ID, in the context of this data-structure and when
 // receiving updates about enactment having happened or having been rejected
 // (by the component responsible for change enactment).
+//
+// TODO(sumeer): make ChangeID private.
 type ChangeID uint64
 
 type ReplicaChangeType int
@@ -151,6 +153,9 @@ func (s ReplicaChangeType) String() string {
 	}
 }
 
+// ReplicaChange describes a change to a replica.
+//
+// TODO(sumeer): make ReplicaChange private.
 type ReplicaChange struct {
 	// The load this change adds to a store. The values will be negative if the
 	// load is being removed.
@@ -158,7 +163,8 @@ type ReplicaChange struct {
 	secondaryLoadDelta SecondaryLoadVector
 
 	// target is the target {store,node} for the change.
-	target  roachpb.ReplicationTarget
+	target roachpb.ReplicationTarget
+	// TODO(sumeer): remove rangeID.
 	rangeID roachpb.RangeID
 
 	// NB: 0 is not a valid ReplicaID, but this component does not care about
@@ -379,11 +385,53 @@ func makeRebalanceReplicaChanges(
 // PendingRangeChange is a proposed set of change(s) to a range. It can consist
 // of multiple pending replica changes, such as adding or removing replicas, or
 // transferring the lease.
+//
+// NB: pendingReplicaChanges is not visible outside the package, so we can be
+// certain that callers outside this package that hold a PendingRangeChange
+// cannot mutate the internals other than clearing the state.
+//
+// Additionally, for a PendingRangeChange returned outside the package, we
+// ensure that the pendingReplicaChanges slice itself is not shared with the
+// rangeState.pendingChanges slice, since the rangeState.pendingChanges slice
+// can have entries removed from it (and swapped around as part of removal).
+//
+// Some of the state inside each *pendingReplicaChange is mutable at arbitrary
+// points in time by the code inside this package (with the relevant locking,
+// of course). Currently, this state is revisedGCTime and enactedAtTime.
+// Neither is read by the public methods on PendingRangeChange.
+//
+// TODO(sumeer): when we expand the set of mutable fields, make a deep copy.
 type PendingRangeChange struct {
 	RangeID               roachpb.RangeID
 	pendingReplicaChanges []*pendingReplicaChange
 }
 
+// MakePendingRangeChange creates a PendingRangeChange for the given rangeID
+// and changes. Certain internal aspects of the change, like the change-ids,
+// start time etc., are not yet initialized, since those use internal state of
+// MMA. Those will be initialized by MMA when this change is later handed to
+// MMA for tracking, in clusterState.addPendingRangeChange. For external
+// callers of MakePendingRangeChange, this happens transitively when
+// Allocator.RegisterExternalChange is called.
+func MakePendingRangeChange(rangeID roachpb.RangeID, changes []ReplicaChange) PendingRangeChange {
+	for _, c := range changes {
+		if c.rangeID != rangeID {
+			panic(errors.AssertionFailedf("all changes must be to the same range %d != %d",
+				c.rangeID, rangeID))
+		}
+	}
+	prcs := make([]*pendingReplicaChange, len(changes))
+	for i, c := range changes {
+		prcs[i] = &pendingReplicaChange{
+			ReplicaChange: c,
+		}
+	}
+	return PendingRangeChange{
+		RangeID:               rangeID,
+		pendingReplicaChanges: prcs,
+	}
+}
+
 func (prc PendingRangeChange) String() string {
 	return redact.StringWithoutMarkers(prc)
 }
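The NB comment's requirement that the returned slice not alias rangeState.pendingChanges exists because removal there swaps entries around before truncating. A standalone illustration of the aliasing hazard, with plain ints standing in for *pendingReplicaChange:

```go
package main

import "fmt"

func main() {
	internal := []int{1, 2, 3} // stands in for rangeState.pendingChanges
	external := internal       // a caller-held view sharing the backing array

	// Swap-and-truncate removal of element 0, as done during internal
	// bookkeeping: move the last entry into the hole, then shrink.
	internal[0] = internal[len(internal)-1]
	internal = internal[:len(internal)-1]

	// The caller's view still has length 3 and now reads [3 2 3]: reordered,
	// with a stale duplicate. Hence the fresh slice allocated in
	// MakePendingRangeChange.
	fmt.Println(external)
}
```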
@@ -416,16 +464,6 @@ func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) {
 	w.Print("]")
 }
 
-// ChangeIDs returns the list of ChangeIDs associated with the pending range
-// change.
-func (prc PendingRangeChange) ChangeIDs() []ChangeID {
-	cids := make([]ChangeID, len(prc.pendingReplicaChanges))
-	for i, c := range prc.pendingReplicaChanges {
-		cids[i] = c.ChangeID
-	}
-	return cids
-}
-
 // IsChangeReplicas returns true if the pending range change is a change
 // replicas operation.
 func (prc PendingRangeChange) IsChangeReplicas() bool {
@@ -527,12 +565,6 @@ func (prc PendingRangeChange) LeaseTransferFrom() roachpb.StoreID {
 	panic("unreachable")
 }
 
-// TODO(sumeer): we have various methods that take slices of either ChangeIDs
-// or pendingReplicaChanges or ReplicaChange, and have callers that already
-// have or could first construct a slice of pendingReplicaChanges, and avoid
-// various temporary slice construction and repeated map lookups. Clean this
-// up.
-
 // pendingReplicaChange is a proposed change to a single replica. Some
 // external entity (the leaseholder of the range) may choose to enact this
 // change. It may not be enacted if it will cause some invariant (like the
@@ -1003,6 +1035,8 @@ type rangeState struct {
 	// Life-cycle matches clusterState.pendingChanges. The consolidated
 	// rangeState.pendingChanges across all ranges in clusterState.ranges will
 	// be identical to clusterState.pendingChanges.
+	//
+	// TODO(sumeer): replace by PendingRangeChange.
 	pendingChanges []*pendingReplicaChange
 	// When set, the pendingChanges can not be rolled back anymore. They have
 	// to be enacted, or discarded wholesale in favor of the latest RangeMsg
@@ -1035,12 +1069,12 @@ type rangeState struct {
 	// that the change we did not know about has been enacted.
 	//
 	// One may wonder how such unknown changes can happen, given that other
-	// components call mmaprototype.Allocator.RegisterExternalChanges. One example is
+	// components call mmaprototype.Allocator.RegisterExternalChange. One example is
 	// when MMA does not currently know about a range. Say the lease gets
 	// transferred to the local store, but MMA has not yet been called with a
 	// StoreLeaseholderMsg, but replicateQueue already knows about this lease,
 	// and decides to initiate a transfer of replicas between two remote stores
-	// (to equalize replica counts). When mmaprototype.Allocator.RegisterExternalChanges
+	// (to equalize replica counts). When mmaprototype.Allocator.RegisterExternalChange
 	// is called, there is no record of this range in MMA (since it wasn't the
 	// leaseholder), and the change is ignored. When the next
 	// StoreLeaseholderMsg is provided to MMA it now knows about the range, and
@@ -1324,7 +1358,6 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 		// leaseholder. Note that this is the only code path where a subset of
 		// pending changes for a replica can be considered enacted.
 		var remainingChanges, enactedChanges []*pendingReplicaChange
-		var remainingReplicaChanges []ReplicaChange
 		for _, change := range rs.pendingChanges {
 			ss := cs.stores[change.target.StoreID]
 			adjustedReplica, ok := ss.adjusted.replicas[rangeMsg.RangeID]
@@ -1336,7 +1369,6 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 				enactedChanges = append(enactedChanges, change)
 			} else {
 				remainingChanges = append(remainingChanges, change)
-				remainingReplicaChanges = append(remainingReplicaChanges, change.ReplicaChange)
 			}
 		}
 		gcRemainingChanges := false
@@ -1380,7 +1412,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 			// example_skewed_cpu_even_ranges_mma_and_queues. I suspect the latter
 			// is because MMA is acting faster to undo the effects of the changes
 			// made by the replicate and lease queues.
-			cs.pendingChangeEnacted(change.ChangeID, now, true)
+			cs.pendingChangeEnacted(change.ChangeID, now)
 		}
 		// INVARIANT: remainingChanges and rs.pendingChanges contain the same set
 		// of changes, though possibly in different order.
@@ -1404,7 +1436,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 			// remainingChanges, but they are not the same slice.
 			rc := rs.pendingChanges
 			rs.pendingChanges = nil
-			err := cs.preCheckOnApplyReplicaChanges(remainingReplicaChanges)
+			err := cs.preCheckOnApplyReplicaChanges(remainingChanges)
 			valid = err == nil
 			if err != nil {
 				reason = redact.Sprint(err)
@@ -1479,12 +1511,15 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
 			// Since this range is going away, mark all the pending changes as
 			// enacted. This will allow the load adjustments to also be garbage
 			// collected in the future.
+			//
+			// Gather the changeIDs, since calls to pendingChangeEnacted modify the
+			// rs.pendingChanges slice.
 			changeIDs := make([]ChangeID, len(rs.pendingChanges))
 			for i, change := range rs.pendingChanges {
 				changeIDs[i] = change.ChangeID
 			}
 			for _, changeID := range changeIDs {
-				cs.pendingChangeEnacted(changeID, now, true)
+				cs.pendingChangeEnacted(changeID, now)
 			}
 			// Remove from the storeStates.
 			for _, replica := range rs.replicas {
@@ -1687,29 +1722,25 @@ func (cs *clusterState) gcPendingChanges(now time.Time) {
 		if !startTime.Add(pendingChangeGCDuration).Before(now) {
 			continue
 		}
-		var replicaChanges []ReplicaChange
+		if err := cs.preCheckOnUndoReplicaChanges(rs.pendingChanges); err != nil {
+			panic(err)
+		}
+		// Gather the changeIDs, since calls to undoPendingChange modify the
+		// rs.pendingChanges slice.
 		var changeIDs []ChangeID
 		for _, pendingChange := range rs.pendingChanges {
-			replicaChanges = append(replicaChanges, pendingChange.ReplicaChange)
 			changeIDs = append(changeIDs, pendingChange.ChangeID)
 		}
-		if err := cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil {
-			panic(err)
-		}
 		for _, changeID := range changeIDs {
-			cs.undoPendingChange(changeID, true)
+			cs.undoPendingChange(changeID)
 		}
 	}
 }
 
-func (cs *clusterState) pendingChangeEnacted(cid ChangeID, enactedAt time.Time, requireFound bool) {
+func (cs *clusterState) pendingChangeEnacted(cid ChangeID, enactedAt time.Time) {
 	change, ok := cs.pendingChanges[cid]
 	if !ok {
-		if requireFound {
-			panic(fmt.Sprintf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges)))
-		} else {
-			return
-		}
+		panic(fmt.Sprintf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges)))
 	}
 	change.enactedAtTime = enactedAt
 	rs, ok := cs.ranges[change.rangeID]
@@ -1721,18 +1752,13 @@ func (cs *clusterState) pendingChangeEnacted(cid ChangeID, enactedAt time.Time,
 	delete(cs.pendingChanges, change.ChangeID)
 }
 
-// undoPendingChange reverses the change with ID cid, if it exists.
+// undoPendingChange reverses the change with ID cid.
 //
-// REQUIRES: if requireFound, the change exists; the change is not marked as
-// no-rollback.
-func (cs *clusterState) undoPendingChange(cid ChangeID, requireFound bool) {
+// REQUIRES: the change is not marked as no-rollback.
+func (cs *clusterState) undoPendingChange(cid ChangeID) {
 	change, ok := cs.pendingChanges[cid]
 	if !ok {
-		if requireFound {
-			panic(errors.AssertionFailedf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges)))
-		} else {
-			return
-		}
+		panic(errors.AssertionFailedf("change %v not found %v", cid, printMapPendingChanges(cs.pendingChanges)))
 	}
 	rs, ok := cs.ranges[change.rangeID]
 	if !ok {
@@ -1787,21 +1813,22 @@ func printPendingChanges(changes []*pendingReplicaChange) string {
 	return buf.String()
 }
 
-// createPendingChanges takes a set of changes applies the changes as pending.
-// The application updates the adjusted load, tracked pending changes and
-// changeID to reflect the pending application.
+// addPendingRangeChange takes a range change containing a set of replica
+// changes, and applies the changes as pending. The application updates the
+// adjusted load, tracked pending changes and changeIDs to reflect the pending
+// application. It updates the *pendingReplicaChanges inside the change.
 //
-// REQUIRES: all the changes are to the same range, and that the range has no
-// pending changes.
-func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendingReplicaChange {
-	if len(changes) == 0 {
-		return nil
-	}
-	rangeID := changes[0].rangeID
-	for i := 1; i < len(changes); i++ {
-		if changes[i].rangeID != rangeID {
+// REQUIRES: all the replica changes are to the same range, and that the range
+// has no pending changes.
+func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) {
+	if len(change.pendingReplicaChanges) == 0 {
+		return
+	}
+	rangeID := change.RangeID
+	for _, c := range change.pendingReplicaChanges {
+		if c.rangeID != rangeID {
 			panic(errors.AssertionFailedf("all changes must be to the same range %d != %d",
-				changes[i].rangeID, rangeID))
+				c.rangeID, rangeID))
 		}
 	}
 	rs := cs.ranges[rangeID]
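Both gcPendingChanges above and the range-removal path earlier snapshot the ChangeIDs before undoing or enacting. The reason, sketched below with package-internal names (illustrative only), is that undoPendingChange swap-removes from the very slice being walked:

```go
// Buggy shape: undoing while ranging over rs.pendingChanges mutates the
// backing array mid-iteration. Entries get swapped into already-visited
// slots, so some changes can be skipped and a stale tail entry can be
// visited twice -- and a second undo of the same ChangeID panics.
//
//	for _, pc := range rs.pendingChanges {
//		cs.undoPendingChange(pc.ChangeID)
//	}
//
// Safe shape, as used in gcPendingChanges: snapshot the IDs first.
ids := make([]ChangeID, len(rs.pendingChanges))
for i, pc := range rs.pendingChanges {
	ids[i] = pc.ChangeID
}
for _, id := range ids {
	cs.undoPendingChange(id)
}
```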
@@ -1812,28 +1839,25 @@ func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendin
 	// NB: rs != nil is also required, but we also check that in a method called
 	// below.
 
-	var pendingChanges []*pendingReplicaChange
+	pendingChanges := change.pendingReplicaChanges
 	now := cs.ts.Now()
-	for _, change := range changes {
-		cs.applyReplicaChange(change, true)
+	for _, pendingChange := range pendingChanges {
+		cs.applyReplicaChange(pendingChange.ReplicaChange, true)
 		cs.changeSeqGen++
 		cid := cs.changeSeqGen
-		pendingChange := &pendingReplicaChange{
-			ChangeID:      cid,
-			ReplicaChange: change,
-			startTime:     now,
-			enactedAtTime: time.Time{},
-		}
-		storeState := cs.stores[change.target.StoreID]
-		rangeState := cs.ranges[change.rangeID]
+		pendingChange.ChangeID = cid
+		pendingChange.startTime = now
+		pendingChange.enactedAtTime = time.Time{}
+		storeState := cs.stores[pendingChange.target.StoreID]
+		rangeState := cs.ranges[rangeID]
 		cs.pendingChanges[cid] = pendingChange
 		storeState.adjusted.loadPendingChanges[cid] = pendingChange
 		rangeState.pendingChanges = append(rangeState.pendingChanges, pendingChange)
 		rangeState.pendingChangeNoRollback = false
-		log.KvDistribution.VInfof(context.Background(), 3, "createPendingChanges: change_id=%v, range_id=%v, change=%v", cid, change.rangeID, change)
-		pendingChanges = append(pendingChanges, pendingChange)
+		log.KvDistribution.VInfof(context.Background(), 3,
+			"addPendingRangeChange: change_id=%v, range_id=%v, change=%v",
+			cid, rangeID, pendingChange.ReplicaChange)
 	}
-	return pendingChanges
 }
 
 // preCheckOnApplyReplicaChanges does some validation of the changes being
@@ -1869,7 +1894,9 @@ func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendin
 //
 // TODO(sumeer): allow arbitrary number of changes, but validate that at most
 // one change per store.
-func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []ReplicaChange) error {
+//
+// TODO(sumeer): change to take PendingRangeChange as parameter.
+func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []*pendingReplicaChange) error {
 	// preApplyReplicaChange is called before applying a change to the cluster
 	// state.
 	if len(changes) != 1 && len(changes) != 2 && len(changes) != 4 {
@@ -1929,7 +1956,9 @@ func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []ReplicaChange) e
 // before allowing a change to be added (including re-addition after a
 // StoreLeaseholderMsg), we should never have invalidity during an undo, if
 // all the changes are being undone.
-func (cs *clusterState) preCheckOnUndoReplicaChanges(changes []ReplicaChange) error {
+//
+// TODO(sumeer): change to take PendingRangeChange as parameter.
+func (cs *clusterState) preCheckOnUndoReplicaChanges(changes []*pendingReplicaChange) error {
 	if len(changes) == 0 {
 		panic(errors.AssertionFailedf("no changes to undo"))
 	}
diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go
index 377880594b89..4ef24cb893b0 100644
--- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go
+++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go
@@ -487,7 +487,8 @@ func TestClusterState(t *testing.T) {
 					changes = append(changes, rebalanceChanges[:]...)
 				}
 			}
-			cs.createPendingChanges(changes...)
+			rangeChange := MakePendingRangeChange(rangeID, changes)
+			cs.addPendingRangeChange(rangeChange)
 			return printPendingChangesTest(testingGetPendingChanges(t, cs))
 
 		case "gc-pending-changes":
@@ -503,10 +504,10 @@ func TestClusterState(t *testing.T) {
 			for _, id := range changeIDsInt {
 				if expectPanic {
 					require.Panics(t, func() {
-						cs.undoPendingChange(id, true)
+						cs.undoPendingChange(id)
 					})
 				} else {
-					cs.undoPendingChange(id, true)
+					cs.undoPendingChange(id)
 				}
 			}
 			return printPendingChangesTest(testingGetPendingChanges(t, cs))
diff --git a/pkg/kv/kvserver/mma_store_rebalancer.go b/pkg/kv/kvserver/mma_store_rebalancer.go
index 2d60bbfc0e02..4f7474e73c38 100644
--- a/pkg/kv/kvserver/mma_store_rebalancer.go
+++ b/pkg/kv/kvserver/mma_store_rebalancer.go
@@ -154,7 +154,7 @@ func (m *mmaStoreRebalancer) applyChange(
 ) error {
 	repl := m.store.GetReplicaIfExists(change.RangeID)
 	if repl == nil {
-		m.as.MarkChangesAsFailed(change.ChangeIDs())
+		m.as.MarkChangeAsFailed(change)
 		return errors.Errorf("replica not found for range %d", change.RangeID)
 	}
 	changeID := m.as.MMAPreApply(ctx, repl.RangeUsageInfo(), change)
diff --git a/pkg/kv/kvserver/mmaintegration/allocator_op.go b/pkg/kv/kvserver/mmaintegration/allocator_op.go
index 3e2189358ff0..baf0e23028e3 100644
--- a/pkg/kv/kvserver/mmaintegration/allocator_op.go
+++ b/pkg/kv/kvserver/mmaintegration/allocator_op.go
@@ -15,11 +15,14 @@ import (
 // trackedAllocatorChange represents a change registered with AllocatorSync
 // (e.g. lease transfer or change replicas).
 type trackedAllocatorChange struct {
-	// changeIDs are the change IDs that are registered with mma. Nil if mma is
-	// disabled or the change cannot be registered with mma. If changeIDs is
-	// nil, PostApply does not need to inform mma. Otherwise, PostApply should
-	// inform mma by passing changeIDs to AdjustPendingChangesDisposition.
-	changeIDs []mmaprototype.ChangeID
+	// isMMARegistered is true if the change has been successfully registered
+	// with mma. When true, mmaChange is the registered change, and PostApply
+	// should inform mma by passing mmaChange to
+	// AdjustPendingChangeDisposition. It is false if mma is disabled or the
+	// change could not be registered with mma, in which case PostApply must
+	// not inform mma.
+	isMMARegistered bool
+	mmaChange       mmaprototype.PendingRangeChange
 	// Usage is range load usage.
 	usage allocator.RangeUsageInfo
 	// Exactly one of the following two fields will be set.
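The switch from a `[]mmaprototype.ChangeID` field to an explicit boolean is forced by the new type: a nil slice doubled as a "never registered" sentinel, whereas the zero value of PendingRangeChange is not distinguishable from a real change. Consumers of trackedAllocatorChange therefore branch on the flag; a sketch that mirrors the PostApply change further below:

```go
// Sketch: mmaChange is meaningful only when isMMARegistered is true.
if trackedChange.isMMARegistered {
	as.mmaAllocator.AdjustPendingChangeDisposition(trackedChange.mmaChange, success)
}
// When isMMARegistered is false (mma disabled, or registration rejected),
// mma was never told about the change and must not be informed.
```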
diff --git a/pkg/kv/kvserver/mmaintegration/allocator_sync.go b/pkg/kv/kvserver/mmaintegration/allocator_sync.go
index e458beff63f6..9de1743ecc86 100644
--- a/pkg/kv/kvserver/mmaintegration/allocator_sync.go
+++ b/pkg/kv/kvserver/mmaintegration/allocator_sync.go
@@ -42,12 +42,12 @@ type storePool interface {
 // mmaState is an interface that defines the methods that the allocator sync
 // needs to call on the mma. Using an interface to simplify testing.
 type mmaState interface {
-	// RegisterExternalChanges is called by the allocator sync to register
+	// RegisterExternalChange is called by the allocator sync to register
 	// external changes with the mma.
-	RegisterExternalChanges(changes []mmaprototype.ReplicaChange) []mmaprototype.ChangeID
-	// AdjustPendingChangesDisposition is called by the allocator sync to adjust
-	// the disposition of pending changes.
-	AdjustPendingChangesDisposition(changeIDs []mmaprototype.ChangeID, success bool)
+	RegisterExternalChange(change mmaprototype.PendingRangeChange) (ok bool)
+	// AdjustPendingChangeDisposition is called by the allocator sync to adjust
+	// the disposition of pending changes to a range.
+	AdjustPendingChangeDisposition(change mmaprototype.PendingRangeChange, success bool)
 	// BuildMMARebalanceAdvisor is called by the allocator sync to build a
 	// MMARebalanceAdvisor for the given existing store and candidates. The
 	// advisor should be later passed to IsInConflictWithMMA to determine if a
 	// given candidate is in conflict with the existing store.
 	BuildMMARebalanceAdvisor(existing roachpb.StoreID, cands []roachpb.StoreID) *mmaprototype.MMARebalanceAdvisor
@@ -58,6 +58,8 @@ type mmaState interface {
 	IsInConflictWithMMA(ctx context.Context, cand roachpb.StoreID,
 		advisor *mmaprototype.MMARebalanceAdvisor, cpuOnly bool) bool
 }
 
+var _ mmaState = mmaprototype.Allocator(nil)
+
 // TODO(wenyihu6): make sure allocator sync can tolerate cluster setting
 // changes not happening consistently or atomically across components. (For
 // example, replicate queue may call into allocator sync when mma is enabled but
@@ -148,13 +150,16 @@ func (as *AllocatorSync) NonMMAPreTransferLease(
 	usage allocator.RangeUsageInfo,
 	transferFrom, transferTo roachpb.ReplicationTarget,
 ) SyncChangeID {
-	var changeIDs []mmaprototype.ChangeID
+	var isMMARegistered bool
+	var mmaChange mmaprototype.PendingRangeChange
 	if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) {
-		changeIDs = as.mmaAllocator.RegisterExternalChanges(convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo))
+		mmaChange = convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo)
+		isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange)
 	}
 	trackedChange := trackedAllocatorChange{
-		changeIDs: changeIDs,
-		usage:     usage,
+		isMMARegistered: isMMARegistered,
+		mmaChange:       mmaChange,
+		usage:           usage,
 		leaseTransferOp: &leaseTransferOp{
 			transferFrom: transferFrom.StoreID,
 			transferTo:   transferTo.StoreID,
@@ -175,13 +180,16 @@ func (as *AllocatorSync) NonMMAPreChangeReplicas(
 	changes kvpb.ReplicationChanges,
 	leaseholderStoreID roachpb.StoreID,
 ) SyncChangeID {
-	var changeIDs []mmaprototype.ChangeID
+	var isMMARegistered bool
+	var mmaChange mmaprototype.PendingRangeChange
 	if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) {
-		changeIDs = as.mmaAllocator.RegisterExternalChanges(convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID))
+		mmaChange = convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID)
+		isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange)
 	}
 	trackedChange := trackedAllocatorChange{
-		changeIDs: changeIDs,
-		usage:     usage,
+		isMMARegistered: isMMARegistered,
+		mmaChange:       mmaChange,
+		usage:           usage,
 		changeReplicasOp: &changeReplicasOp{
 			chgs: changes,
 		},
@@ -202,8 +210,9 @@ func (as *AllocatorSync) MMAPreApply(
 	pendingChange mmaprototype.PendingRangeChange,
 ) SyncChangeID {
 	trackedChange := trackedAllocatorChange{
-		changeIDs: pendingChange.ChangeIDs(),
-		usage:     usage,
+		isMMARegistered: true,
+		mmaChange:       pendingChange,
+		usage:           usage,
 	}
 	switch {
 	case pendingChange.IsTransferLease():
@@ -225,11 +234,11 @@ func (as *AllocatorSync) MMAPreApply(
 	return as.addTrackedChange(trackedChange)
 }
 
-// MarkChangesAsFailed marks the given change IDs as failed without going
-// through allocator sync. This is used when mma changes fail before even
-// registering with mma via MMAPreApply.
-func (as *AllocatorSync) MarkChangesAsFailed(changeIDs []mmaprototype.ChangeID) {
-	as.mmaAllocator.AdjustPendingChangesDisposition(changeIDs, false /* success */)
+// MarkChangeAsFailed marks the given changes to the range as failed without
+// going through allocator sync. This is used when mma changes fail before
+// even registering with mma via MMAPreApply.
+func (as *AllocatorSync) MarkChangeAsFailed(change mmaprototype.PendingRangeChange) {
+	as.mmaAllocator.AdjustPendingChangeDisposition(change, false /* success */)
 }
 
 // PostApply is called by the lease/replicate queue to apply a change to the
@@ -237,9 +246,9 @@ func (as *AllocatorSync) MarkChangesAsFailed(changeIDs []mmaprototype.ChangeID)
 // NonMMAPreTransferLease or NonMMAPreChangeReplicas.
 func (as *AllocatorSync) PostApply(syncChangeID SyncChangeID, success bool) {
 	trackedChange := as.getTrackedChange(syncChangeID)
-	if changeIDs := trackedChange.changeIDs; changeIDs != nil {
+	if trackedChange.isMMARegistered {
 		// Call into without checking cluster setting.
-		as.mmaAllocator.AdjustPendingChangesDisposition(changeIDs, success)
+		as.mmaAllocator.AdjustPendingChangeDisposition(trackedChange.mmaChange, success)
 	}
 	if !success {
 		return
diff --git a/pkg/kv/kvserver/mmaintegration/mma_conversion.go b/pkg/kv/kvserver/mmaintegration/mma_conversion.go
index 1b16025915c3..bbb562a49c45 100644
--- a/pkg/kv/kvserver/mmaintegration/mma_conversion.go
+++ b/pkg/kv/kvserver/mmaintegration/mma_conversion.go
@@ -21,7 +21,7 @@ func convertLeaseTransferToMMA(
 	desc *roachpb.RangeDescriptor,
 	usage allocator.RangeUsageInfo,
 	transferFrom, transferTo roachpb.ReplicationTarget,
-) []mmaprototype.ReplicaChange {
+) mmaprototype.PendingRangeChange {
 	// TODO(wenyihu6): we are passing existing replicas to
 	// mmaprototype.MakeLeaseTransferChanges just to get the add and remove
 	// replica state. See if things could be cleaned up.
@@ -47,17 +47,17 @@ func convertLeaseTransferToMMA(
 		transferTo, transferFrom,
 	)
-	return replicaChanges[:]
+	return mmaprototype.MakePendingRangeChange(desc.RangeID, replicaChanges[:])
 }
 
-// convertReplicaChangeToMMA converts a replica change to mma replica changes.
-// It will be passed to mma.RegisterExternalChanges.
+// convertReplicaChangeToMMA converts replication changes to a mma range
+// change. It will be passed to mma.RegisterExternalChange.
 func convertReplicaChangeToMMA(
 	desc *roachpb.RangeDescriptor,
 	usage allocator.RangeUsageInfo,
 	changes kvpb.ReplicationChanges,
 	leaseholderStoreID roachpb.StoreID,
-) []mmaprototype.ReplicaChange {
+) mmaprototype.PendingRangeChange {
 	rLoad := mmaRangeLoad(usage)
 	replicaChanges := make([]mmaprototype.ReplicaChange, 0, len(changes))
 	replicaSet := desc.Replicas()
@@ -122,5 +122,5 @@ func convertReplicaChangeToMMA(
 		panic("unimplemented change type")
 		}
 	}
-	return replicaChanges
+	return mmaprototype.MakePendingRangeChange(desc.RangeID, replicaChanges)
 }
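End to end, the queue-initiated path now flows: convert the descriptor-level operation into a PendingRangeChange, register it with mma inside the NonMMAPre* call, enact, and report through PostApply. A condensed sketch; the NonMMAPreTransferLease parameter list is abbreviated in the hunk above, and enactLeaseTransfer is a hypothetical stand-in for the actual enactment:

```go
func transferLeaseViaQueue(
	ctx context.Context,
	as *AllocatorSync,
	desc *roachpb.RangeDescriptor,
	usage allocator.RangeUsageInfo,
	from, to roachpb.ReplicationTarget,
) error {
	// Converts to a PendingRangeChange and, when the mma rebalancing mode is
	// enabled, registers it via RegisterExternalChange.
	id := as.NonMMAPreTransferLease(ctx, desc, usage, from, to)
	err := enactLeaseTransfer(ctx, desc, to) // hypothetical
	// Reports the disposition to mma iff registration succeeded.
	as.PostApply(id, err == nil)
	return err
}
```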