@@ -544,16 +544,14 @@ func (a *allocatorState) rebalanceStores(
544544 NodeID : ss .NodeID ,
545545 StoreID : ss .StoreID ,
546546 }
547- leaseChanges := MakeLeaseTransferChanges (
547+ replicaChanges := MakeLeaseTransferChanges (
548548 rangeID , rstate .replicas , rstate .load , addTarget , removeTarget )
549- if err := a .cs .preCheckOnApplyReplicaChanges (leaseChanges [:]); err != nil {
550- panic (errors .Wrapf (err , "pre-check failed for lease transfer %v" , leaseChanges ))
549+ leaseChange := MakePendingRangeChange (rangeID , replicaChanges [:])
550+ if err := a .cs .preCheckOnApplyReplicaChanges (leaseChange .pendingReplicaChanges ); err != nil {
551+ panic (errors .Wrapf (err , "pre-check failed for lease transfer %v" , leaseChange ))
551552 }
552- pendingChanges := a .cs .createPendingChanges (leaseChanges [:]... )
553- changes = append (changes , PendingRangeChange {
554- RangeID : rangeID ,
555- pendingReplicaChanges : pendingChanges [:],
556- })
553+ a .cs .addPendingRangeChange (leaseChange )
554+ changes = append (changes , leaseChange )
557555 leaseTransferCount ++
558556 if changes [len (changes )- 1 ].IsChangeReplicas () || ! changes [len (changes )- 1 ].IsTransferLease () {
559557 panic (fmt .Sprintf ("lease transfer is invalid: %v" , changes [len (changes )- 1 ]))
@@ -763,15 +761,13 @@ func (a *allocatorState) rebalanceStores(
763761 }
764762 replicaChanges := makeRebalanceReplicaChanges (
765763 rangeID , rstate .replicas , rstate .load , addTarget , removeTarget )
766- if err = a .cs .preCheckOnApplyReplicaChanges (replicaChanges [:]); err != nil {
764+ rangeChange := MakePendingRangeChange (rangeID , replicaChanges [:])
765+ if err = a .cs .preCheckOnApplyReplicaChanges (rangeChange .pendingReplicaChanges ); err != nil {
767766 panic (errors .Wrapf (err , "pre-check failed for replica changes: %v for %v" ,
768767 replicaChanges , rangeID ))
769768 }
770- pendingChanges := a .cs .createPendingChanges (replicaChanges [:]... )
771- changes = append (changes , PendingRangeChange {
772- RangeID : rangeID ,
773- pendingReplicaChanges : pendingChanges [:],
774- })
769+ a .cs .addPendingRangeChange (rangeChange )
770+ changes = append (changes , rangeChange )
775771 rangeMoveCount ++
776772 log .KvDistribution .VInfof (ctx , 2 ,
777773 "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v" ,
@@ -807,93 +803,71 @@ func (a *allocatorState) SetStore(store StoreAttributesAndLocality) {
807803 a .cs .setStore (store )
808804}
809805
810- // ProcessStoreLeaseholderMsg implements the Allocator interface.
806+ // ProcessStoreLoadMsg implements the Allocator interface.
811807func (a * allocatorState ) ProcessStoreLoadMsg (ctx context.Context , msg * StoreLoadMsg ) {
812808 a .mu .Lock ()
813809 defer a .mu .Unlock ()
814810 a .cs .processStoreLoadMsg (ctx , msg )
815811}
816812
817- // AdjustPendingChangesDisposition implements the Allocator interface.
818- func (a * allocatorState ) AdjustPendingChangesDisposition ( changeIDs [] ChangeID , success bool ) {
813+ // AdjustPendingChangeDisposition implements the Allocator interface.
814+ func (a * allocatorState ) AdjustPendingChangeDisposition ( change PendingRangeChange , success bool ) {
819815 a .mu .Lock ()
820816 defer a .mu .Unlock ()
821- // NB: It is possible that some of the changeIDs have already been enacted
822- // via StoreLeaseholderMsg, and even been garbage collected. So no
823- // assumption can be made about whether these changeIDs will be found in the
824- // allocator's state.
825- if ! success {
826- // Gather the changes that are found and need to be undone.
827- replicaChanges := make ([]ReplicaChange , 0 , len (changeIDs ))
828- for _ , changeID := range changeIDs {
829- change , ok := a .cs .pendingChanges [changeID ]
830- if ! ok {
831- continue
832- }
833- rs , ok := a .cs .ranges [change .rangeID ]
834- if ! ok {
835- panic (errors .AssertionFailedf ("range %v not found in cluster state" , change .rangeID ))
836- }
837- if rs .pendingChangeNoRollback {
838- // All the changes are to the same range, so return.
839- return
840- }
841- replicaChanges = append (replicaChanges , change .ReplicaChange )
842- }
843- if len (replicaChanges ) == 0 {
844- return
817+ rs , ok := a .cs .ranges [change .RangeID ]
818+ if ! ok {
819+ // Range no longer exists.
820+ return
821+ }
822+ if ! success && rs .pendingChangeNoRollback {
823+ // Not allowed to undo.
824+ return
825+ }
826+ // NB: It is possible that some of the changes have already been enacted via
827+ // StoreLeaseholderMsg, and even been garbage collected. So no assumption
828+ // can be made about whether these changes will be found in the allocator's
829+ // state. We gather the found changes.
830+ var changes []* pendingReplicaChange
831+ for _ , c := range change .pendingReplicaChanges {
832+ ch , ok := a .cs .pendingChanges [c .ChangeID ]
833+ if ! ok {
834+ continue
845835 }
846- // Check that we can undo these changes. If not, log and return.
847- if err := a . cs . preCheckOnUndoReplicaChanges ( replicaChanges ); err != nil {
848- // TODO(sumeer): we should be able to panic here, once the interface
849- // contract says that all the proposed changes must be included in
850- // changeIDs. Without that contract, there may be a pair of changes
851- // (remove replica and lease from s1), (add replica and lease to s2),
852- // and the caller can provide the first changeID only, and the undo
853- // would cause two leaseholders. The pre-check would catch that here.
854- log . KvDistribution . Infof ( context . Background (), "did not undo change %v: due to %v" , changeIDs , err )
836+ changes = append ( changes , ch )
837+ }
838+ if len ( changes ) == 0 {
839+ return
840+ }
841+ if ! success {
842+ // Check that we can undo these changes.
843+ if err := a . cs . preCheckOnUndoReplicaChanges ( changes ); err != nil {
844+ panic ( err )
855845 return
856846 }
857847 }
858-
859- for _ , changeID := range changeIDs {
860- // We set !requireFound, since some of these pending changes may no longer
861- // exist in the allocator's state. For example, a StoreLeaseholderMsg that
862- // happened after the pending change was created and before this call to
863- // AdjustPendingChangesDisposition may have already removed the pending
864- // change.
848+ for _ , c := range changes {
865849 if success {
866- // TODO(sumeer): this code is implicitly assuming that all the changes
867- // on the rangeState are being enacted. And that is true of the current
868- // callers. We should explicitly state the assumption in the interface.
869- // Because if only some are being enacted, we ought to set
870- // pendingChangeNoRollback, and we don't bother to.
871- a .cs .pendingChangeEnacted (changeID , a .cs .ts .Now (), false )
850+ a .cs .pendingChangeEnacted (c .ChangeID , a .cs .ts .Now ())
872851 } else {
873- a .cs .undoPendingChange (changeID , false )
852+ a .cs .undoPendingChange (c . ChangeID )
874853 }
875854 }
876855}
877856
878- // RegisterExternalChanges implements the Allocator interface. All changes should
879- // correspond to the same range, panic otherwise.
880- func (a * allocatorState ) RegisterExternalChanges (changes []ReplicaChange ) []ChangeID {
857+ // RegisterExternalChange implements the Allocator interface.
858+ func (a * allocatorState ) RegisterExternalChange (change PendingRangeChange ) (ok bool ) {
881859 a .mu .Lock ()
882860 defer a .mu .Unlock ()
883- if err := a .cs .preCheckOnApplyReplicaChanges (changes ); err != nil {
861+ if err := a .cs .preCheckOnApplyReplicaChanges (change . pendingReplicaChanges ); err != nil {
884862 a .mmaMetrics .ExternalFailedToRegister .Inc (1 )
885863 log .KvDistribution .Infof (context .Background (),
886864 "did not register external changes: due to %v" , err )
887- return nil
865+ return false
888866 } else {
889867 a .mmaMetrics .ExternaRegisterSuccess .Inc (1 )
890868 }
891- pendingChanges := a .cs .createPendingChanges (changes ... )
892- changeIDs := make ([]ChangeID , len (pendingChanges ))
893- for i , pendingChange := range pendingChanges {
894- changeIDs [i ] = pendingChange .ChangeID
895- }
896- return changeIDs
869+ a .cs .addPendingRangeChange (change )
870+ return true
897871}
898872
899873// ComputeChanges implements the Allocator interface.
0 commit comments