@@ -101,6 +101,8 @@ StorageType == [
101101 bucketRefs : [ BucketIds ->
102102 [ ro : Nat , rw : Nat , ro_lock : BOOLEAN , rw_lock : BOOLEAN ] ] ,
103103 gcAck : [ BucketIds -> SUBSET Storages ] ,
104+ sendWaitTarget : [ BucketIds -> [ Storages -> Nat ] ] ,
105+ sendingBuckets : SUBSET BucketIds ,
104106 \* For limiting tests, it cannot be done outside of the module, since
105107 \* a test doesn't know, whether the bucket send or ref actually happened.
106108 errinj : [
@@ -154,6 +156,8 @@ StorageInit ==
154156 bucketRefs |-> [ b \in BucketIds |->
155157 [ ro |-> 0 , rw |-> 0 , ro_lock |-> FALSE , rw_lock |-> FALSE ] ] ,
156158 gcAck |-> [ b \in BucketIds |-> { } ] ,
159+ sendWaitTarget |-> [ b \in BucketIds |-> [ s \in Storages |-> 0 ] ] ,
160+ sendingBuckets |-> { } ,
157161 errinj |-> [
158162 bucketSendCount |-> 0 ,
159163 bucketRWRefCount |-> 0 ,
@@ -182,6 +186,8 @@ StorageState(i) == [
182186 buckets |-> storages [ i ] . buckets ,
183187 bucketRefs |-> storages [ i ] . bucketRefs ,
184188 gcAck |-> storages [ i ] . gcAck ,
189+ sendWaitTarget |-> storages [ i ] . sendWaitTarget ,
190+ sendingBuckets |-> storages [ i ] . sendingBuckets ,
185191 errinj |-> storages [ i ] . errinj
186192]
187193
@@ -192,7 +198,9 @@ StorageStateApply(i, state) ==
192198 VarSet ( i , "buckets" , state . buckets ,
193199 VarSet ( i , "bucketRefs" , state . bucketRefs ,
194200 VarSet ( i , "gcAck" , state . gcAck ,
195- VarSet ( i , "errinj" , state . errinj , storages ) ) ) ) ) ) )
201+ VarSet ( i , "sendWaitTarget" , state . sendWaitTarget ,
202+ VarSet ( i , "sendingBuckets" , state . sendingBuckets ,
203+ VarSet ( i , "errinj" , state . errinj , storages ) ) ) ) ) ) ) ) )
196204 /\ network ' =
197205 [ s \in Storages |->
198206 [ t \in Storages |->
@@ -291,6 +299,37 @@ BucketUnRef(state, bucket, mode) ==
291299(* Bucket sending *)
292300(************************************************************************** *)
293301
302+ AllReplicasCaughtUp ( state , b ) ==
303+ LET rs == ReplicasetOf ( state . id )
304+ replicas == OtherReplicasInReplicaset ( state . id )
305+ target == state . sendWaitTarget [ b ]
306+ m == state . id
307+ IN
308+ \A r \in replicas : storages [ r ] . vclock [ m ] >= target [ m ]
309+
310+ BucketSendWaitAndSend ( state , b , j ) ==
311+ IF IsMaster ( state )
312+ /\ state . buckets [ b ] . status = "SENDING"
313+ /\ ReplicasetOf ( j ) = state . buckets [ b ] . destination
314+ /\ b \in state . transferingBuckets
315+ /\ b \in state . sendingBuckets
316+ /\ j # state . id
317+ /\ AllReplicasCaughtUp ( state , b )
318+ THEN
319+ LET msg == [ type |-> "BUCKET_RECV" ,
320+ content |-> [ bucket |-> b , final |-> FALSE ] ]
321+ IN [ state EXCEPT
322+ ! . networkSend [ j ] = Append ( @ , msg ) ,
323+ ! . sendingBuckets = @ \ { b } ]
324+ ELSE
325+ state
326+
327+ BucketDropFromSending ( state , b ) ==
328+ IF b \in state . sendingBuckets THEN
329+ [ state EXCEPT ! . sendingBuckets = @ \ { b } ]
330+ ELSE
331+ state
332+
294333BucketSendStart ( state , b , j ) ==
295334 IF IsMaster ( state ) /\ state . buckets [ b ] . status = "ACTIVE" /\
296335 state . bucketRefs [ b ] . rw = 0 /\ ~ state . bucketRefs [ b ] . rw_lock /\
@@ -300,10 +339,11 @@ BucketSendStart(state, b, j) ==
300339 ! . transferingBuckets = @ \union { b } ,
301340 ! . errinj . bucketSendCount = @ + 1 ] IN
302341 LET state2 == BucketStatusChange (
303- state1 , state . id , b , "SENDING" , ReplicasetOf ( j ) ) IN
304- LET msg == [ type |-> "BUCKET_RECV" ,
305- content |-> [ bucket |-> b , final |-> FALSE ] ] IN
306- [ state2 EXCEPT ! . networkSend [ j ] = Append ( @ , msg ) ]
342+ state1 , state . id , b , "SENDING" , ReplicasetOf ( j ) )
343+ state3 == [ state2 EXCEPT
344+ ! . sendWaitTarget [ b ] = state2 . vclock ,
345+ ! . sendingBuckets = @ \union { b } ]
346+ IN BucketSendWaitAndSend ( state3 , b , j )
307347 ELSE state
308348
309349BucketRecvStart ( state , j ) ==
@@ -635,11 +675,13 @@ Next ==
635675 \/ StorageStateApply ( i , BucketRef ( state , b , mode ) )
636676 \/ StorageStateApply ( i , BucketUnRef ( state , b , mode ) )
637677 \/ \E j \in Storages , b \in BucketIds :
638- StorageStateApply ( i , BucketSendStart ( state , b , j ) )
678+ \/ StorageStateApply ( i , BucketSendStart ( state , b , j ) )
679+ \/ StorageStateApply ( i , BucketSendWaitAndSend ( state , b , j ) )
639680 \/ \E b \in BucketIds :
640681 \/ StorageStateApply ( i , RecoverySendStatRequest ( state , b ) )
641682 \/ StorageStateApply ( i , GcDropGarbage ( state , b ) )
642683 \/ StorageStateApply ( i , GcSendTestRequest ( state , b ) )
643684 \/ StorageStateApply ( i , BucketDropFromTransfering ( state , b ) )
685+ \/ StorageStateApply ( i , BucketDropFromSending ( state , b ) )
644686
645687================================================================================
0 commit comments