From 8551696acad50a10263d83abca9caf24b1ea3398 Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Thu, 30 Oct 2025 23:11:28 +0300 Subject: [PATCH 1/6] tla/storage: initial implementation Run the tests with: ```sh JAVA_OPTS="-DTLA-Library=../../src" tlc DoubledBucketsSmallTest.tla -config DoubledBucketsSmallTest.cfg -workers 6 -continue -checkpoint 10 ``` DoubledBucketsSmallTest reliably reproduces the doubled bucket problem, which is related to master change. DoubledBucketsTest requires constraints to become useful. NO_DOC=tla NO_TEST=tla --- .gitignore | 4 + proofs/tla/src/storage.tla | 645 ++++++++++++++++++ proofs/tla/src/utils.tla | 10 + .../test/storage/DoubledBucketsSmallTest.cfg | 25 + .../test/storage/DoubledBucketsSmallTest.tla | 61 ++ .../tla/test/storage/DoubledBucketsTest.cfg | 26 + .../tla/test/storage/DoubledBucketsTest.tla | 74 ++ proofs/tla/test/storage/MasterDoubledTest.cfg | 20 + proofs/tla/test/storage/MasterDoubledTest.tla | 93 +++ .../tla/test/storage/RecoveryGarbageTest.cfg | 24 + .../tla/test/storage/RecoveryGarbageTest.tla | 107 +++ proofs/tla/test/storage/RecoveryTest.cfg | 22 + proofs/tla/test/storage/RecoveryTest.tla | 101 +++ proofs/tla/test/storage/ReplicationTest.cfg | 21 + proofs/tla/test/storage/ReplicationTest.tla | 84 +++ proofs/tla/test/storage/StartTest.cfg | 20 + proofs/tla/test/storage/StartTest.tla | 17 + .../tla/test/storage/StrayTCPDoubledTest.cfg | 24 + .../tla/test/storage/StrayTCPDoubledTest.tla | 151 ++++ 19 files changed, 1529 insertions(+) create mode 100644 proofs/tla/src/storage.tla create mode 100644 proofs/tla/src/utils.tla create mode 100644 proofs/tla/test/storage/DoubledBucketsSmallTest.cfg create mode 100644 proofs/tla/test/storage/DoubledBucketsSmallTest.tla create mode 100644 proofs/tla/test/storage/DoubledBucketsTest.cfg create mode 100644 proofs/tla/test/storage/DoubledBucketsTest.tla create mode 100644 proofs/tla/test/storage/MasterDoubledTest.cfg create mode 100644 
proofs/tla/test/storage/MasterDoubledTest.tla create mode 100644 proofs/tla/test/storage/RecoveryGarbageTest.cfg create mode 100644 proofs/tla/test/storage/RecoveryGarbageTest.tla create mode 100644 proofs/tla/test/storage/RecoveryTest.cfg create mode 100644 proofs/tla/test/storage/RecoveryTest.tla create mode 100644 proofs/tla/test/storage/ReplicationTest.cfg create mode 100644 proofs/tla/test/storage/ReplicationTest.tla create mode 100644 proofs/tla/test/storage/StartTest.cfg create mode 100644 proofs/tla/test/storage/StartTest.tla create mode 100644 proofs/tla/test/storage/StrayTCPDoubledTest.cfg create mode 100644 proofs/tla/test/storage/StrayTCPDoubledTest.tla diff --git a/.gitignore b/.gitignore index 4555999a..c36aecac 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,7 @@ build .rocks build.luarocks .DS_Store + +# TLA+ files +states/ +*_TTrace_* diff --git a/proofs/tla/src/storage.tla b/proofs/tla/src/storage.tla new file mode 100644 index 00000000..58c1355e --- /dev/null +++ b/proofs/tla/src/storage.tla @@ -0,0 +1,645 @@ +-------------------------------- MODULE storage -------------------------------- + +(* + * This module specifies VShard's storage module. It focuses on user requests, + * which refs and unrefs buckets, and rebalancing services. The only + * reconfiguration, which can happen - master change, there's no limit, how + * many masters may be in one replicaset, since failover and configuration + * update never cannot guarantee one leader in the cluster at one moment (even + * raft). + *) + +EXTENDS Sequences, Integers, FiniteSets, TLC + +-------------------------------------------------------------------------------- +\* Declaration. +-------------------------------------------------------------------------------- + +CONSTANTS + Storages, \* Set of all storage IDs. + ReplicaSets, \* Set of all replicaset IDs. + BucketIds, \* Set of all bucket IDs. + StorageAssignments,\* [rs |-> SUBSET Storages] - servers to replicaset. 
+ BucketAssignments, \* [rs |-> SUBSET BucketIds] - buckets to replicaset. + MasterAssignments \* [rs |-> SUBSET Storages] - masters to replicaset. + +(******************************************************************************) +(* Constants. *) +(******************************************************************************) + +NULL == "NULL" + +\* Pin logic is too trivial, it's not needed in TLA. +BucketState == {"ACTIVE", "SENDING", "SENT", "RECEIVING", "GARBAGE", NULL} +WritableStates == {"ACTIVE"} +ReadableStates == WritableStates \union {"SENDING"} +TransferStates == {"SENDING", "RECEIVING"} + +(******************************************************************************) +(* Protocol description. *) +(******************************************************************************) + +\* Model of the network: network[sender][receiver]. +VARIABLE network + +MessageType == { + \* bucket_send(). + "BUCKET_RECV", + "BUCKET_RECV_RESPONSE", + \* recovery_step_by_step(). + "RECOVERY_BUCKET_STAT", + "RECOVERY_BUCKET_STAT_RESPONSE", + \* gc_bucket_process_sent_one_batch_xc(). + "BUCKET_TEST_GC", + "BUCKET_TEST_GC_RESPONSE", + \* Replication of the _bucket space. 
+ "REPLICATION_BUCKET" +} + +BucketRecvContent == + [bucket: BucketIds, final: BOOLEAN] +BucketRecvResponseContent == + [bucket: BucketIds, status: BOOLEAN] +RecoveryBucketStatContent == + [bucket : BucketIds] +RecoveryBucketStatResponseContent == + [bucket: BucketIds, status: BucketState, transferring: BOOLEAN] +BucketTestGcContent == + [bucket : BucketIds] +BucketTestGcResponseContent == + [bucket: BucketIds, can_gc: BOOLEAN] +ReplicationBucketContent == + [bucket: BucketIds, status: BucketState, + destination: ReplicaSets \union {NULL}] + +MessageContent == + BucketRecvContent \union + BucketRecvResponseContent \union + RecoveryBucketStatContent \union + RecoveryBucketStatResponseContent \union + BucketTestGcContent \union + BucketTestGcResponseContent \union + ReplicationBucketContent + +Message == + [type : MessageType, content : MessageContent] + +(******************************************************************************) +(* Storage description. *) +(******************************************************************************) + +\* Storage model. +VARIABLES storages, storageToReplicaset + +StorageType == [ + vclock : [Storages -> Nat], + status : {"master", "replica"}, + transferingBuckets : SUBSET BucketIds, + buckets : + [BucketIds -> + [status : BucketState, destination: ReplicaSets \union {NULL}]], + bucketRefs : [BucketIds -> + [ro : Nat, rw : Nat, ro_lock : BOOLEAN, rw_lock : BOOLEAN]], + gcAck : [BucketIds -> SUBSET Storages], + \* For limiting tests, it cannot be done outside of the module, since + \* a test doesn't know, whether the bucket send or ref actually happened. + errinj : [ + bucketSendCount : Nat, + bucketRWRefCount : Nat, + bucketRORefCount : Nat, + bucketRWUnRefCount : Nat, + bucketROUnRefCount : Nat, + networkReorderCount : Nat, + networkDropCount : Nat + ] +] + +-------------------------------------------------------------------------------- +\* Invariants. 
+-------------------------------------------------------------------------------- + +NetworkTypeInv == network \in [Storages -> [Storages -> Seq(Message)]] +StorageTypeInv == storages \in [Storages -> StorageType] +StorageToReplicasetTypeInv == storageToReplicaset \in [Storages -> ReplicaSets] + +-------------------------------------------------------------------------------- +\* Helpers. +-------------------------------------------------------------------------------- + +\* Sets root.variable = value for server i. +VarSet(i, variable, value, root) == + [root EXCEPT ![i] = [root[i] EXCEPT ![variable] = value]] + +IsMaster(state) == state.status = "master" +ReplicasetOf(id) == storageToReplicaset[id] +OtherReplicasInReplicaset(id) == {s \in Storages : + storageToReplicaset[s] = storageToReplicaset[id]} \ {id} + +-------------------------------------------------------------------------------- +\* Implementation. +-------------------------------------------------------------------------------- + +StorageInit == + storages = [i \in Storages |-> [ + vclock |-> [j \in Storages |-> 0], + status |-> IF \E rs \in ReplicaSets : i \in MasterAssignments[rs] + THEN "master" ELSE "replica", + transferingBuckets |-> {}, + buckets |-> [b \in BucketIds |-> + LET rs_for_s == + CHOOSE rs \in ReplicaSets : i \in StorageAssignments[rs] + IN IF b \in BucketAssignments[rs_for_s] + THEN [status |-> "ACTIVE", destination |-> NULL] + ELSE [status |-> NULL, destination |-> NULL]], + bucketRefs |-> [b \in BucketIds |-> + [ro |-> 0, rw |-> 0, ro_lock |-> FALSE, rw_lock |-> FALSE]], + gcAck |-> [b \in BucketIds |-> {}], + errinj |-> [ + bucketSendCount |-> 0, + bucketRWRefCount |-> 0, + bucketRORefCount |-> 0, + bucketRWUnRefCount |-> 0, + bucketROUnRefCount |-> 0, + networkReorderCount |-> 0, + networkDropCount |-> 0 + ] + ]] + +Init == + /\ StorageInit + /\ storageToReplicaset = [s \in Storages |-> + CHOOSE rs \in ReplicaSets : s \in StorageAssignments[rs]] + /\ network = [i \in Storages |-> 
[j \in Storages |-> <<>>]] + +StorageState(i) == [ + networkSend |-> network[i], + networkReceive |-> [n \in Storages |-> network[n][i]], + + id |-> i, + vclock |-> storages[i].vclock, + status |-> storages[i].status, + transferingBuckets |-> storages[i].transferingBuckets, + buckets |-> storages[i].buckets, + bucketRefs |-> storages[i].bucketRefs, + gcAck |-> storages[i].gcAck, + errinj |-> storages[i].errinj +] + +StorageStateApply(i, state) == + /\ storages' = VarSet(i, "vclock", state.vclock, + VarSet(i, "status", state.status, + VarSet(i, "transferingBuckets", state.transferingBuckets, + VarSet(i, "buckets", state.buckets, + VarSet(i, "bucketRefs", state.bucketRefs, + VarSet(i, "gcAck", state.gcAck, + VarSet(i, "errinj", state.errinj, storages))))))) + /\ network' = + [s \in Storages |-> + [t \in Storages |-> + IF s = i THEN + \* Messages sent from i + state.networkSend[t] + ELSE IF t = i THEN + \* Messages received by i + state.networkReceive[s] + ELSE + \* Unchanged + network[s][t] + ] + ] + /\ UNCHANGED <> + +(***************************************************************************) +(* Replication *) +(***************************************************************************) + +BucketStatusChange(state, from, bucket, status, destination) == + LET ref_before == state.bucketRefs[bucket] + ref_after == [ref_before EXCEPT + !.ro_lock = ~(status \in ReadableStates), + !.rw_lock = ~(status \in WritableStates)] + state1 == [state EXCEPT + !.buckets[bucket] = [status |-> status, destination |-> destination], + !.bucketRefs[bucket] = ref_after, + !.vclock[from] = @ + 1] + a == Assert(state.status = "master", "Bucket change on non-master") IN + IF from = state.id /\ OtherReplicasInReplicaset(state.id) # {} THEN + \* If this node is the source of the change, + \* replicate to all other nodes in replicaset + LET replication_msg == [type |-> "REPLICATION_BUCKET", + content |-> [bucket |-> bucket, status |-> status, + destination |-> destination]] IN + [state1 
EXCEPT !.networkSend = [ + j \in Storages |-> IF j \in OtherReplicasInReplicaset(state.id) + THEN Append(state1.networkSend[j], replication_msg) + ELSE state1.networkSend[j]]] + ELSE + state1 + +ProcessReplicationBucket(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type = "REPLICATION_BUCKET" THEN + LET stateNew == BucketStatusChange(state, j, msg.content.bucket, + msg.content.status, msg.content.destination) IN + [stateNew EXCEPT !.networkReceive[j] = Tail(@)] + ELSE state + +(***************************************************************************) +(* Failover master change *) +(***************************************************************************) + +BecomeMaster(state) == + IF ~IsMaster(state) THEN [state EXCEPT !.status = "master"] ELSE state + +BecomeReplica(state) == + IF IsMaster(state) THEN [state EXCEPT !.status = "replica"] ELSE state + +(***************************************************************************) +(* Storage call *) +(***************************************************************************) + +BucketRef(state, bucket, mode) == + LET ref == state.bucketRefs[bucket] + bucketState == state.buckets[bucket].status IN + IF mode = "read" THEN + \* Read request allowed if bucket is readable and not locked. + IF ~(bucketState \in ReadableStates) \/ ref.ro_lock THEN state + ELSE [state EXCEPT + !.bucketRefs[bucket].ro = @ + 1, + !.errinj.bucketRORefCount = @ + 1] + ELSE + \* Write request allowed only if bucket writable and storage is master. 
+ IF ~(bucketState \in WritableStates) \/ ref.rw_lock \/ ~IsMaster(state) + THEN state ELSE [state EXCEPT + !.bucketRefs[bucket].rw = @ + 1, + !.errinj.bucketRWRefCount = @ + 1] + +BucketUnRef(state, bucket, mode) == + LET ref == state.bucketRefs[bucket] IN + IF mode = "read" THEN + IF ref.ro = 0 THEN state + ELSE [state EXCEPT + !.bucketRefs[bucket].ro = @ - 1, + !.errinj.bucketROUnRefCount = @ + 1] + ELSE + IF ref.rw = 0 THEN state + ELSE [state EXCEPT + !.bucketRefs[bucket].rw = @ - 1, + !.errinj.bucketRWUnRefCount = @ + 1] + +(***************************************************************************) +(* Bucket sending *) +(***************************************************************************) + +BucketSendStart(state, b, j) == + IF IsMaster(state) /\ state.buckets[b].status = "ACTIVE" /\ + state.bucketRefs[b].rw = 0 /\ ~state.bucketRefs[b].rw_lock /\ + b \notin state.transferingBuckets /\ j # state.id + THEN LET state1 == [state EXCEPT + !.bucketRefs[b].rw_lock = TRUE, + !.transferingBuckets = @ \union {b}, + !.errinj.bucketSendCount = @ + 1] IN + LET state2 == BucketStatusChange( + state1, state.id, b, "SENDING", ReplicasetOf(j)) IN + LET msg == [type |-> "BUCKET_RECV", + content |-> [bucket |-> b, final |-> FALSE]] IN + [state2 EXCEPT !.networkSend[j] = Append(@, msg)] + ELSE state + +BucketRecvStart(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type = "BUCKET_RECV" /\ ~msg.content.final THEN + LET b == msg.content.bucket IN + IF IsMaster(state) /\ state.buckets[b].status = NULL /\ + b \notin state.transferingBuckets + THEN + \* Valid handoff start - become RECEIVING and ack success. 
+ LET state1 == [state EXCEPT + !.networkReceive[j] = Tail(@)] IN + LET state2 == BucketStatusChange( + state1, state.id, b, "RECEIVING", ReplicasetOf(j)) IN + LET response == [type |-> "BUCKET_RECV_RESPONSE", + content |-> [bucket |-> b, status |-> TRUE]] IN + [state2 EXCEPT !.networkSend[j] = Append(@, response)] + ELSE + \* Error: reply with failure response instead of silent drop. + LET response == [type |-> "BUCKET_RECV_RESPONSE", + content |-> [bucket |-> b, status |-> FALSE]] IN + [state EXCEPT + !.networkReceive[j] = Tail(@), + !.networkSend[j] = Append(@, response)] + ELSE state \* Leave non-BUCKET_RECV messages in queue + +BucketSendFinish(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type = "BUCKET_RECV_RESPONSE" THEN + LET b == msg.content.bucket + ok == msg.content.status + IN + IF IsMaster(state) /\ b \in state.transferingBuckets THEN + LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN + IF ok THEN + \* Receiver accepted the bucket - mark as SENT + \* and send final message. + LET state2 == BucketStatusChange( + state1, state.id, b, "SENT", ReplicasetOf(j)) IN + LET final_msg == + [ type |-> "BUCKET_RECV", + content |-> [bucket |-> b, final |-> TRUE] ] IN + [ state2 EXCEPT + !.transferingBuckets = @ \ {b}, + !.networkSend[j] = Append(@, final_msg) + ] + ELSE + \* Receiver rejected - stop transfer and clear state. + [ state1 EXCEPT + !.transferingBuckets = @ \ {b} ] + ELSE + \* Not master or bucket not being transferred -> drop message. + [ state EXCEPT !.networkReceive[j] = Tail(@) ] + ELSE + \* Leave other message types untouched. 
+ state + +BucketRecvFinish(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type = "BUCKET_RECV" /\ msg.content.final THEN + LET b == msg.content.bucket IN + IF IsMaster(state) /\ state.buckets[b].status = "RECEIVING" /\ + b \notin state.transferingBuckets + THEN LET state1 == [state EXCEPT + !.networkReceive[j] = Tail(@)] IN + LET state2 == + BucketStatusChange(state1, state.id, b, "ACTIVE", NULL) IN + [state2 EXCEPT !.bucketRefs[b].rw_lock = FALSE] + ELSE [state EXCEPT !.networkReceive[j] = Tail(@)] \* Drop if not master + ELSE state \* Leave non-BUCKET_RECV messages in queue + +\* Timeout happens, bucket_send() fails. +BucketDropFromTransfering(state, b) == + IF b \in state.transferingBuckets THEN + [state EXCEPT !.transferingBuckets = @ \ {b}] + ELSE + state + +(***************************************************************************) +(* Recovery *) +(***************************************************************************) + +RecoverySendStatRequest(state, b) == + LET dest_rs == state.buckets[b].destination IN + IF IsMaster(state) + /\ state.buckets[b].status \in {"SENDING", "RECEIVING"} + /\ ~(b \in state.transferingBuckets) + /\ dest_rs # NULL + THEN + \* Choose any storage in the destination replicaset. + LET candidates == {s \in Storages : + storageToReplicaset[s] = dest_rs} + dest == CHOOSE s \in candidates : TRUE + msg == [type |-> "RECOVERY_BUCKET_STAT", + content |-> [bucket |-> b]] + IN [state EXCEPT !.networkSend[dest] = Append(@, msg)] + ELSE + state + +ProcessRecoveryStatRequest(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type # "RECOVERY_BUCKET_STAT" THEN state + ELSE + IF ~IsMaster(state) THEN + \* Drop message if this node is not a master. 
+ [state EXCEPT !.networkReceive[j] = Tail(@)] + ELSE + LET b == msg.content.bucket + reply == [type |-> "RECOVERY_BUCKET_STAT_RESPONSE", + content |-> [ + bucket |-> b, + status |-> state.buckets[b].status, + transferring |-> (b \in state.transferingBuckets) + ]] + IN [state EXCEPT + !.networkReceive[j] = Tail(@), + !.networkSend[j] = Append(@, reply)] + +ProcessRecoveryStatResponse(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type # "RECOVERY_BUCKET_STAT_RESPONSE" THEN state + ELSE + IF ~IsMaster(state) THEN + \* Drop message if this node is not a master. + [state EXCEPT !.networkReceive[j] = Tail(@)] + ELSE + LET b == msg.content.bucket + remoteStatus == msg.content.status + remoteTransf == msg.content.transferring + localStatus == state.buckets[b].status + IN + IF ~(localStatus \in TransferStates) THEN + [state EXCEPT !.networkReceive[j] = Tail(@)] + \* Recovery policy: sender adjusts state after getting peer's status. + ELSE IF localStatus = "SENDING" /\ remoteStatus \in {"ACTIVE"} THEN + LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN + BucketStatusChange(state1, state.id, b, "SENT", state.buckets[b].destination) + ELSE IF localStatus = "RECEIVING" /\ + (remoteStatus \in WritableStates + \/ (remoteStatus = "SENDING" /\ ~remoteTransf)) THEN + LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN + BucketStatusChange(state1, state.id, b, "GARBAGE", NULL) + ELSE IF (b \notin state.transferingBuckets) + /\ (remoteStatus \in {"SENT", "GARBAGE"} \/ remoteStatus = NULL) THEN + LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN + BucketStatusChange(state1, state.id, b, "ACTIVE", NULL) + ELSE + [state EXCEPT !.networkReceive[j] = Tail(@)] + +(***************************************************************************) +(* Garbage Collector *) +(***************************************************************************) + +\* Master-only background process. 
+\* It checks SENT buckets that are not transferring and have no refs. +\* It asks all replicas in the same replicaset if they have any RO refs. +\* If all replicas report can_gc = TRUE, it marks the bucket as GARBAGE. +\* Later, GC will delete GARBAGE buckets (make them NULL). + +------------------------------------------------------------------------------ +\* 1. Send GC check request. +------------------------------------------------------------------------------ + +TryBucketGarbage(state, b) == + LET replicas == OtherReplicasInReplicaset(state.id) IN + IF state.gcAck[b] = replicas + /\ state.buckets[b].status = "SENT" + /\ state.bucketRefs[b].ro = 0 + /\ state.bucketRefs[b].rw = 0 + /\ ~(b \in state.transferingBuckets) + THEN + \* Reset acks and mark bucket as GARBAGE + LET state1 == [state EXCEPT !.gcAck[b] = {}] IN + BucketStatusChange(state1, state.id, b, "GARBAGE", NULL) + ELSE + state + +GcSendTestRequest(state, b) == + IF IsMaster(state) + /\ state.buckets[b].status = "SENT" + /\ ~(b \in state.transferingBuckets) + /\ state.bucketRefs[b].ro = 0 + /\ state.bucketRefs[b].rw = 0 + THEN + LET rs == ReplicasetOf(state.id) + dests == OtherReplicasInReplicaset(state.id) + msg == [type |-> "BUCKET_TEST_GC", + content |-> [bucket |-> b]] + IN + IF dests = {} THEN + \* No other replicas - mark immediately if eligible + TryBucketGarbage(state, b) + ELSE + [state EXCEPT + !.networkSend = + [j \in Storages |-> + IF j \in dests + THEN Append(state.networkSend[j], msg) + ELSE state.networkSend[j]]] + ELSE + state + +------------------------------------------------------------------------------ +\* 2. Process GC test request. 
+------------------------------------------------------------------------------ + +ProcessGcTestRequest(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type # "BUCKET_TEST_GC" THEN state + ELSE + IF ~IsMaster(state) THEN + \* Drop if not master (only masters handle messages) + [state EXCEPT !.networkReceive[j] = Tail(@)] + ELSE + LET b == msg.content.bucket + \* can_gc is true if this replica has no ro refs and bucket is SENT or GARBAGE + can_gc == (state.bucketRefs[b].ro = 0) + /\ (state.buckets[b].status \in {"SENT", "GARBAGE", NULL}) + reply == [type |-> "BUCKET_TEST_GC_RESPONSE", + content |-> [bucket |-> b, can_gc |-> can_gc]] + IN [state EXCEPT + !.networkReceive[j] = Tail(@), + !.networkSend[j] = Append(@, reply)] + +------------------------------------------------------------------------------ +\* 3. Process GC test responses. +------------------------------------------------------------------------------ + +ProcessGcTestResponse(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type # "BUCKET_TEST_GC_RESPONSE" THEN state + ELSE + IF ~IsMaster(state) THEN + [state EXCEPT !.networkReceive[j] = Tail(@)] + ELSE + LET b == msg.content.bucket + can_gc == msg.content.can_gc + acks_before == state.gcAck[b] + acks_after == + IF can_gc THEN acks_before \union {j} ELSE acks_before + state1 == [state EXCEPT + !.networkReceive[j] = Tail(@), + !.gcAck[b] = acks_after] + IN TryBucketGarbage(state1, b) + +------------------------------------------------------------------------------ +\* 4. Delete (NULL) GARBAGE buckets. 
+------------------------------------------------------------------------------ + +GcDropGarbage(state, b) == + IF IsMaster(state) /\ state.buckets[b].status = "GARBAGE" THEN + BucketStatusChange(state, state.id, b, NULL, NULL) + ELSE state + +(***************************************************************************) +(* Network *) +(***************************************************************************) + +\* Is a message a replication one? +IsReplication(m) == m.type = "REPLICATION_BUCKET" + +\* Forbid reordering two replication msgs. +CanSwap(m1, m2) == ~(IsReplication(m1) /\ IsReplication(m2)) + +\* Safe to drop this message? +CanDrop(m) == ~IsReplication(m) + +\* Remove element at position k (1-based). +\* If k out-of-range, return the seq unchanged. +SeqRemoveAt(S, k) == + IF k < 1 \/ k > Len(S) THEN S + ELSE + LET pre == IF k = 1 THEN << >> ELSE SubSeq(S, 1, k-1) + post == IF k = Len(S) THEN << >> ELSE SubSeq(S, k+1, Len(S)) + IN pre \o post + +SeqSwapAdjacent(s, k) == + IF k < 1 \/ k >= Len(s) THEN s + ELSE + LET a == s[k] + b == s[k+1] + IN [ s EXCEPT ![k] = b, ![k+1] = a ] + +\* Reorder: pick any channel and swap one adjacent pair, +\* as long as we are not swapping two REPLICATION_BUCKETs. +ReorderOneNetworkMessage == + \E s \in Storages, t \in Storages : + LET Q == network[s][t] IN + /\ Len(Q) >= 2 + /\ \E k \in 1..(Len(Q)-1) : + CanSwap(Q[k], Q[k+1]) /\ + network' = [network EXCEPT ![s][t] = SeqSwapAdjacent(Q, k)] /\ + storages' = [storages EXCEPT ![s].errinj.networkReorderCount = @ + 1] + /\ UNCHANGED <> + +\* Drop: pick any non-replication message at any position and remove it. 
+DropOneNetworkMessage == + \E s \in Storages, t \in Storages : + LET Q == network[s][t] + DroppableIdx == { k \in 1..Len(Q) : CanDrop(Q[k]) } + IN /\ DroppableIdx # {} + /\ \E k \in DroppableIdx : + /\ network' = [network EXCEPT ![s][t] = SeqRemoveAt(Q, k)] + /\ storages' = [storages EXCEPT ![s].errinj.networkDropCount = @ + 1] + /\ UNCHANGED <> + +(***************************************************************************) +(* Main actions *) +(***************************************************************************) + +Next == + \/ ReorderOneNetworkMessage + \/ DropOneNetworkMessage + \/ \E i \in Storages : + LET state == StorageState(i) + IN \/ StorageStateApply(i, BecomeMaster(state)) + \/ StorageStateApply(i, BecomeReplica(state)) + \/ \E j \in Storages : + /\ Len(state.networkReceive[j]) > 0 + /\ \/ StorageStateApply(i, ProcessReplicationBucket(state, j)) + \/ StorageStateApply(i, BucketSendFinish(state, j)) + \/ StorageStateApply(i, BucketRecvStart(state, j)) + \/ StorageStateApply(i, BucketRecvFinish(state, j)) + \/ StorageStateApply(i, ProcessRecoveryStatRequest(state, j)) + \/ StorageStateApply(i, ProcessRecoveryStatResponse(state, j)) + \/ StorageStateApply(i, ProcessGcTestRequest(state, j)) + \/ StorageStateApply(i, ProcessGcTestResponse(state, j)) + \/ \E b \in BucketIds, mode \in {"read", "write"} : + \/ StorageStateApply(i, BucketRef(state, b, mode)) + \/ StorageStateApply(i, BucketUnRef(state, b, mode)) + \/ \E j \in Storages, b \in BucketIds : + StorageStateApply(i, BucketSendStart(state, b, j)) + \/ \E b \in BucketIds : + \/ StorageStateApply(i, RecoverySendStatRequest(state, b)) + \/ StorageStateApply(i, GcDropGarbage(state, b)) + \/ StorageStateApply(i, GcSendTestRequest(state, b)) + \/ StorageStateApply(i, BucketDropFromTransfering(state, b)) + +================================================================================ diff --git a/proofs/tla/src/utils.tla b/proofs/tla/src/utils.tla new file mode 100644 index 00000000..d7ffbbaf --- 
/dev/null +++ b/proofs/tla/src/utils.tla @@ -0,0 +1,10 @@ +------------------------------- MODULE utils ----------------------------------- + +EXTENDS Naturals, FiniteSets + +RECURSIVE SetSum(_) +SetSum(set) == IF set = {} THEN 0 ELSE + LET x == CHOOSE x \in set: TRUE + IN x + SetSum(set \ {x}) + +================================================================================ diff --git a/proofs/tla/test/storage/DoubledBucketsSmallTest.cfg b/proofs/tla/test/storage/DoubledBucketsSmallTest.cfg new file mode 100644 index 00000000..64efddb1 --- /dev/null +++ b/proofs/tla/test/storage/DoubledBucketsSmallTest.cfg @@ -0,0 +1,25 @@ +INIT Init +NEXT Next + +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + +SYMMETRY Symmetry + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv + NoActiveSimultaneousInv + +CONSTRAINTS + SendLimitConstraint + NetworkBoundConstraint + TwoMastersConstraint diff --git a/proofs/tla/test/storage/DoubledBucketsSmallTest.tla b/proofs/tla/test/storage/DoubledBucketsSmallTest.tla new file mode 100644 index 00000000..df870351 --- /dev/null +++ b/proofs/tla/test/storage/DoubledBucketsSmallTest.tla @@ -0,0 +1,61 @@ +-------------------------MODULE DoubledBucketsSmallTest ------------------------ +EXTENDS storage, utils + +CONSTANTS b1, b2 + +StoragesC == {"s1", "s2", "s3", "s4"} +ReplicaSetsC == {"rs1", "rs2"} +BucketIdsC == {b1, b2} +StorageAssignmentsC == [rs1 |-> {"s1", "s2"}, + rs2 |-> {"s3", "s4"}] +BucketAssignmentsC == [rs1 |-> {b1}, + rs2 |-> {b2}] +MasterAssignmentsC == [rs1 |-> {"s1"}, + rs2 |-> {"s3"}] + +(***************************************************************************) +(* CONSTRAINTS *) +(***************************************************************************) + +MAX_TOTAL_SENDS == 1 + +\* 1. 
Limit total bucket sends - prevent endless transfers. NOTE(review): SetSum takes a set, so equal bucketSendCount values from different storages collapse into one element and the total undercounts; sum over the storages themselves instead — TODO confirm.
StorageToReplicasetTypeInv + NoActiveSimultaneousInv + +CONSTRAINT + SendLimitConstraint + NetworkBoundConstraint + RefConstraint diff --git a/proofs/tla/test/storage/DoubledBucketsTest.tla b/proofs/tla/test/storage/DoubledBucketsTest.tla new file mode 100644 index 00000000..0c3050f4 --- /dev/null +++ b/proofs/tla/test/storage/DoubledBucketsTest.tla @@ -0,0 +1,74 @@ +------------------------- MODULE DoubledBucketsTest ---------------------------- +EXTENDS storage, utils + +CONSTANTS b1, b2, b3 + +StoragesC == {"s1", "s2", "s3"} +ReplicaSetsC == {"rs1", "rs2", "rs3"} +BucketIdsC == {b1, b2, b3} +StorageAssignmentsC == [rs1 |-> {"s1"}, + rs2 |-> {"s2"}, + rs3 |-> {"s3"}] +BucketAssignmentsC == [rs1 |-> {b1}, + rs2 |-> {b2}, + rs3 |-> {b3}] +MasterAssignmentsC == [rs1 |-> {"s1"}, + rs2 |-> {"s2"}, + rs3 |-> {"s3"}] + +(***************************************************************************) +(* CONSTRAINTS *) +(***************************************************************************) + +MAX_TOTAL_SENDS == 2 + +\* 1. Limit total bucket sends - prevent endless transfers. +SendLimitConstraint == + LET totalSends == + SetSum({ storages[i].errinj.bucketSendCount : i \in StoragesC }) + IN totalSends =< MAX_TOTAL_SENDS + +\* 2. Keep network bounded - avoid message explosion. 
+NetworkBoundConstraint == + /\ \A s1, s2 \in StoragesC : + Len(network[s1][s2]) =< 3 + /\ \A s \in StoragesC : + /\ storages[s].errinj.networkReorderCount <= 1 + /\ storages[s].errinj.networkDropCount <= 1 + +RefConstraint == + \A s1 \in StoragesC : + /\ storages[s1].errinj.bucketRWRefCount <= 0 + /\ storages[s1].errinj.bucketRORefCount <= 0 + /\ storages[s1].errinj.bucketRWUnRefCount <= 0 + /\ storages[s1].errinj.bucketROUnRefCount <= 0 + +(***************************************************************************) +(* SYMMETRY *) +(***************************************************************************) + +Symmetry == + Permutations(BucketIdsC) + +(***************************************************************************) +(* STATE INVARIANTS *) +(***************************************************************************) + +NoActiveSimultaneousInv == + \* No bucket can be ACTIVE in storages belonging to different ReplicaSets + \A b \in BucketIds : + \A rs1, rs2 \in ReplicaSets : + rs1 # rs2 => + ~(\E s1, s2 \in Storages : + storageToReplicaset[s1] = rs1 /\ + storageToReplicaset[s2] = rs2 /\ + storages[s1].status = "master" /\ + storages[s2].status = "master" /\ + storages[s1].buckets[b].status = "ACTIVE" /\ + storages[s2].buckets[b].status = "ACTIVE") + +(***************************************************************************) +(* MODEL CHECKING PROPERTIES *) +(***************************************************************************) + +================================================================================ diff --git a/proofs/tla/test/storage/MasterDoubledTest.cfg b/proofs/tla/test/storage/MasterDoubledTest.cfg new file mode 100644 index 00000000..4cfa3c88 --- /dev/null +++ b/proofs/tla/test/storage/MasterDoubledTest.cfg @@ -0,0 +1,20 @@ +INIT TestInit +NEXT TestNext + +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- 
BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + b3 = b3 + b4 = b4 + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv + NoActiveSimultaneousInv diff --git a/proofs/tla/test/storage/MasterDoubledTest.tla b/proofs/tla/test/storage/MasterDoubledTest.tla new file mode 100644 index 00000000..f5811fa9 --- /dev/null +++ b/proofs/tla/test/storage/MasterDoubledTest.tla @@ -0,0 +1,93 @@ +-------------------------- MODULE MasterDoubledTest ---------------------------- +EXTENDS storage, TLC + +(***************************************************************************) +(* Cluster: two replica sets, two storages each. *) +(* We'll force rs1 to send a bucket to rs2, lose replication, and failover.*) +(***************************************************************************) + +CONSTANTS + b1, b2, b3, b4 + +StoragesC == {"s1", "s2", "s3", "s4"} +ReplicaSetsC == {"rs1", "rs2"} +BucketIdsC == {b1, b2, b3, b4} +StorageAssignmentsC == + [rs1 |-> {"s1", "s2"}, + rs2 |-> {"s3", "s4"}] +BucketAssignmentsC == + [rs1 |-> {b1, b2}, + rs2 |-> {b3, b4}] +MasterAssignmentsC == + [rs1 |-> {"s1"}, + rs2 |-> {"s3"}] + +(***************************************************************************) +(* Variables and initialization *) +(***************************************************************************) + +VARIABLE phase + +TestInit == + /\ Init + /\ phase = 1 + +(***************************************************************************) +(* Phase-driven Next *) +(***************************************************************************) + +TestNext == + \/ /\ phase = 1 + /\ ~( + storages["s1"].buckets[b1].status = "SENT" + /\ storages["s3"].buckets[b1].status = "ACTIVE" + ) + /\ \E i \in {"s1"}, j \in {"s3"}, b \in {b1} : + \/ StorageStateApply(i, BucketSendStart(StorageState(i), b, j)) + \/ /\ Len(StorageState(j).networkReceive[i]) > 0 + /\ \/ StorageStateApply(j, BucketRecvStart(StorageState(j), i)) + \/ 
StorageStateApply(j, BucketRecvFinish(StorageState(j), i)) + \/ /\ Len(StorageState(i).networkReceive[j]) > 0 + /\ StorageStateApply(i, BucketSendFinish(StorageState(i), j)) + /\ UNCHANGED <> + + \/ /\ phase = 1 + /\ storages["s1"].buckets[b1].status = "SENT" + /\ storages["s3"].buckets[b1].status = "ACTIVE" + /\ UNCHANGED <> + /\ phase' = 3 + + \/ /\ phase = 3 + /\ \E i \in {"s2"} : + /\ StorageStateApply(i, BecomeMaster(StorageState(i))) + /\ PrintT("Phase 3: failover, s2 becomes master") + /\ phase' = 4 + + \/ /\ phase = 4 + /\ UNCHANGED <> + /\ PrintT("Phase 4: check for double ACTIVE") + +(***************************************************************************) +(* Spec *) +(***************************************************************************) + +Spec == + TestInit /\ [][TestNext]_<> + +(***************************************************************************) +(* STATE INVARIANTS *) +(***************************************************************************) + +NoActiveSimultaneousInv == + \* No bucket can be ACTIVE in storages belonging to different ReplicaSets + \A b \in BucketIds : + \A rs1, rs2 \in ReplicaSets : + rs1 # rs2 => + ~(\E s1, s2 \in Storages : + storageToReplicaset[s1] = rs1 /\ + storageToReplicaset[s2] = rs2 /\ + storages[s1].status = "master" /\ + storages[s2].status = "master" /\ + storages[s1].buckets[b].status = "ACTIVE" /\ + storages[s2].buckets[b].status = "ACTIVE") +=============================================================================== diff --git a/proofs/tla/test/storage/RecoveryGarbageTest.cfg b/proofs/tla/test/storage/RecoveryGarbageTest.cfg new file mode 100644 index 00000000..7704169f --- /dev/null +++ b/proofs/tla/test/storage/RecoveryGarbageTest.cfg @@ -0,0 +1,24 @@ +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + 
b3 = b3 + b4 = b4 + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv + NoActiveSimultaneousInv + +SPECIFICATION Spec + +CONSTRAINT SendLimitConstraint + +PROPERTY + AllBucketsEventuallyStable diff --git a/proofs/tla/test/storage/RecoveryGarbageTest.tla b/proofs/tla/test/storage/RecoveryGarbageTest.tla new file mode 100644 index 00000000..ef97d4a8 --- /dev/null +++ b/proofs/tla/test/storage/RecoveryGarbageTest.tla @@ -0,0 +1,107 @@ +-------------------------- MODULE RecoveryGarbageTest -------------------------- + +\* I didn't manage to finish that test in 1 hour and after 150 mln states. +\* Need many more constraints for proper execution time. But it helped me to find +\* a ton of bugs and it works; I'm ok with this for now. + +EXTENDS storage, utils + +(***************************************************************************) +(* Cluster setup: 2 replica sets, 1 storage each, 2 buckets per set. *) +(***************************************************************************) + +CONSTANTS + b1, b2, b3, b4 + +StoragesC == {"s1", "s2"} +ReplicaSetsC == {"rs1", "rs2"} +BucketIdsC == {b1, b2, b3, b4} +StorageAssignmentsC == + [rs1 |-> {"s1"}, + rs2 |-> {"s2"}] +BucketAssignmentsC == + [rs1 |-> {b1, b2}, + rs2 |-> {b3, b4}] +MasterAssignmentsC == + [rs1 |-> {"s1"}, + rs2 |-> {"s2"}] + +(***************************************************************************) +(* CONSTRAINTS *) +(***************************************************************************) + +MAX_TOTAL_SENDS == 2 + +\* 1. Limit total bucket sends - prevent endless transfers. 
+SendLimitConstraint == + LET totalSends == + SetSum({ storages[i].errinj.bucketSendCount : i \in Storages }) + IN totalSends =< MAX_TOTAL_SENDS + +(***************************************************************************) +(* Helpers *) +(***************************************************************************) + +DropFinalRecvMessage == + \E s1, s2 \in StoragesC : + /\ Len(network[s1][s2]) > 0 + /\ Head(network[s1][s2]).type = "BUCKET_RECV" + /\ Head(network[s1][s2]).content.final = TRUE + /\ network' = [network EXCEPT ![s1][s2] = Tail(@)] + /\ UNCHANGED <> + +(***************************************************************************) +(* Transitions *) +(***************************************************************************) + +NextTest == + \E i \in StoragesC : + LET st == StorageState(i) IN + \/ \E j \in StoragesC, b \in BucketIdsC : + StorageStateApply(i, BucketSendStart(st, b, j)) + \/ \E j \in StoragesC : + /\ Len(st.networkReceive[j]) > 0 + /\ \/ StorageStateApply(i, BucketRecvStart(st, j)) + \/ StorageStateApply(i, BucketSendFinish(st, j)) + \/ StorageStateApply(i, BucketRecvFinish(st, j)) + \/ StorageStateApply(i, ProcessRecoveryStatRequest(st, j)) + \/ StorageStateApply(i, ProcessRecoveryStatResponse(st, j)) + \/ StorageStateApply(i, ProcessGcTestRequest(st, j)) + \/ StorageStateApply(i, ProcessGcTestResponse(st, j)) + \/ \E b \in BucketIdsC : + \/ StorageStateApply(i, RecoverySendStatRequest(st, b)) + \/ StorageStateApply(i, GcDropGarbage(st, b)) + \/ StorageStateApply(i, GcSendTestRequest(st, b)) + \/ DropFinalRecvMessage + \/ UNCHANGED <> + +(***************************************************************************) +(* Specification *) +(***************************************************************************) + +Spec == + Init /\ [][NextTest]_<> /\ + WF_<>(NextTest) + +(***************************************************************************) +(* Properties *) 
+(***************************************************************************) + +AllBucketsEventuallyStable == + [] (\A s \in StoragesC, b \in BucketIdsC : + <> (storages[s].buckets[b].status \in {"ACTIVE", NULL})) + +NoActiveSimultaneousInv == + \* No bucket can be ACTIVE in storages belonging to different ReplicaSets + \A b \in BucketIds : + \A rs1, rs2 \in ReplicaSets : + rs1 # rs2 => + ~(\E s1, s2 \in Storages : + storageToReplicaset[s1] = rs1 /\ + storageToReplicaset[s2] = rs2 /\ + storages[s1].status = "master" /\ + storages[s2].status = "master" /\ + storages[s1].buckets[b].status = "ACTIVE" /\ + storages[s2].buckets[b].status = "ACTIVE") + +=============================================================================== diff --git a/proofs/tla/test/storage/RecoveryTest.cfg b/proofs/tla/test/storage/RecoveryTest.cfg new file mode 100644 index 00000000..bb6b8a49 --- /dev/null +++ b/proofs/tla/test/storage/RecoveryTest.cfg @@ -0,0 +1,22 @@ +INIT TestInit +NEXT TestNext + +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + b3 = b3 + b4 = b4 + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv + NoActiveSimultaneousInv + +CHECK_DEADLOCK FALSE diff --git a/proofs/tla/test/storage/RecoveryTest.tla b/proofs/tla/test/storage/RecoveryTest.tla new file mode 100644 index 00000000..6ec825d1 --- /dev/null +++ b/proofs/tla/test/storage/RecoveryTest.tla @@ -0,0 +1,101 @@ +----------------------------- MODULE RecoveryTest ------------------------------ +EXTENDS storage, TLC + +(***************************************************************************) +(* Cluster: two replica sets, two storages each. 
*) +(* We'll force rs1 to send a bucket to rs2, lose replication, and failover.*) +(***************************************************************************) + +CONSTANTS + b1, b2, b3, b4 + +StoragesC == {"s1", "s2"} +ReplicaSetsC == {"rs1", "rs2"} +BucketIdsC == {b1, b2, b3, b4} +StorageAssignmentsC == + [rs1 |-> {"s1"}, + rs2 |-> {"s2"}] +BucketAssignmentsC == + [rs1 |-> {b1, b2}, + rs2 |-> {b3, b4}] +MasterAssignmentsC == + [rs1 |-> {"s1"}, + rs2 |-> {"s2"}] + +(***************************************************************************) +(* Variables and initialization *) +(***************************************************************************) + +VARIABLE phase + +TestInit == + /\ Init + /\ phase = 1 + +(***************************************************************************) +(* Phase-driven Next *) +(***************************************************************************) + +TestNext == + \/ /\ phase = 1 + /\ \E i \in {"s1"}, j \in {"s2"}, b \in {b1} : + /\ StorageStateApply(i, BucketSendStart(StorageState(i), b, j)) + \* /\ PrintT("Phase 1: bucket_send()") + /\ phase' = 2 + + \/ /\ phase = 2 + /\ storages["s1"].buckets[b1].status = "SENDING" + /\ storages["s2"].buckets[b1].status = NULL + /\ \E i \in {"s1"}, b \in {b1} : + StorageStateApply(i, BucketDropFromTransfering(StorageState(i), b)) + \* /\ PrintT("Phase 2: timeout bucket_send()") + /\ phase' = 3 + + \/ /\ phase = 3 + /\ \E i \in {"s1"}, j \in {"s2"} : + /\ network' = [network EXCEPT ![i][j] = Tail(@)] + /\ UNCHANGED <> + \* /\ PrintT("Phase 3: timeout bucket_send()") + /\ phase' = 4 + + \/ /\ phase = 4 + /\ \E i \in {"s1"}, j \in {"s2"}, b \in {b1} : + \/ StorageStateApply(i, RecoverySendStatRequest(StorageState(i), b)) + \/ /\ Len(StorageState(j).networkReceive[i]) > 0 + /\ StorageStateApply(j, ProcessRecoveryStatRequest(StorageState(j), i)) + \/ /\ Len(StorageState(i).networkReceive[j]) > 0 + /\ StorageStateApply(i, ProcessRecoveryStatResponse(StorageState(i), j)) + \* /\ 
PrintT("Phase 4: recover bucket") + /\ UNCHANGED <> + + \/ /\ phase = 4 + /\ storages["s1"].buckets[b1].status = "ACTIVE" + /\ storages["s2"].buckets[b1].status = NULL + /\ phase' = 5 + /\ UNCHANGED <> + \* /\ PrintT("Phase 5: stutter") + +(***************************************************************************) +(* Spec *) +(***************************************************************************) + +Spec == + TestInit /\ [][TestNext]_<> + +(***************************************************************************) +(* STATE INVARIANTS *) +(***************************************************************************) + +NoActiveSimultaneousInv == + \* No bucket can be ACTIVE in storages belonging to different ReplicaSets + \A b \in BucketIds : + \A rs1, rs2 \in ReplicaSets : + rs1 # rs2 => + ~(\E s1, s2 \in Storages : + storageToReplicaset[s1] = rs1 /\ + storageToReplicaset[s2] = rs2 /\ + storages[s1].status = "master" /\ + storages[s2].status = "master" /\ + storages[s1].buckets[b].status = "ACTIVE" /\ + storages[s2].buckets[b].status = "ACTIVE") +=============================================================================== diff --git a/proofs/tla/test/storage/ReplicationTest.cfg b/proofs/tla/test/storage/ReplicationTest.cfg new file mode 100644 index 00000000..9236ffcf --- /dev/null +++ b/proofs/tla/test/storage/ReplicationTest.cfg @@ -0,0 +1,21 @@ +INIT Init +NEXT TestNext + +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv + ConsistencyWhenNetworkEmptyInv + +CONSTRAINTS + SendLimitConstraint diff --git a/proofs/tla/test/storage/ReplicationTest.tla b/proofs/tla/test/storage/ReplicationTest.tla new file mode 100644 index 00000000..7ca620c9 --- /dev/null +++ 
b/proofs/tla/test/storage/ReplicationTest.tla @@ -0,0 +1,84 @@ +-------------------------- MODULE ReplicationTest ------------------------------ + +EXTENDS storage, utils + +CONSTANTS b1, b2 + +StoragesC == {"s1", "s2", "s3"} +ReplicaSetsC == {"rs1"} +BucketIdsC == {b1, b2} +StorageAssignmentsC == [rs1 |-> {"s1", "s2", "s3"}] +BucketAssignmentsC == [rs1 |-> {b1, b2}] +MasterAssignmentsC == [rs1 |-> {"s1", "s2", "s3"}] + +(***************************************************************************) +(* CONSTRAINTS *) +(***************************************************************************) + +MAX_TOTAL_SENDS == 4 + +\* 1. Limit total bucket sends - prevent endless transfers. +SendLimitConstraint == + LET totalSends == + SetSum({ storages[i].errinj.bucketSendCount : i \in Storages }) + IN totalSends =< MAX_TOTAL_SENDS + +(***************************************************************************) +(* Specification *) +(***************************************************************************) + +TestBucketStatusChange(state, bucketId) == + IF state.status = "master" THEN + LET newState == CHOOSE newState \in BucketState : + newState /= state.buckets[bucketId].status + IN BucketStatusChange(state, state.id, bucketId, newState, NULL) + ELSE state + +TestNext == \E i \in Storages : + LET state == StorageState(i) + IN \/ \E j \in Storages : + /\ Len(state.networkReceive[j]) > 0 + /\ StorageStateApply(i, ProcessReplicationBucket(state, j)) + \/ \E b \in BucketIds : + StorageStateApply(i, TestBucketStatusChange(state, b)) + +(***************************************************************************) +(* TEMPORAL PROPERTIES *) +(***************************************************************************) + +\* Helper: check that two storages have matching bucket states +ReplicaStatesMatch(s1, s2) == + \A b \in BucketIds : + storages[s1].buckets[b].status = storages[s2].buckets[b].status /\ + storages[s1].buckets[b].destination = storages[s2].buckets[b].destination 
+ +\* Helper: storages in a given replicaset +StoragesInReplicaset(rs) == + {s \in Storages : storageToReplicaset[s] = rs} + +\* All replicas in a replicaset have consistent bucket states +AllReplicasConsistent == + \A rs \in ReplicaSets : + \A s1, s2 \in StoragesInReplicaset(rs) : + ReplicaStatesMatch(s1, s2) + +NetworkEmpty == + \A s \in Storages : \A t \in Storages : network[s][t] = <<>> + +\* Check if two storages have matching vclocks +VClockMatch(s1, s2) == + storages[s1].vclock = storages[s2].vclock + +\* All storages in a replicaset have the same vclock +AllReplicasVClocksConsistent == + \A rs \in ReplicaSets : + \A s1, s2 \in StoragesInReplicaset(rs) : + VClockMatch(s1, s2) + +\* AllReplicasConsistent cannot be used here: We cannot guarantee data +\* consistency, when there's master-master config, data depends on the +\* order of network messages processed. +ConsistencyWhenNetworkEmptyInv == + NetworkEmpty => (AllReplicasVClocksConsistent) + +============================================================================= diff --git a/proofs/tla/test/storage/StartTest.cfg b/proofs/tla/test/storage/StartTest.cfg new file mode 100644 index 00000000..7012430d --- /dev/null +++ b/proofs/tla/test/storage/StartTest.cfg @@ -0,0 +1,20 @@ +INIT Init +NEXT Next + +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + b3 = b3 + b4 = b4 + c1 = c1 + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv diff --git a/proofs/tla/test/storage/StartTest.tla b/proofs/tla/test/storage/StartTest.tla new file mode 100644 index 00000000..537eef60 --- /dev/null +++ b/proofs/tla/test/storage/StartTest.tla @@ -0,0 +1,17 @@ +-------------------------------- MODULE StartTest ------------------------------ + +EXTENDS storage + +CONSTANTS b1, b2, b3, b4, c1 + +StoragesC == {"s1", "s2", 
"s3", "s4"} +ReplicaSetsC == {"rs1", "rs2"} +BucketIdsC == {b1, b2, b3, b4} +StorageAssignmentsC == [rs1 |-> {"s1", "s2"}, + rs2 |-> {"s3", "s4"}] +BucketAssignmentsC == [rs1 |-> {b1, b2}, + rs2 |-> {b3, b4}] +MasterAssignmentsC == [rs1 |-> {"s1"}, + rs2 |-> {"s3"}] + +================================================================================ diff --git a/proofs/tla/test/storage/StrayTCPDoubledTest.cfg b/proofs/tla/test/storage/StrayTCPDoubledTest.cfg new file mode 100644 index 00000000..57e568e6 --- /dev/null +++ b/proofs/tla/test/storage/StrayTCPDoubledTest.cfg @@ -0,0 +1,24 @@ +INIT TestInit +NEXT TestNext + +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + b3 = b3 + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv + NoActiveSimultaneousInv + +CHECK_DEADLOCK TRUE + +CONSTRAINTS + NetworkReorderConstraint diff --git a/proofs/tla/test/storage/StrayTCPDoubledTest.tla b/proofs/tla/test/storage/StrayTCPDoubledTest.tla new file mode 100644 index 00000000..58c30a66 --- /dev/null +++ b/proofs/tla/test/storage/StrayTCPDoubledTest.tla @@ -0,0 +1,151 @@ +------------------------- MODULE StrayTCPDoubledTest --------------------------- +EXTENDS storage, TLC + +CONSTANTS + b1, b2, b3 + +StoragesC == {"s1", "s2", "s3"} +ReplicaSetsC == {"rs1", "rs2", "rs3"} +BucketIdsC == {b1, b2, b3} +StorageAssignmentsC == + [rs1 |-> {"s1"}, + rs2 |-> {"s2"}, + rs3 |-> {"s3"}] +BucketAssignmentsC == + [rs1 |-> {b1}, + rs2 |-> {b2}, + rs3 |-> {b3}] +MasterAssignmentsC == + [rs1 |-> {"s1"}, + rs2 |-> {"s2"}, + rs3 |-> {"s3"}] + +(***************************************************************************) +(* Variables and initialization *) +(***************************************************************************) + +VARIABLE phase + +TestInit == + /\ Init + /\ phase = 1 
+ +(***************************************************************************) +(* CONSTRAINTS *) +(***************************************************************************) + +NetworkReorderConstraint == + \A s \in StoragesC : + /\ storages[s].errinj.networkReorderCount <= 1 + /\ storages[s].errinj.networkDropCount <= 1 + +(***************************************************************************) +(* Phase-driven Next *) +(***************************************************************************) + +TestNext == + \/ /\ phase = 1 + /\ \E i \in {"s1"}, j \in {"s3"}, b \in {b1} : + StorageStateApply(i, BucketSendStart(StorageState(i), b, j)) + /\ phase' = 2 + /\ UNCHANGED <> + + \/ /\ phase = 2 + /\ storages["s1"].buckets[b1].status = "SENDING" + /\ StorageStateApply("s1", BucketDropFromTransfering(StorageState("s1"), b1)) + /\ phase' = 3 + /\ UNCHANGED <> + + \/ /\ phase = 3 + /\ storages["s1"].buckets[b1].status = "SENDING" + /\ \E i \in {"s1"}, j \in {"s3"}, b \in {b1} : + \/ StorageStateApply(i, RecoverySendStatRequest(StorageState(i), b)) + \/ /\ Len(StorageState(j).networkReceive[i]) > 0 + /\ StorageStateApply(j, ProcessRecoveryStatRequest(StorageState(j), i)) + \/ /\ Len(StorageState(i).networkReceive[j]) > 0 + /\ StorageStateApply(i, ProcessRecoveryStatResponse(StorageState(i), j)) + \/ ReorderOneNetworkMessage + /\ UNCHANGED <> + + \/ /\ phase = 3 + /\ storages["s1"].buckets[b1].status = "ACTIVE" + /\ Head(network["s1"]["s3"]).type = "BUCKET_RECV" + /\ phase' = 4 + /\ UNCHANGED <> + + \/ /\ phase = 4 + /\ \E i \in {"s1"}, j \in {"s2"}, b \in {b1} : + \/ StorageStateApply(i, BucketSendStart(StorageState(i), b, j)) + \/ /\ Len(StorageState(j).networkReceive[i]) > 0 + /\ \/ StorageStateApply(j, BucketRecvStart(StorageState(j), i)) + \/ StorageStateApply(j, BucketRecvFinish(StorageState(j), i)) + \/ /\ Len(StorageState(i).networkReceive[j]) > 0 + /\ StorageStateApply(i, BucketSendFinish(StorageState(i), j)) + /\ UNCHANGED <> + + \/ /\ phase = 4 + /\ 
storages["s1"].buckets[b1].status = "SENT" + /\ storages["s2"].buckets[b1].status = "ACTIVE" + /\ storages["s3"].buckets[b1].status = NULL + /\ phase' = 5 + /\ UNCHANGED <> + + \/ /\ phase = 5 + /\ StorageStateApply("s1", GcSendTestRequest(StorageState("s1"), b1)) + /\ phase' = 6 + + \/ /\ phase = 6 + /\ storages["s1"].buckets[b1].status = "GARBAGE" + /\ storages["s2"].buckets[b1].status = "ACTIVE" + /\ storages["s3"].buckets[b1].status = NULL + /\ StorageStateApply("s3", BucketRecvStart(StorageState("s3"), "s1")) + /\ phase' = 7 + + \/ /\ phase = 7 + /\ storages["s1"].buckets[b1].status = "GARBAGE" + /\ storages["s2"].buckets[b1].status = "ACTIVE" + /\ storages["s3"].buckets[b1].status = "RECEIVING" + /\ phase' = 8 + /\ UNCHANGED <> + + \/ /\ phase = 8 + /\ \E i \in {"s3"}, j \in {"s1"}, b \in {b1} : + \/ DropOneNetworkMessage + \/ StorageStateApply(i, RecoverySendStatRequest(StorageState(i), b)) + \/ /\ Len(StorageState(j).networkReceive[i]) > 0 + /\ StorageStateApply(j, ProcessRecoveryStatRequest(StorageState(j), i)) + \/ /\ Len(StorageState(i).networkReceive[j]) > 0 + /\ StorageStateApply(i, ProcessRecoveryStatResponse(StorageState(i), j)) + /\ UNCHANGED <> + + \/ /\ phase = 8 + /\ storages["s1"].buckets[b1].status = "GARBAGE" + /\ storages["s2"].buckets[b1].status = "ACTIVE" + /\ storages["s3"].buckets[b1].status = "ACTIVE" + /\ phase' = 9 + /\ UNCHANGED <> + +(***************************************************************************) +(* Spec *) +(***************************************************************************) + +Spec == + TestInit /\ [][TestNext]_<> + +(***************************************************************************) +(* STATE INVARIANTS *) +(***************************************************************************) + +NoActiveSimultaneousInv == + \* No bucket can be ACTIVE in storages belonging to different ReplicaSets + \A b \in BucketIds : + \A rs1, rs2 \in ReplicaSets : + rs1 # rs2 => + ~(\E s1, s2 \in Storages : + 
storageToReplicaset[s1] = rs1 /\ + storageToReplicaset[s2] = rs2 /\ + storages[s1].status = "master" /\ + storages[s2].status = "master" /\ + storages[s1].buckets[b].status = "ACTIVE" /\ + storages[s2].buckets[b].status = "ACTIVE") +=============================================================================== From 29605ffa9dcaec9c1185938000425d75d8a246d7 Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Tue, 4 Nov 2025 19:00:21 +0300 Subject: [PATCH 2/6] tla/storage: implement sync after `SENDING`, fix #576 This commit implements the first part of the RFC for doubled buckets: syncing of replication after making a bucket `SENDING`. This protects us from the doubled bucket after master switch situation and, consequently, fixes the MasterDoubledTest and DoubledBucketsSmall tests, which reproduce that issue. Part of #576 NO_DOC=tla NO_TEST=tla --- proofs/tla/src/storage.tla | 54 ++++++++++++++++--- proofs/tla/test/storage/MasterDoubledTest.tla | 7 ++- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/proofs/tla/src/storage.tla b/proofs/tla/src/storage.tla index 58c1355e..839d302a 100644 --- a/proofs/tla/src/storage.tla +++ b/proofs/tla/src/storage.tla @@ -101,6 +101,8 @@ StorageType == [ bucketRefs : [BucketIds -> [ro : Nat, rw : Nat, ro_lock : BOOLEAN, rw_lock : BOOLEAN]], gcAck : [BucketIds -> SUBSET Storages], + sendWaitTarget : [BucketIds -> [Storages -> Nat]], + sendingBuckets : SUBSET BucketIds, \* For limiting tests, it cannot be done outside of the module, since \* a test doesn't know, whether the bucket send or ref actually happened. 
errinj : [ @@ -154,6 +156,8 @@ StorageInit == bucketRefs |-> [b \in BucketIds |-> [ro |-> 0, rw |-> 0, ro_lock |-> FALSE, rw_lock |-> FALSE]], gcAck |-> [b \in BucketIds |-> {}], + sendWaitTarget |-> [b \in BucketIds |-> [s \in Storages |-> 0]], + sendingBuckets |-> {}, errinj |-> [ bucketSendCount |-> 0, bucketRWRefCount |-> 0, @@ -182,6 +186,8 @@ StorageState(i) == [ buckets |-> storages[i].buckets, bucketRefs |-> storages[i].bucketRefs, gcAck |-> storages[i].gcAck, + sendWaitTarget |-> storages[i].sendWaitTarget, + sendingBuckets |-> storages[i].sendingBuckets, errinj |-> storages[i].errinj ] @@ -192,7 +198,9 @@ StorageStateApply(i, state) == VarSet(i, "buckets", state.buckets, VarSet(i, "bucketRefs", state.bucketRefs, VarSet(i, "gcAck", state.gcAck, - VarSet(i, "errinj", state.errinj, storages))))))) + VarSet(i, "sendWaitTarget", state.sendWaitTarget, + VarSet(i, "sendingBuckets", state.sendingBuckets, + VarSet(i, "errinj", state.errinj, storages))))))))) /\ network' = [s \in Storages |-> [t \in Storages |-> @@ -291,6 +299,37 @@ BucketUnRef(state, bucket, mode) == (* Bucket sending *) (***************************************************************************) +AllReplicasCaughtUp(state, b) == + LET rs == ReplicasetOf(state.id) + replicas == OtherReplicasInReplicaset(state.id) + target == state.sendWaitTarget[b] + m == state.id + IN + \A r \in replicas : storages[r].vclock[m] >= target[m] + +BucketSendWaitAndSend(state, b, j) == + IF IsMaster(state) + /\ state.buckets[b].status = "SENDING" + /\ ReplicasetOf(j) = state.buckets[b].destination + /\ b \in state.transferingBuckets + /\ b \in state.sendingBuckets + /\ j # state.id + /\ AllReplicasCaughtUp(state, b) + THEN + LET msg == [type |-> "BUCKET_RECV", + content |-> [bucket |-> b, final |-> FALSE]] + IN [state EXCEPT + !.networkSend[j] = Append(@, msg), + !.sendingBuckets = @ \ {b}] + ELSE + state + +BucketDropFromSending(state, b) == + IF b \in state.sendingBuckets THEN + [state EXCEPT !.sendingBuckets = @ \ 
{b}] + ELSE + state + BucketSendStart(state, b, j) == IF IsMaster(state) /\ state.buckets[b].status = "ACTIVE" /\ state.bucketRefs[b].rw = 0 /\ ~state.bucketRefs[b].rw_lock /\ @@ -300,10 +339,11 @@ BucketSendStart(state, b, j) == !.transferingBuckets = @ \union {b}, !.errinj.bucketSendCount = @ + 1] IN LET state2 == BucketStatusChange( - state1, state.id, b, "SENDING", ReplicasetOf(j)) IN - LET msg == [type |-> "BUCKET_RECV", - content |-> [bucket |-> b, final |-> FALSE]] IN - [state2 EXCEPT !.networkSend[j] = Append(@, msg)] + state1, state.id, b, "SENDING", ReplicasetOf(j)) + state3 == [state2 EXCEPT + !.sendWaitTarget[b] = state2.vclock, + !.sendingBuckets = @ \union {b}] + IN BucketSendWaitAndSend(state3, b, j) ELSE state BucketRecvStart(state, j) == @@ -635,11 +675,13 @@ Next == \/ StorageStateApply(i, BucketRef(state, b, mode)) \/ StorageStateApply(i, BucketUnRef(state, b, mode)) \/ \E j \in Storages, b \in BucketIds : - StorageStateApply(i, BucketSendStart(state, b, j)) + \/ StorageStateApply(i, BucketSendStart(state, b, j)) + \/ StorageStateApply(i, BucketSendWaitAndSend(state, b, j)) \/ \E b \in BucketIds : \/ StorageStateApply(i, RecoverySendStatRequest(state, b)) \/ StorageStateApply(i, GcDropGarbage(state, b)) \/ StorageStateApply(i, GcSendTestRequest(state, b)) \/ StorageStateApply(i, BucketDropFromTransfering(state, b)) + \/ StorageStateApply(i, BucketDropFromSending(state, b)) ================================================================================ diff --git a/proofs/tla/test/storage/MasterDoubledTest.tla b/proofs/tla/test/storage/MasterDoubledTest.tla index f5811fa9..c4e257c0 100644 --- a/proofs/tla/test/storage/MasterDoubledTest.tla +++ b/proofs/tla/test/storage/MasterDoubledTest.tla @@ -38,12 +38,11 @@ TestInit == TestNext == \/ /\ phase = 1 - /\ ~( - storages["s1"].buckets[b1].status = "SENT" - /\ storages["s3"].buckets[b1].status = "ACTIVE" - ) /\ \E i \in {"s1"}, j \in {"s3"}, b \in {b1} : + \/ /\ 
Len(StorageState("s2").networkReceive[i]) > 0 + /\ StorageStateApply("s2", ProcessReplicationBucket(StorageState("s2"), i)) \/ StorageStateApply(i, BucketSendStart(StorageState(i), b, j)) + \/ StorageStateApply(i, BucketSendWaitAndSend(StorageState(i), b, j)) \/ /\ Len(StorageState(j).networkReceive[i]) > 0 /\ \/ StorageStateApply(j, BucketRecvStart(StorageState(j), i)) \/ StorageStateApply(j, BucketRecvFinish(StorageState(j), i)) From 8c174eec0cf68639a6bcbefc6c074b128811d2c2 Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Tue, 4 Nov 2025 20:05:55 +0300 Subject: [PATCH 3/6] tla/storage: implement prohibiting recovery and rebalance before sync This commit prohibits any recovery- or rebalancer-related operations on masters that have not yet synced with the replicaset after becoming master. Part of #214 NO_DOC=tla NO_TEST=tla --- proofs/tla/src/storage.tla | 42 ++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/proofs/tla/src/storage.tla b/proofs/tla/src/storage.tla index 839d302a..2654ab33 100644 --- a/proofs/tla/src/storage.tla +++ b/proofs/tla/src/storage.tla @@ -103,6 +103,7 @@ StorageType == [ gcAck : [BucketIds -> SUBSET Storages], sendWaitTarget : [BucketIds -> [Storages -> Nat]], sendingBuckets : SUBSET BucketIds, + masterWaitTarget : [Storages -> Nat], \* For limiting tests, it cannot be done outside of the module, since \* a test doesn't know, whether the bucket send or ref actually happened. 
errinj : [ @@ -158,6 +159,7 @@ StorageInit == gcAck |-> [b \in BucketIds |-> {}], sendWaitTarget |-> [b \in BucketIds |-> [s \in Storages |-> 0]], sendingBuckets |-> {}, + masterWaitTarget |-> [s \in Storages |-> 0], errinj |-> [ bucketSendCount |-> 0, bucketRWRefCount |-> 0, @@ -188,6 +190,7 @@ StorageState(i) == [ gcAck |-> storages[i].gcAck, sendWaitTarget |-> storages[i].sendWaitTarget, sendingBuckets |-> storages[i].sendingBuckets, + masterWaitTarget |-> storages[i].masterWaitTarget, errinj |-> storages[i].errinj ] @@ -200,7 +203,8 @@ StorageStateApply(i, state) == VarSet(i, "gcAck", state.gcAck, VarSet(i, "sendWaitTarget", state.sendWaitTarget, VarSet(i, "sendingBuckets", state.sendingBuckets, - VarSet(i, "errinj", state.errinj, storages))))))))) + VarSet(i, "masterWaitTarget", state.masterWaitTarget, + VarSet(i, "errinj", state.errinj, storages)))))))))) /\ network' = [s \in Storages |-> [t \in Storages |-> @@ -256,8 +260,28 @@ ProcessReplicationBucket(state, j) == (* Failover master change *) (***************************************************************************) +\* True iff this node's current vclock has reached the stored target. +MasterSyncDone(state) == + \A s \in Storages : state.vclock[s] >= state.masterWaitTarget[s] + +\* Max of a non-empty set of Nats. +MaxNat(S) == CHOOSE m \in S : \A x \in S : m >= x + +\* Compute the one-shot target when i becomes master: +\* for every source s, the maximum vclock[s] seen among peers at that instant. +MasterCatchupTarget(i) == + [ s \in Storages |-> + IF OtherReplicasInReplicaset(i) = {} THEN 0 + ELSE MaxNat({storages[p].vclock[s] : p \in OtherReplicasInReplicaset(i)}) + ] + +\* Until master synchronizes with other replicas, rebalancer and recovery +\* are not allowed. 
BecomeMaster(state) == - IF ~IsMaster(state) THEN [state EXCEPT !.status = "master"] ELSE state + IF ~IsMaster(state) THEN + LET target == MasterCatchupTarget(state.id) IN + [state EXCEPT !.status = "master", !.masterWaitTarget = target] + ELSE state BecomeReplica(state) == IF IsMaster(state) THEN [state EXCEPT !.status = "replica"] ELSE state @@ -315,6 +339,7 @@ BucketSendWaitAndSend(state, b, j) == /\ b \in state.sendingBuckets /\ j # state.id /\ AllReplicasCaughtUp(state, b) + /\ MasterSyncDone(state) THEN LET msg == [type |-> "BUCKET_RECV", content |-> [bucket |-> b, final |-> FALSE]] @@ -333,7 +358,7 @@ BucketDropFromSending(state, b) == BucketSendStart(state, b, j) == IF IsMaster(state) /\ state.buckets[b].status = "ACTIVE" /\ state.bucketRefs[b].rw = 0 /\ ~state.bucketRefs[b].rw_lock /\ - b \notin state.transferingBuckets /\ j # state.id + b \notin state.transferingBuckets /\ j # state.id /\ MasterSyncDone(state) THEN LET state1 == [state EXCEPT !.bucketRefs[b].rw_lock = TRUE, !.transferingBuckets = @ \union {b}, @@ -348,7 +373,7 @@ BucketSendStart(state, b, j) == BucketRecvStart(state, j) == LET msg == Head(state.networkReceive[j]) IN - IF msg.type = "BUCKET_RECV" /\ ~msg.content.final THEN + IF msg.type = "BUCKET_RECV" /\ ~msg.content.final /\ MasterSyncDone(state) THEN LET b == msg.content.bucket IN IF IsMaster(state) /\ state.buckets[b].status = NULL /\ b \notin state.transferingBuckets @@ -376,7 +401,7 @@ BucketSendFinish(state, j) == LET b == msg.content.bucket ok == msg.content.status IN - IF IsMaster(state) /\ b \in state.transferingBuckets THEN + IF IsMaster(state) /\ b \in state.transferingBuckets /\ MasterSyncDone(state) THEN LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN IF ok THEN \* Receiver accepted the bucket - mark as SENT @@ -406,7 +431,7 @@ BucketRecvFinish(state, j) == IF msg.type = "BUCKET_RECV" /\ msg.content.final THEN LET b == msg.content.bucket IN IF IsMaster(state) /\ state.buckets[b].status = "RECEIVING" /\ - b 
\notin state.transferingBuckets + b \notin state.transferingBuckets /\ MasterSyncDone(state) THEN LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN LET state2 == @@ -432,6 +457,7 @@ RecoverySendStatRequest(state, b) == /\ state.buckets[b].status \in {"SENDING", "RECEIVING"} /\ ~(b \in state.transferingBuckets) /\ dest_rs # NULL + /\ MasterSyncDone(state) THEN \* Choose any storage in the destination replicaset. LET candidates == {s \in Storages : @@ -447,7 +473,7 @@ ProcessRecoveryStatRequest(state, j) == LET msg == Head(state.networkReceive[j]) IN IF msg.type # "RECOVERY_BUCKET_STAT" THEN state ELSE - IF ~IsMaster(state) THEN + IF ~IsMaster(state) \/ ~MasterSyncDone(state) THEN \* Drop message if this node is not a master. [state EXCEPT !.networkReceive[j] = Tail(@)] ELSE @@ -466,7 +492,7 @@ ProcessRecoveryStatResponse(state, j) == LET msg == Head(state.networkReceive[j]) IN IF msg.type # "RECOVERY_BUCKET_STAT_RESPONSE" THEN state ELSE - IF ~IsMaster(state) THEN + IF ~IsMaster(state) \/ ~MasterSyncDone(state) THEN \* Drop message if this node is not a master. [state EXCEPT !.networkReceive[j] = Tail(@)] ELSE From 858afe4106b461e1fb7727327afad48dc6acfe07 Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Tue, 4 Nov 2025 20:23:17 +0300 Subject: [PATCH 4/6] tla/storage: implement bucket generations This commit prepares vshard bucket rebalancing algorithm to be resistant to the #214 issue. It introduces the persistent bucket generation, which is incremented on every `SENDING` status.
Part of #214 NO_DOC=tla NO_TEST=tla --- proofs/tla/src/storage.tla | 56 +++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/proofs/tla/src/storage.tla b/proofs/tla/src/storage.tla index 2654ab33..868d743e 100644 --- a/proofs/tla/src/storage.tla +++ b/proofs/tla/src/storage.tla @@ -57,7 +57,7 @@ MessageType == { } BucketRecvContent == - [bucket: BucketIds, final: BOOLEAN] + [bucket: BucketIds, final: BOOLEAN, generation : Nat] BucketRecvResponseContent == [bucket: BucketIds, status: BOOLEAN] RecoveryBucketStatContent == @@ -70,7 +70,7 @@ BucketTestGcResponseContent == [bucket: BucketIds, can_gc: BOOLEAN] ReplicationBucketContent == [bucket: BucketIds, status: BucketState, - destination: ReplicaSets \union {NULL}] + destination: ReplicaSets \union {NULL}, generation: Nat] MessageContent == BucketRecvContent \union @@ -97,7 +97,9 @@ StorageType == [ transferingBuckets : SUBSET BucketIds, buckets : [BucketIds -> - [status : BucketState, destination: ReplicaSets \union {NULL}]], + [status : BucketState, + destination : ReplicaSets \union {NULL}, + generation : Nat]], bucketRefs : [BucketIds -> [ro : Nat, rw : Nat, ro_lock : BOOLEAN, rw_lock : BOOLEAN]], gcAck : [BucketIds -> SUBSET Storages], @@ -152,8 +154,8 @@ StorageInit == LET rs_for_s == CHOOSE rs \in ReplicaSets : i \in StorageAssignments[rs] IN IF b \in BucketAssignments[rs_for_s] - THEN [status |-> "ACTIVE", destination |-> NULL] - ELSE [status |-> NULL, destination |-> NULL]], + THEN [status |-> "ACTIVE", destination |-> NULL, generation |-> 1] + ELSE [status |-> NULL, destination |-> NULL, generation |-> 0]], bucketRefs |-> [b \in BucketIds |-> [ro |-> 0, rw |-> 0, ro_lock |-> FALSE, rw_lock |-> FALSE]], gcAck |-> [b \in BucketIds |-> {}], @@ -225,13 +227,14 @@ StorageStateApply(i, state) == (* Replication *) (***************************************************************************) -BucketStatusChange(state, from, bucket, status, destination) == 
+BucketStatusChange(state, from, bucket, status, destination, generation) == LET ref_before == state.bucketRefs[bucket] ref_after == [ref_before EXCEPT !.ro_lock = ~(status \in ReadableStates), !.rw_lock = ~(status \in WritableStates)] state1 == [state EXCEPT - !.buckets[bucket] = [status |-> status, destination |-> destination], + !.buckets[bucket] = [status |-> status, destination |-> destination, + generation |-> generation], !.bucketRefs[bucket] = ref_after, !.vclock[from] = @ + 1] a == Assert(state.status = "master", "Bucket change on non-master") IN @@ -240,7 +243,7 @@ BucketStatusChange(state, from, bucket, status, destination) == \* replicate to all other nodes in replicaset LET replication_msg == [type |-> "REPLICATION_BUCKET", content |-> [bucket |-> bucket, status |-> status, - destination |-> destination]] IN + destination |-> destination, generation |-> generation]] IN [state1 EXCEPT !.networkSend = [ j \in Storages |-> IF j \in OtherReplicasInReplicaset(state.id) THEN Append(state1.networkSend[j], replication_msg) @@ -252,8 +255,8 @@ ProcessReplicationBucket(state, j) == LET msg == Head(state.networkReceive[j]) IN IF msg.type = "REPLICATION_BUCKET" THEN LET stateNew == BucketStatusChange(state, j, msg.content.bucket, - msg.content.status, msg.content.destination) IN - [stateNew EXCEPT !.networkReceive[j] = Tail(@)] + msg.content.status, msg.content.destination, msg.content.generation) + IN [stateNew EXCEPT !.networkReceive[j] = Tail(@)] ELSE state (***************************************************************************) @@ -342,7 +345,8 @@ BucketSendWaitAndSend(state, b, j) == /\ MasterSyncDone(state) THEN LET msg == [type |-> "BUCKET_RECV", - content |-> [bucket |-> b, final |-> FALSE]] + content |-> [bucket |-> b, final |-> FALSE, + generation |-> state.buckets[b].generation]] IN [state EXCEPT !.networkSend[j] = Append(@, msg), !.sendingBuckets = @ \ {b}] @@ -364,7 +368,8 @@ BucketSendStart(state, b, j) == !.transferingBuckets = @ \union {b}, 
!.errinj.bucketSendCount = @ + 1] IN LET state2 == BucketStatusChange( - state1, state.id, b, "SENDING", ReplicasetOf(j)) + state1, state.id, b, "SENDING", ReplicasetOf(j), + state1.buckets[b].generation + 1) state3 == [state2 EXCEPT !.sendWaitTarget[b] = state2.vclock, !.sendingBuckets = @ \union {b}] @@ -382,7 +387,8 @@ BucketRecvStart(state, j) == LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN LET state2 == BucketStatusChange( - state1, state.id, b, "RECEIVING", ReplicasetOf(j)) IN + state1, state.id, b, "RECEIVING", ReplicasetOf(j), + msg.content.generation) IN LET response == [type |-> "BUCKET_RECV_RESPONSE", content |-> [bucket |-> b, status |-> TRUE]] IN [state2 EXCEPT !.networkSend[j] = Append(@, response)] @@ -406,11 +412,12 @@ BucketSendFinish(state, j) == IF ok THEN \* Receiver accepted the bucket - mark as SENT \* and send final message. - LET state2 == BucketStatusChange( - state1, state.id, b, "SENT", ReplicasetOf(j)) IN + LET state2 == BucketStatusChange(state1, state.id, b, "SENT", + ReplicasetOf(j), state1.buckets[b].generation) IN LET final_msg == [ type |-> "BUCKET_RECV", - content |-> [bucket |-> b, final |-> TRUE] ] IN + content |-> [bucket |-> b, final |-> TRUE, + generation |-> state.buckets[b].generation]] IN [ state2 EXCEPT !.transferingBuckets = @ \ {b}, !.networkSend[j] = Append(@, final_msg) @@ -435,7 +442,8 @@ BucketRecvFinish(state, j) == THEN LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN LET state2 == - BucketStatusChange(state1, state.id, b, "ACTIVE", NULL) IN + BucketStatusChange(state1, state.id, b, "ACTIVE", NULL, + state1.buckets[b].generation) IN [state2 EXCEPT !.bucketRefs[b].rw_lock = FALSE] ELSE [state EXCEPT !.networkReceive[j] = Tail(@)] \* Drop if not master ELSE state \* Leave non-BUCKET_RECV messages in queue @@ -506,16 +514,19 @@ ProcessRecoveryStatResponse(state, j) == \* Recovery policy: sender adjusts state after getting peer's status. 
ELSE IF localStatus = "SENDING" /\ remoteStatus \in {"ACTIVE"} THEN LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN - BucketStatusChange(state1, state.id, b, "SENT", state.buckets[b].destination) + BucketStatusChange(state1, state.id, b, "SENT", + state.buckets[b].destination, state1.buckets[b].generation) ELSE IF localStatus = "RECEIVING" /\ (remoteStatus \in WritableStates \/ (remoteStatus = "SENDING" /\ ~remoteTransf)) THEN LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN - BucketStatusChange(state1, state.id, b, "GARBAGE", NULL) + BucketStatusChange(state1, state.id, b, "GARBAGE", NULL, + state1.buckets[b].generation) ELSE IF (b \notin state.transferingBuckets) /\ (remoteStatus \in {"SENT", "GARBAGE"} \/ remoteStatus = NULL) THEN LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN - BucketStatusChange(state1, state.id, b, "ACTIVE", NULL) + BucketStatusChange(state1, state.id, b, "ACTIVE", NULL, + state1.buckets[b].generation) ELSE [state EXCEPT !.networkReceive[j] = Tail(@)] @@ -543,7 +554,8 @@ TryBucketGarbage(state, b) == THEN \* Reset acks and mark bucket as GARBAGE LET state1 == [state EXCEPT !.gcAck[b] = {}] IN - BucketStatusChange(state1, state.id, b, "GARBAGE", NULL) + BucketStatusChange(state1, state.id, b, "GARBAGE", NULL, + state1.buckets[b].generation) ELSE state @@ -621,7 +633,7 @@ ProcessGcTestResponse(state, j) == GcDropGarbage(state, b) == IF IsMaster(state) /\ state.buckets[b].status = "GARBAGE" THEN - BucketStatusChange(state, state.id, b, NULL, NULL) + BucketStatusChange(state, state.id, b, NULL, NULL, 0) ELSE state (***************************************************************************) From 99c6c51db27a2ed322bdf4d0d4b04173f896120d Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Wed, 5 Nov 2025 00:04:52 +0300 Subject: [PATCH 5/6] tla/storage: make recovery use bucket generation This commit should resolve the #214 issue. 
Works as follows: recovery uses that generation in order to distinguish which bucket is more recent when it cannot find a bucket on the sender node. So, firstly, the node goes to the sender: if the sender has a bucket in any state with a greater generation, the local one is `GARBAGE` - we don't care about the status here. If the bucket generation is equal to the local one, we use the old logic; if the bucket is missing from the remote node, then we fullscan all masters of the cluster. When all of the nodes have replied: if there exists a higher generation, the local bucket is `GARBAGE`, otherwise it is `ACTIVE`. Part of #214 NO_DOC=tla NO_TEST=tla --- proofs/tla/src/storage.tla | 137 +++++++++++++++--- .../tla/test/storage/DoubledBucketsTest.tla | 6 +- .../tla/test/storage/StrayTCPDoubledTest.cfg | 2 +- .../tla/test/storage/StrayTCPDoubledTest.tla | 35 +++-- 4 files changed, 144 insertions(+), 36 deletions(-) diff --git a/proofs/tla/src/storage.tla b/proofs/tla/src/storage.tla index 868d743e..6be74151 100644 --- a/proofs/tla/src/storage.tla +++ b/proofs/tla/src/storage.tla @@ -49,6 +49,8 @@ MessageType == { \* recovery_step_by_step(). "RECOVERY_BUCKET_STAT", "RECOVERY_BUCKET_STAT_RESPONSE", + "RECOVERY_BUCKET_FULLSCAN", + "RECOVERY_BUCKET_FULLSCAN_RESPONSE", \* gc_bucket_process_sent_one_batch_xc().
"BUCKET_TEST_GC", "BUCKET_TEST_GC_RESPONSE", @@ -63,7 +65,11 @@ BucketRecvResponseContent == RecoveryBucketStatContent == [bucket : BucketIds] RecoveryBucketStatResponseContent == - [bucket: BucketIds, status: BucketState, transferring: BOOLEAN] + [bucket: BucketIds, status: BucketState, transferring: BOOLEAN, generation: Nat] +RecoveryBucketFullscanContent == + [bucket : BucketIds] +RecoveryBucketFullscanResponseContent == + [bucket: BucketIds, status: BucketState, generation: Nat] BucketTestGcContent == [bucket : BucketIds] BucketTestGcResponseContent == @@ -77,6 +83,8 @@ MessageContent == BucketRecvResponseContent \union RecoveryBucketStatContent \union RecoveryBucketStatResponseContent \union + RecoveryBucketFullscanContent \union + RecoveryBucketFullscanResponseContent \union BucketTestGcContent \union BucketTestGcResponseContent \union ReplicationBucketContent @@ -106,6 +114,8 @@ StorageType == [ sendWaitTarget : [BucketIds -> [Storages -> Nat]], sendingBuckets : SUBSET BucketIds, masterWaitTarget : [Storages -> Nat], + recoveryAck : [BucketIds -> + [ReplicaSets -> [status : BucketState, generation : Int]]], \* For limiting tests, it cannot be done outside of the module, since \* a test doesn't know, whether the bucket send or ref actually happened. 
errinj : [ @@ -162,6 +172,8 @@ StorageInit == sendWaitTarget |-> [b \in BucketIds |-> [s \in Storages |-> 0]], sendingBuckets |-> {}, masterWaitTarget |-> [s \in Storages |-> 0], + recoveryAck |-> [b \in BucketIds |-> + [rs \in ReplicaSets |-> [status |-> NULL, generation |-> -1]]], errinj |-> [ bucketSendCount |-> 0, bucketRWRefCount |-> 0, @@ -193,6 +205,7 @@ StorageState(i) == [ sendWaitTarget |-> storages[i].sendWaitTarget, sendingBuckets |-> storages[i].sendingBuckets, masterWaitTarget |-> storages[i].masterWaitTarget, + recoveryAck |-> storages[i].recoveryAck, errinj |-> storages[i].errinj ] @@ -206,7 +219,8 @@ StorageStateApply(i, state) == VarSet(i, "sendWaitTarget", state.sendWaitTarget, VarSet(i, "sendingBuckets", state.sendingBuckets, VarSet(i, "masterWaitTarget", state.masterWaitTarget, - VarSet(i, "errinj", state.errinj, storages)))))))))) + VarSet(i, "recoveryAck", state.recoveryAck, + VarSet(i, "errinj", state.errinj, storages))))))))))) /\ network' = [s \in Storages |-> [t \in Storages |-> @@ -466,6 +480,7 @@ RecoverySendStatRequest(state, b) == /\ ~(b \in state.transferingBuckets) /\ dest_rs # NULL /\ MasterSyncDone(state) + /\ ~(\E rs \in ReplicaSets : state.recoveryAck[b][rs].generation # -1) THEN \* Choose any storage in the destination replicaset. 
LET candidates == {s \in Storages : @@ -490,12 +505,32 @@ ProcessRecoveryStatRequest(state, j) == content |-> [ bucket |-> b, status |-> state.buckets[b].status, - transferring |-> (b \in state.transferingBuckets) + transferring |-> (b \in state.transferingBuckets), + generation |-> state.buckets[b].generation ]] IN [state EXCEPT !.networkReceive[j] = Tail(@), !.networkSend[j] = Append(@, reply)] +SendRecoveryFullscanRequests(state, b) == + LET msg == [type |-> "RECOVERY_BUCKET_FULLSCAN", + content |-> [bucket |-> b]] + currentRs == ReplicasetOf(state.id) + chosen == [rs \in ReplicaSets \ {currentRs} |-> + CHOOSE s \in Storages : + storageToReplicaset[s] = rs] + localGen == state.buckets[b].generation + localStatus == state.buckets[b].status + IN [state EXCEPT + !.networkSend = + [j \in Storages |-> + IF \E rs \in ReplicaSets \ {currentRs} : j = chosen[rs] + THEN Append(state.networkSend[j], msg) + ELSE state.networkSend[j]], + !.recoveryAck = [state.recoveryAck EXCEPT ![b][currentRs] = + [status |-> localStatus, generation |-> localGen]]] + + ProcessRecoveryStatResponse(state, j) == LET msg == Head(state.networkReceive[j]) IN IF msg.type # "RECOVERY_BUCKET_STAT_RESPONSE" THEN state @@ -507,28 +542,87 @@ ProcessRecoveryStatResponse(state, j) == LET b == msg.content.bucket remoteStatus == msg.content.status remoteTransf == msg.content.transferring + remoteGen == msg.content.generation localStatus == state.buckets[b].status + localGen == state.buckets[b].generation + state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN IF ~(localStatus \in TransferStates) THEN - [state EXCEPT !.networkReceive[j] = Tail(@)] - \* Recovery policy: sender adjusts state after getting peer's status. 
- ELSE IF localStatus = "SENDING" /\ remoteStatus \in {"ACTIVE"} THEN - LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN - BucketStatusChange(state1, state.id, b, "SENT", - state.buckets[b].destination, state1.buckets[b].generation) - ELSE IF localStatus = "RECEIVING" /\ - (remoteStatus \in WritableStates - \/ (remoteStatus = "SENDING" /\ ~remoteTransf)) THEN - LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN + state1 + ELSE IF remoteGen > localGen THEN BucketStatusChange(state1, state.id, b, "GARBAGE", NULL, state1.buckets[b].generation) - ELSE IF (b \notin state.transferingBuckets) - /\ (remoteStatus \in {"SENT", "GARBAGE"} \/ remoteStatus = NULL) THEN - LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN - BucketStatusChange(state1, state.id, b, "ACTIVE", NULL, - state1.buckets[b].generation) - ELSE - [state EXCEPT !.networkReceive[j] = Tail(@)] + ELSE IF remoteGen = localGen THEN + \* Recovery policy: sender adjusts state after getting peer's status. + IF localStatus = "SENDING" /\ remoteStatus \in {"ACTIVE"} THEN + BucketStatusChange(state1, state.id, b, "SENT", + state.buckets[b].destination, state1.buckets[b].generation) + ELSE IF localStatus = "RECEIVING" /\ + (remoteStatus \in WritableStates + \/ (remoteStatus = "SENDING" /\ ~remoteTransf)) THEN + BucketStatusChange(state1, state.id, b, "GARBAGE", NULL, + state1.buckets[b].generation) + ELSE IF (b \notin state.transferingBuckets) + /\ (remoteStatus \in {"SENT", "GARBAGE"}) THEN + BucketStatusChange(state1, state.id, b, "ACTIVE", NULL, + state1.buckets[b].generation) + ELSE + state1 + ELSE IF remoteGen = 0 THEN + SendRecoveryFullscanRequests(state1, b) + ELSE LET a == Assert(FALSE, "remote gen < local gen") + \* This should never happen. 
+ IN state1 + +ProcessRecoveryFullscanRequest(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type # "RECOVERY_BUCKET_FULLSCAN" THEN state + ELSE + IF ~IsMaster(state) \/ ~MasterSyncDone(state) THEN + [state EXCEPT !.networkReceive[j] = Tail(@)] + ELSE + LET b == msg.content.bucket + reply == [type |-> "RECOVERY_BUCKET_FULLSCAN_RESPONSE", + content |-> [ + bucket |-> b, + status |-> state.buckets[b].status, + generation |-> state.buckets[b].generation]] + IN [state EXCEPT + !.networkReceive[j] = Tail(@), + !.networkSend[j] = Append(@, reply)] + +BucketClearRecoveryAck(state, b) == + [state EXCEPT !.recoveryAck[b] = + [rs \in ReplicaSets |-> [status |-> NULL, generation |-> -1]]] + +ProcessRecoveryFullscanResponse(state, j) == + LET msg == Head(state.networkReceive[j]) IN + IF msg.type # "RECOVERY_BUCKET_FULLSCAN_RESPONSE" THEN state + ELSE + LET b == msg.content.bucket + rs == storageToReplicaset[j] + remoteStatus == msg.content.status + remoteGen == msg.content.generation + localGen == state.buckets[b].generation + localStatus == state.buckets[b].status + ack_after == [state.recoveryAck EXCEPT + ![b][rs] = [status |-> remoteStatus, generation |-> remoteGen]] + state1 == [state EXCEPT + !.networkReceive[j] = Tail(@), + !.recoveryAck = ack_after] + IN LET receivedRS == {r \in ReplicaSets : ack_after[b][r].generation # -1} + allResponded == receivedRS = ReplicaSets + IN + IF ~allResponded THEN state1 + ELSE + LET gens == [r \in ReplicaSets |-> ack_after[b][r].generation] + higherGen == \E r \in ReplicaSets : gens[r] > localGen IN + IF higherGen THEN + BucketClearRecoveryAck( + BucketStatusChange(state1, state.id, b, "GARBAGE", NULL, localGen), b) + ELSE + BucketClearRecoveryAck( + BucketStatusChange(state1, state.id, b, "ACTIVE", NULL, localGen), b) (***************************************************************************) (* Garbage Collector *) @@ -707,6 +801,8 @@ Next == \/ StorageStateApply(i, BucketRecvFinish(state, j)) \/ 
StorageStateApply(i, ProcessRecoveryStatRequest(state, j)) \/ StorageStateApply(i, ProcessRecoveryStatResponse(state, j)) + \/ StorageStateApply(i, ProcessRecoveryFullscanRequest(state, j)) + \/ StorageStateApply(i, ProcessRecoveryFullscanResponse(state, j)) \/ StorageStateApply(i, ProcessGcTestRequest(state, j)) \/ StorageStateApply(i, ProcessGcTestResponse(state, j)) \/ \E b \in BucketIds, mode \in {"read", "write"} : @@ -721,5 +817,6 @@ Next == \/ StorageStateApply(i, GcSendTestRequest(state, b)) \/ StorageStateApply(i, BucketDropFromTransfering(state, b)) \/ StorageStateApply(i, BucketDropFromSending(state, b)) + \/ StorageStateApply(i, BucketClearRecoveryAck(state, b)) ================================================================================ diff --git a/proofs/tla/test/storage/DoubledBucketsTest.tla b/proofs/tla/test/storage/DoubledBucketsTest.tla index 0c3050f4..f41a3d54 100644 --- a/proofs/tla/test/storage/DoubledBucketsTest.tla +++ b/proofs/tla/test/storage/DoubledBucketsTest.tla @@ -31,10 +31,10 @@ SendLimitConstraint == \* 2. Keep network bounded - avoid message explosion. 
NetworkBoundConstraint == /\ \A s1, s2 \in StoragesC : - Len(network[s1][s2]) =< 3 + Len(network[s1][s2]) =< 2 /\ \A s \in StoragesC : - /\ storages[s].errinj.networkReorderCount <= 1 - /\ storages[s].errinj.networkDropCount <= 1 + /\ storages[s].errinj.networkReorderCount <= 2 + /\ storages[s].errinj.networkDropCount <= 2 RefConstraint == \A s1 \in StoragesC : diff --git a/proofs/tla/test/storage/StrayTCPDoubledTest.cfg b/proofs/tla/test/storage/StrayTCPDoubledTest.cfg index 57e568e6..01a5c721 100644 --- a/proofs/tla/test/storage/StrayTCPDoubledTest.cfg +++ b/proofs/tla/test/storage/StrayTCPDoubledTest.cfg @@ -18,7 +18,7 @@ INVARIANTS StorageToReplicasetTypeInv NoActiveSimultaneousInv -CHECK_DEADLOCK TRUE +CHECK_DEADLOCK FALSE CONSTRAINTS NetworkReorderConstraint diff --git a/proofs/tla/test/storage/StrayTCPDoubledTest.tla b/proofs/tla/test/storage/StrayTCPDoubledTest.tla index 58c30a66..92a74d63 100644 --- a/proofs/tla/test/storage/StrayTCPDoubledTest.tla +++ b/proofs/tla/test/storage/StrayTCPDoubledTest.tla @@ -36,8 +36,8 @@ TestInit == NetworkReorderConstraint == \A s \in StoragesC : - /\ storages[s].errinj.networkReorderCount <= 1 - /\ storages[s].errinj.networkDropCount <= 1 + /\ storages[s].errinj.networkReorderCount <= 2 + /\ storages[s].errinj.networkDropCount <= 2 (***************************************************************************) (* Phase-driven Next *) @@ -52,18 +52,24 @@ TestNext == \/ /\ phase = 2 /\ storages["s1"].buckets[b1].status = "SENDING" - /\ StorageStateApply("s1", BucketDropFromTransfering(StorageState("s1"), b1)) + /\ StorageStateApply("s1", RecoverySendStatRequest( + BucketDropFromTransfering(StorageState("s1"), b1), b1)) /\ phase' = 3 /\ UNCHANGED <> \/ /\ phase = 3 /\ storages["s1"].buckets[b1].status = "SENDING" - /\ \E i \in {"s1"}, j \in {"s3"}, b \in {b1} : - \/ StorageStateApply(i, RecoverySendStatRequest(StorageState(i), b)) + /\ \E i \in {"s1"}, j \in {"s3"}, l \in {"s2"}, b \in {b1} : \/ /\ 
Len(StorageState(j).networkReceive[i]) > 0 - /\ StorageStateApply(j, ProcessRecoveryStatRequest(StorageState(j), i)) + /\ \/ StorageStateApply(j, ProcessRecoveryStatRequest(StorageState(j), i)) + \/ StorageStateApply(j, ProcessRecoveryFullscanRequest(StorageState(j), i)) + \/ /\ Len(StorageState(l).networkReceive[i]) > 0 + /\ StorageStateApply(l, ProcessRecoveryFullscanRequest(StorageState(l), i)) \/ /\ Len(StorageState(i).networkReceive[j]) > 0 - /\ StorageStateApply(i, ProcessRecoveryStatResponse(StorageState(i), j)) + /\ \/ StorageStateApply(i, ProcessRecoveryStatResponse(StorageState(i), j)) + \/ StorageStateApply(i, ProcessRecoveryFullscanResponse(StorageState(i), j)) + \/ /\ Len(StorageState(i).networkReceive[l]) > 0 + /\ StorageStateApply(i, ProcessRecoveryFullscanResponse(StorageState(i), l)) \/ ReorderOneNetworkMessage /\ UNCHANGED <> @@ -105,17 +111,22 @@ TestNext == /\ storages["s1"].buckets[b1].status = "GARBAGE" /\ storages["s2"].buckets[b1].status = "ACTIVE" /\ storages["s3"].buckets[b1].status = "RECEIVING" + /\ StorageStateApply("s3", RecoverySendStatRequest(StorageState("s3"), b1)) /\ phase' = 8 - /\ UNCHANGED <> \/ /\ phase = 8 - /\ \E i \in {"s3"}, j \in {"s1"}, b \in {b1} : + /\ \E i \in {"s3"}, j \in {"s1"}, l \in {"s2"}, b \in {b1} : \/ DropOneNetworkMessage - \/ StorageStateApply(i, RecoverySendStatRequest(StorageState(i), b)) \/ /\ Len(StorageState(j).networkReceive[i]) > 0 - /\ StorageStateApply(j, ProcessRecoveryStatRequest(StorageState(j), i)) + /\ \/ StorageStateApply(j, ProcessRecoveryStatRequest(StorageState(j), i)) + \/ StorageStateApply(j, ProcessRecoveryFullscanRequest(StorageState(j), i)) + \/ /\ Len(StorageState(l).networkReceive[i]) > 0 + /\ StorageStateApply(l, ProcessRecoveryFullscanRequest(StorageState(l), i)) \/ /\ Len(StorageState(i).networkReceive[j]) > 0 - /\ StorageStateApply(i, ProcessRecoveryStatResponse(StorageState(i), j)) + /\ \/ StorageStateApply(i, ProcessRecoveryStatResponse(StorageState(i), j)) + \/ 
StorageStateApply(i, ProcessRecoveryFullscanResponse(StorageState(i), j)) + \/ /\ Len(StorageState(i).networkReceive[l]) > 0 + /\ StorageStateApply(i, ProcessRecoveryFullscanResponse(StorageState(i), l)) /\ UNCHANGED <> \/ /\ phase = 8 From e7ad7ef3a415aa5b40e965f3b290e923b5ba77ab Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Wed, 5 Nov 2025 13:12:39 +0300 Subject: [PATCH 6/6] tla/storage: add big doubled test This is the test, I'm going to execute on VM for week. Let's try finding the doubled buckets cases. NO_DOC=tla NO_TEST=tla --- proofs/tla/test/storage/DoubledBigTest.cfg | 27 +++++++++ proofs/tla/test/storage/DoubledBigTest.tla | 70 ++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 proofs/tla/test/storage/DoubledBigTest.cfg create mode 100644 proofs/tla/test/storage/DoubledBigTest.tla diff --git a/proofs/tla/test/storage/DoubledBigTest.cfg b/proofs/tla/test/storage/DoubledBigTest.cfg new file mode 100644 index 00000000..08728839 --- /dev/null +++ b/proofs/tla/test/storage/DoubledBigTest.cfg @@ -0,0 +1,27 @@ +INIT Init +NEXT Next + +CONSTANTS + Storages <- StoragesC + ReplicaSets <- ReplicaSetsC + BucketIds <- BucketIdsC + StorageAssignments <- StorageAssignmentsC + BucketAssignments <- BucketAssignmentsC + MasterAssignments <- MasterAssignmentsC + b1 = b1 + b2 = b2 + b3 = b3 + b4 = b4 + +SYMMETRY Symmetry + +INVARIANTS + NetworkTypeInv + StorageTypeInv + StorageToReplicasetTypeInv + NoActiveSimultaneousInv + +CONSTRAINT + SendLimitConstraint + NetworkBoundConstraint + RefConstraint diff --git a/proofs/tla/test/storage/DoubledBigTest.tla b/proofs/tla/test/storage/DoubledBigTest.tla new file mode 100644 index 00000000..5b9727db --- /dev/null +++ b/proofs/tla/test/storage/DoubledBigTest.tla @@ -0,0 +1,70 @@ +--------------------------- MODULE DoubledBigTest ------------------------------ +EXTENDS storage, utils + +CONSTANTS b1, b2, b3, b4 + +StoragesC == {"s1", "s2", "s3", "s4"} +ReplicaSetsC == {"rs1", "rs2", "rs3"} 
+BucketIdsC == {b1, b2, b3, b4} +StorageAssignmentsC == [rs1 |-> {"s1", "s2"}, + rs2 |-> {"s3"}, + rs3 |-> {"s4"}] +BucketAssignmentsC == [rs1 |-> {b1}, + rs2 |-> {b2}, + rs3 |-> {b3, b4}] +MasterAssignmentsC == [rs1 |-> {"s1"}, + rs2 |-> {"s3"}, + rs3 |-> {"s4"}] + +(***************************************************************************) +(* CONSTRAINTS *) +(***************************************************************************) + +MAX_TOTAL_SENDS == 3 + +\* 1. Limit total bucket sends - prevent endless transfers. +SendLimitConstraint == + LET totalSends == + SetSum({ storages[i].errinj.bucketSendCount : i \in StoragesC }) + IN totalSends =< MAX_TOTAL_SENDS + +\* 2. Keep network bounded - avoid message explosion. +NetworkBoundConstraint == + /\ \A s1, s2 \in StoragesC : + Len(network[s1][s2]) =< 3 + /\ \A s \in StoragesC : + /\ storages[s].errinj.networkReorderCount <= 2 + /\ storages[s].errinj.networkDropCount <= 2 + +RefConstraint == + \A s1 \in StoragesC : + /\ storages[s1].errinj.bucketRWRefCount <= 2 + /\ storages[s1].errinj.bucketRORefCount <= 2 + /\ storages[s1].errinj.bucketRWUnRefCount <= 2 + /\ storages[s1].errinj.bucketROUnRefCount <= 2 + +(***************************************************************************) +(* SYMMETRY *) +(***************************************************************************) + +Symmetry == + Permutations(BucketIdsC) + +(***************************************************************************) +(* STATE INVARIANTS *) +(***************************************************************************) + +NoActiveSimultaneousInv == + \* No bucket can be ACTIVE in storages belonging to different ReplicaSets + \A b \in BucketIds : + \A rs1, rs2 \in ReplicaSets : + rs1 # rs2 => + ~(\E s1, s2 \in Storages : + storageToReplicaset[s1] = rs1 /\ + storageToReplicaset[s2] = rs2 /\ + storages[s1].status = "master" /\ + storages[s2].status = "master" /\ + storages[s1].buckets[b].status = "ACTIVE" /\ + 
storages[s2].buckets[b].status = "ACTIVE") + +================================================================================