@@ -8,7 +8,11 @@ import kotlinx.coroutines.internal.*
88import kotlinx.coroutines.selects.*
99import kotlinx.coroutines.sync.*
1010import org.junit.*
11+ import org.junit.Test
1112import java.util.concurrent.*
13+ import java.util.concurrent.atomic.AtomicBoolean
14+ import java.util.concurrent.atomic.AtomicInteger
15+ import kotlin.test.*
1216
1317class MutexCancellationStressTest : TestBase () {
1418 @Test
@@ -18,13 +22,16 @@ class MutexCancellationStressTest : TestBase() {
1822 val mutexOwners = Array (mutexJobNumber) { " $it " }
1923 val dispatcher = Executors .newFixedThreadPool(mutexJobNumber + 2 ).asCoroutineDispatcher()
2024 var counter = 0
21- val counterLocal = Array (mutexJobNumber) { LocalAtomicInt (0 ) }
22- val completed = LocalAtomicInt ( 0 )
25+ val counterLocal = Array (mutexJobNumber) { AtomicInteger (0 ) }
26+ val completed = AtomicBoolean ( false )
2327 val mutexJobLauncher: (jobNumber: Int ) -> Job = { jobId ->
2428 val coroutineName = " MutexJob-$jobId "
25- launch(dispatcher + CoroutineName (coroutineName)) {
26- while (completed.value == 0 ) {
29+ // ATOMIC to always have a chance to proceed
30+ launch(dispatcher + CoroutineName (coroutineName), CoroutineStart .ATOMIC ) {
31+ while (! completed.get()) {
32+ // Stress out holdsLock
2733 mutex.holdsLock(mutexOwners[(jobId + 1 ) % mutexJobNumber])
34+ // Stress out lock-like primitives
2835 if (mutex.tryLock(mutexOwners[jobId])) {
2936 counterLocal[jobId].incrementAndGet()
3037 counter++
@@ -47,30 +54,32 @@ class MutexCancellationStressTest : TestBase() {
4754 val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList()
4855 val checkProgressJob = launch(dispatcher + CoroutineName (" checkProgressJob" )) {
4956 var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 }
50- while (completed.value == 0 ) {
51- delay(1000 )
57+ while (! completed.get()) {
58+ delay(500 )
59+ // If we've caught the completion after delay, then there is a chance no progress were made whatsoever, bail out
60+ if (completed.get()) return @launch
5261 val c = counterLocal.map { it.value }
5362 for (i in 0 until mutexJobNumber) {
54- assert (c[i] > lastCounterLocalSnapshot[i]) { " No progress in MutexJob-$i " }
63+ assert (c[i] > lastCounterLocalSnapshot[i]) { " No progress in MutexJob-$i , last observed state: ${c[i]} " }
5564 }
5665 lastCounterLocalSnapshot = c
5766 }
5867 }
5968 val cancellationJob = launch(dispatcher + CoroutineName (" cancellationJob" )) {
6069 var cancellingJobId = 0
61- while (completed.value == 0 ) {
70+ while (! completed.get() ) {
6271 val jobToCancel = mutexJobs.removeFirst()
6372 jobToCancel.cancelAndJoin()
6473 mutexJobs + = mutexJobLauncher(cancellingJobId)
6574 cancellingJobId = (cancellingJobId + 1 ) % mutexJobNumber
6675 }
6776 }
6877 delay(2000L * stressTestMultiplier)
69- completed.value = 1
78+ completed.set( true )
7079 cancellationJob.join()
7180 mutexJobs.forEach { it.join() }
7281 checkProgressJob.join()
73- check (counter == counterLocal.sumOf { it.value })
82+ assertEquals (counter, counterLocal.sumOf { it.value })
7483 dispatcher.close()
7584 }
7685}
0 commit comments