@@ -544,16 +544,14 @@ func (cs *clusterState) rebalanceStores(
544544 NodeID : ss .NodeID ,
545545 StoreID : ss .StoreID ,
546546 }
547- leaseChanges := MakeLeaseTransferChanges (
547+ replicaChanges := MakeLeaseTransferChanges (
548548 rangeID , rstate .replicas , rstate .load , addTarget , removeTarget )
549- if err := cs .preCheckOnApplyReplicaChanges (leaseChanges [:]); err != nil {
550- panic (errors .Wrapf (err , "pre-check failed for lease transfer %v" , leaseChanges ))
549+ leaseChange := MakePendingRangeChange (rangeID , replicaChanges [:])
550+ if err := cs .preCheckOnApplyReplicaChanges (leaseChange .pendingReplicaChanges ); err != nil {
551+ panic (errors .Wrapf (err , "pre-check failed for lease transfer %v" , leaseChange ))
551552 }
552- pendingChanges := cs .createPendingChanges (leaseChanges [:]... )
553- changes = append (changes , PendingRangeChange {
554- RangeID : rangeID ,
555- pendingReplicaChanges : pendingChanges [:],
556- })
553+ cs .addPendingRangeChange (leaseChange )
554+ changes = append (changes , leaseChange )
557555 leaseTransferCount ++
558556 if changes [len (changes )- 1 ].IsChangeReplicas () || ! changes [len (changes )- 1 ].IsTransferLease () {
559557 panic (fmt .Sprintf ("lease transfer is invalid: %v" , changes [len (changes )- 1 ]))
@@ -763,15 +761,13 @@ func (cs *clusterState) rebalanceStores(
763761 }
764762 replicaChanges := makeRebalanceReplicaChanges (
765763 rangeID , rstate .replicas , rstate .load , addTarget , removeTarget )
766- if err = cs .preCheckOnApplyReplicaChanges (replicaChanges [:]); err != nil {
764+ rangeChange := MakePendingRangeChange (rangeID , replicaChanges [:])
765+ if err = cs .preCheckOnApplyReplicaChanges (rangeChange .pendingReplicaChanges ); err != nil {
767766 panic (errors .Wrapf (err , "pre-check failed for replica changes: %v for %v" ,
768767 replicaChanges , rangeID ))
769768 }
770- pendingChanges := cs .createPendingChanges (replicaChanges [:]... )
771- changes = append (changes , PendingRangeChange {
772- RangeID : rangeID ,
773- pendingReplicaChanges : pendingChanges [:],
774- })
769+ cs .addPendingRangeChange (rangeChange )
770+ changes = append (changes , rangeChange )
775771 rangeMoveCount ++
776772 log .KvDistribution .VInfof (ctx , 2 ,
777773 "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v" ,
@@ -807,93 +803,73 @@ func (a *allocatorState) SetStore(store StoreAttributesAndLocality) {
807803 a .cs .setStore (store )
808804}
809805
810- // ProcessStoreLeaseholderMsg implements the Allocator interface.
806+ // ProcessStoreLoadMsg implements the Allocator interface.
811807func (a * allocatorState ) ProcessStoreLoadMsg (ctx context.Context , msg * StoreLoadMsg ) {
812808 a .mu .Lock ()
813809 defer a .mu .Unlock ()
814810 a .cs .processStoreLoadMsg (ctx , msg )
815811}
816812
817- // AdjustPendingChangesDisposition implements the Allocator interface.
818- func (a * allocatorState ) AdjustPendingChangesDisposition ( changeIDs [] ChangeID , success bool ) {
813+ // AdjustPendingChangeDisposition implements the Allocator interface.
814+ func (a * allocatorState ) AdjustPendingChangeDisposition ( change PendingRangeChange , success bool ) {
819815 a .mu .Lock ()
820816 defer a .mu .Unlock ()
821- // NB: It is possible that some of the changeIDs have already been enacted
822- // via StoreLeaseholderMsg, and even been garbage collected. So no
823- // assumption can be made about whether these changeIDs will be found in the
824- // allocator's state.
825- if ! success {
826- // Gather the changes that are found and need to be undone.
827- replicaChanges := make ([]ReplicaChange , 0 , len (changeIDs ))
828- for _ , changeID := range changeIDs {
829- change , ok := a .cs .pendingChanges [changeID ]
830- if ! ok {
831- continue
832- }
833- rs , ok := a .cs .ranges [change .rangeID ]
834- if ! ok {
835- panic (errors .AssertionFailedf ("range %v not found in cluster state" , change .rangeID ))
836- }
837- if rs .pendingChangeNoRollback {
838- // All the changes are to the same range, so return.
839- return
840- }
841- replicaChanges = append (replicaChanges , change .ReplicaChange )
842- }
843- if len (replicaChanges ) == 0 {
844- return
817+ rs , ok := a .cs .ranges [change .RangeID ]
818+ if ! ok {
819+ // Range no longer exists. This can happen if the StoreLeaseholderMsg
820+ // which included the effect of the change that transferred the lease away
821+ // was already processed, causing the range to no longer be tracked by the
822+ // allocator.
823+ return
824+ }
825+ if ! success && rs .pendingChangeNoRollback {
826+ // Not allowed to undo.
827+ return
828+ }
829+ // NB: It is possible that some of the changes have already been enacted via
830+ // StoreLeaseholderMsg, and even been garbage collected. So no assumption
831+ // can be made about whether these changes will be found in the allocator's
832+ // state. We gather the found changes.
833+ var changes []* pendingReplicaChange
834+ for _ , c := range change .pendingReplicaChanges {
835+ ch , ok := a .cs .pendingChanges [c .ChangeID ]
836+ if ! ok {
837+ continue
845838 }
846- // Check that we can undo these changes. If not, log and return.
847- if err := a .cs .preCheckOnUndoReplicaChanges (replicaChanges ); err != nil {
848- // TODO(sumeer): we should be able to panic here, once the interface
849- // contract says that all the proposed changes must be included in
850- // changeIDs. Without that contract, there may be a pair of changes
851- // (remove replica and lease from s1), (add replica and lease to s2),
852- // and the caller can provide the first changeID only, and the undo
853- // would cause two leaseholders. The pre-check would catch that here.
854- log .KvDistribution .Infof (context .Background (), "did not undo change %v: due to %v" , changeIDs , err )
855- return
839+ changes = append (changes , ch )
840+ }
841+ if len (changes ) == 0 {
842+ return
843+ }
844+ if ! success {
845+ // Check that we can undo these changes.
846+ if err := a .cs .preCheckOnUndoReplicaChanges (changes ); err != nil {
847+ panic (err )
856848 }
857849 }
858-
859- for _ , changeID := range changeIDs {
860- // We set !requireFound, since some of these pending changes may no longer
861- // exist in the allocator's state. For example, a StoreLeaseholderMsg that
862- // happened after the pending change was created and before this call to
863- // AdjustPendingChangesDisposition may have already removed the pending
864- // change.
850+ for _ , c := range changes {
865851 if success {
866- // TODO(sumeer): this code is implicitly assuming that all the changes
867- // on the rangeState are being enacted. And that is true of the current
868- // callers. We should explicitly state the assumption in the interface.
869- // Because if only some are being enacted, we ought to set
870- // pendingChangeNoRollback, and we don't bother to.
871- a .cs .pendingChangeEnacted (changeID , a .cs .ts .Now (), false )
852+ a .cs .pendingChangeEnacted (c .ChangeID , a .cs .ts .Now ())
872853 } else {
873- a .cs .undoPendingChange (changeID , false )
854+ a .cs .undoPendingChange (c . ChangeID )
874855 }
875856 }
876857}
877858
878- // RegisterExternalChanges implements the Allocator interface. All changes should
879- // correspond to the same range, panic otherwise.
880- func (a * allocatorState ) RegisterExternalChanges (changes []ReplicaChange ) []ChangeID {
859+ // RegisterExternalChange implements the Allocator interface.
860+ func (a * allocatorState ) RegisterExternalChange (change PendingRangeChange ) (ok bool ) {
881861 a .mu .Lock ()
882862 defer a .mu .Unlock ()
883- if err := a .cs .preCheckOnApplyReplicaChanges (changes ); err != nil {
863+ if err := a .cs .preCheckOnApplyReplicaChanges (change . pendingReplicaChanges ); err != nil {
884864 a .mmaMetrics .ExternalFailedToRegister .Inc (1 )
885865 log .KvDistribution .Infof (context .Background (),
886866 "did not register external changes: due to %v" , err )
887- return nil
867+ return false
888868 } else {
889869 a .mmaMetrics .ExternaRegisterSuccess .Inc (1 )
890870 }
891- pendingChanges := a .cs .createPendingChanges (changes ... )
892- changeIDs := make ([]ChangeID , len (pendingChanges ))
893- for i , pendingChange := range pendingChanges {
894- changeIDs [i ] = pendingChange .ChangeID
895- }
896- return changeIDs
871+ a .cs .addPendingRangeChange (change )
872+ return true
897873}
898874
899875// ComputeChanges implements the Allocator interface.
0 commit comments