54 changes: 33 additions & 21 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator.go
@@ -43,34 +43,29 @@ type Allocator interface {
// associated node in the cluster.
ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg)

// TODO(sumeer): only a subset of the fields in
// pendingReplicaChange/PendingRangeChange are relevant to the caller. Hide
// the remaining.

// Methods related to making changes.

// AdjustPendingChangesDisposition is optional feedback to inform the
// allocator of success or failure of proposed changes. For successful
// changes, this is a faster way to know about success than waiting for the
// next ProcessNodeLoadResponse from the local node. For failed changes, in
// the absence of this feedback, proposed changes that have not been enacted
// in N seconds will be garbage collected and assumed to have failed.
// AdjustPendingChangeDisposition is optional feedback to inform the
// allocator of success or failure of proposed changes to a range. For
// successful changes, this is a faster way to know about success than
// waiting for the next ProcessNodeLoadResponse from the local node. For
// failed changes, in the absence of this feedback, proposed changes that
// have not been enacted in N seconds will be garbage collected and assumed
// to have failed.
//
// Calls to AdjustPendingChangesDisposition must be correctly sequenced with
// Calls to AdjustPendingChangeDisposition must be correctly sequenced with
// full state updates from the local node provided in
// ProcessNodeLoadResponse.
//
// REQUIRES: len(changes) > 0 and all changes are to the same range.
AdjustPendingChangesDisposition(changes []ChangeID, success bool)
AdjustPendingChangeDisposition(change PendingRangeChange, success bool)

// RegisterExternalChanges informs this allocator about yet to complete
// RegisterExternalChange informs this allocator about yet to complete
// changes to the cluster which were not initiated by this allocator. The
// caller is returned a list of ChangeIDs, corresponding 1:1 to each replica
// change provided as an argument. The returned list of ChangeIDs should then
// be used to call AdjustPendingChangesDisposition when the changes are
// completed, either successfully or not. All changes should correspond to the
// same range.
RegisterExternalChanges(changes []ReplicaChange) []ChangeID
// ownership of all state inside change is handed off to the callee. If ok
// is true, the change was registered, and the caller should subsequently
// use the same change in a subsequent call to
// AdjustPendingChangeDisposition when the changes are completed, either
// successfully or not. If ok is false, the change was not registered.
RegisterExternalChange(change PendingRangeChange) (ok bool)

// ComputeChanges is called periodically and frequently, say every 10s.
//
@@ -152,6 +147,23 @@ type Allocator interface {
//
// TODO(sumeer): remove once the integration is properly done.
KnownStores() map[roachpb.StoreID]struct{}

// BuildMMARebalanceAdvisor is called by the allocator sync to build a
// MMARebalanceAdvisor for the given existing store and candidates. The
// advisor should be later passed to IsInConflictWithMMA to determine if a
// given candidate is in conflict with the existing store.
//
// TODO(sumeer): merge the above comment with the comment in the
// implementation.
BuildMMARebalanceAdvisor(existing roachpb.StoreID, cands []roachpb.StoreID) *MMARebalanceAdvisor

// IsInConflictWithMMA is called by the allocator sync to determine if the
// given candidate is in conflict with the existing store.
//
// TODO(sumeer): merge the above comment with the comment in the
// implementation.
IsInConflictWithMMA(
ctx context.Context, cand roachpb.StoreID, advisor *MMARebalanceAdvisor, cpuOnly bool) bool
}

// Avoid unused lint errors.
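For orientation, here is a minimal sketch of how the two new advisor methods are intended to be combined, following the interface comments above. `alloc`, `existingStoreID`, and `candidateStoreIDs` are placeholder names for this illustration, not identifiers from the PR, and the meaning of cpuOnly is inferred from the parameter name only.

// Hypothetical caller in the allocator sync path. Build the advisor once for
// the existing store and the full candidate set, then query each candidate
// individually.
advisor := alloc.BuildMMARebalanceAdvisor(existingStoreID, candidateStoreIDs)
for _, cand := range candidateStoreIDs {
	// cpuOnly=false is assumed here to mean "consider all load dimensions";
	// the PR shows only the parameter name, not its semantics.
	if alloc.IsInConflictWithMMA(ctx, cand, advisor, false /* cpuOnly */) {
		continue // MMA considers this candidate a conflicting choice; skip it.
	}
	// Candidate is acceptable to MMA; the caller applies its own scoring here.
}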
128 changes: 52 additions & 76 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
@@ -544,16 +544,14 @@ func (cs *clusterState) rebalanceStores(
NodeID: ss.NodeID,
StoreID: ss.StoreID,
}
leaseChanges := MakeLeaseTransferChanges(
replicaChanges := MakeLeaseTransferChanges(
rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
if err := cs.preCheckOnApplyReplicaChanges(leaseChanges[:]); err != nil {
panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChanges))
leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
if err := cs.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
}
pendingChanges := cs.createPendingChanges(leaseChanges[:]...)
changes = append(changes, PendingRangeChange{
RangeID: rangeID,
pendingReplicaChanges: pendingChanges[:],
})
cs.addPendingRangeChange(leaseChange)
changes = append(changes, leaseChange)
leaseTransferCount++
if changes[len(changes)-1].IsChangeReplicas() || !changes[len(changes)-1].IsTransferLease() {
panic(fmt.Sprintf("lease transfer is invalid: %v", changes[len(changes)-1]))
@@ -763,15 +761,13 @@ func (cs *clusterState) rebalanceStores(
}
replicaChanges := makeRebalanceReplicaChanges(
rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
if err = cs.preCheckOnApplyReplicaChanges(replicaChanges[:]); err != nil {
rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
if err = cs.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
replicaChanges, rangeID))
}
pendingChanges := cs.createPendingChanges(replicaChanges[:]...)
changes = append(changes, PendingRangeChange{
RangeID: rangeID,
pendingReplicaChanges: pendingChanges[:],
})
cs.addPendingRangeChange(rangeChange)
changes = append(changes, rangeChange)
rangeMoveCount++
log.KvDistribution.VInfof(ctx, 2,
"result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v",
@@ -807,93 +803,73 @@ func (a *allocatorState) SetStore(store StoreAttributesAndLocality) {
a.cs.setStore(store)
}

// ProcessStoreLeaseholderMsg implements the Allocator interface.
// ProcessStoreLoadMsg implements the Allocator interface.
func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg) {
a.mu.Lock()
defer a.mu.Unlock()
a.cs.processStoreLoadMsg(ctx, msg)
}

// AdjustPendingChangesDisposition implements the Allocator interface.
func (a *allocatorState) AdjustPendingChangesDisposition(changeIDs []ChangeID, success bool) {
// AdjustPendingChangeDisposition implements the Allocator interface.
func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) {
a.mu.Lock()
defer a.mu.Unlock()
// NB: It is possible that some of the changeIDs have already been enacted
// via StoreLeaseholderMsg, and even been garbage collected. So no
// assumption can be made about whether these changeIDs will be found in the
// allocator's state.
if !success {
// Gather the changes that are found and need to be undone.
replicaChanges := make([]ReplicaChange, 0, len(changeIDs))
for _, changeID := range changeIDs {
change, ok := a.cs.pendingChanges[changeID]
if !ok {
continue
}
rs, ok := a.cs.ranges[change.rangeID]
if !ok {
panic(errors.AssertionFailedf("range %v not found in cluster state", change.rangeID))
}
if rs.pendingChangeNoRollback {
// All the changes are to the same range, so return.
return
}
replicaChanges = append(replicaChanges, change.ReplicaChange)
}
if len(replicaChanges) == 0 {
return
rs, ok := a.cs.ranges[change.RangeID]
if !ok {
// Range no longer exists. This can happen if the StoreLeaseholderMsg
// which included the effect of the change that transferred the lease away
// was already processed, causing the range to no longer be tracked by the
// allocator.
return
}
if !success && rs.pendingChangeNoRollback {
// Not allowed to undo.
return
}
// NB: It is possible that some of the changes have already been enacted via
// StoreLeaseholderMsg, and even been garbage collected. So no assumption
// can be made about whether these changes will be found in the allocator's
// state. We gather the found changes.
var changes []*pendingReplicaChange
for _, c := range change.pendingReplicaChanges {
ch, ok := a.cs.pendingChanges[c.ChangeID]
if !ok {
continue
}
// Check that we can undo these changes. If not, log and return.
if err := a.cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil {
// TODO(sumeer): we should be able to panic here, once the interface
// contract says that all the proposed changes must be included in
// changeIDs. Without that contract, there may be a pair of changes
// (remove replica and lease from s1), (add replica and lease to s2),
// and the caller can provide the first changeID only, and the undo
// would cause two leaseholders. The pre-check would catch that here.
log.KvDistribution.Infof(context.Background(), "did not undo change %v: due to %v", changeIDs, err)
return
changes = append(changes, ch)
}
if len(changes) == 0 {
return
}
if !success {
// Check that we can undo these changes.
if err := a.cs.preCheckOnUndoReplicaChanges(changes); err != nil {
panic(err)
}
}

for _, changeID := range changeIDs {
// We set !requireFound, since some of these pending changes may no longer
// exist in the allocator's state. For example, a StoreLeaseholderMsg that
// happened after the pending change was created and before this call to
// AdjustPendingChangesDisposition may have already removed the pending
// change.
for _, c := range changes {
if success {
// TODO(sumeer): this code is implicitly assuming that all the changes
// on the rangeState are being enacted. And that is true of the current
// callers. We should explicitly state the assumption in the interface.
// Because if only some are being enacted, we ought to set
// pendingChangeNoRollback, and we don't bother to.
a.cs.pendingChangeEnacted(changeID, a.cs.ts.Now(), false)
a.cs.pendingChangeEnacted(c.ChangeID, a.cs.ts.Now())
} else {
a.cs.undoPendingChange(changeID, false)
a.cs.undoPendingChange(c.ChangeID)
}
}
}

// RegisterExternalChanges implements the Allocator interface. All changes should
// correspond to the same range, panic otherwise.
func (a *allocatorState) RegisterExternalChanges(changes []ReplicaChange) []ChangeID {
// RegisterExternalChange implements the Allocator interface.
func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) {
a.mu.Lock()
defer a.mu.Unlock()
if err := a.cs.preCheckOnApplyReplicaChanges(changes); err != nil {
if err := a.cs.preCheckOnApplyReplicaChanges(change.pendingReplicaChanges); err != nil {
a.mmaMetrics.ExternalFailedToRegister.Inc(1)
log.KvDistribution.Infof(context.Background(),
"did not register external changes: due to %v", err)
return nil
return false
} else {
a.mmaMetrics.ExternaRegisterSuccess.Inc(1)
}
pendingChanges := a.cs.createPendingChanges(changes...)
changeIDs := make([]ChangeID, len(pendingChanges))
for i, pendingChange := range pendingChanges {
changeIDs[i] = pendingChange.ChangeID
}
return changeIDs
a.cs.addPendingRangeChange(change)
return true
}

// ComputeChanges implements the Allocator interface.
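Tying the two files together, a hedged sketch of the external-change flow the revised API implies: the caller groups its replica changes into a PendingRangeChange (as rebalanceStores does above via MakePendingRangeChange), registers it, and later reports the outcome with the same value. `alloc`, `rangeID`, `replicaChanges`, and `enactedSuccessfully` are placeholders for this illustration.

// Group the per-replica changes for a single range into one pending change.
change := MakePendingRangeChange(rangeID, replicaChanges)
// Registration can fail the allocator's pre-check (see
// preCheckOnApplyReplicaChanges above); in that case ok is false, the change
// is not tracked, and no disposition should be reported for it later.
if ok := alloc.RegisterExternalChange(change); !ok {
	return
}
// ... the change is enacted (or abandoned) outside the allocator ...
// Report the outcome using the same PendingRangeChange. On success this is
// faster than waiting for the next ProcessNodeLoadResponse; on failure it
// lets the allocator undo the pending change rather than waiting for it to
// be garbage collected after N seconds.
alloc.AdjustPendingChangeDisposition(change, enactedSuccessfully)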