@@ -6,12 +6,25 @@ package kotlinx.coroutines.internal
66
77import kotlinx.atomicfu.*
88import kotlinx.coroutines.*
9+ import org.junit.runner.*
10+ import org.junit.runners.*
911import java.util.concurrent.*
1012import kotlin.concurrent.*
1113import kotlin.test.*
1214
1315// Tests many short queues to stress copy/resize
14- class LockFreeTaskQueueStressTest : TestBase () {
16+ @RunWith(Parameterized ::class )
17+ class LockFreeTaskQueueStressTest (
18+ private val nConsumers : Int
19+ ) : TestBase() {
20+ companion object {
21+ @Parameterized.Parameters (name = " nConsumers={0}" )
22+ @JvmStatic
23+ fun params (): Collection <Int > = listOf (1 , 3 )
24+ }
25+
26+ private val singleConsumer = nConsumers == 1
27+
1528 private val nSeconds = 3 * stressTestMultiplier
1629 private val nProducers = 4
1730 private val batchSize = 100
@@ -25,7 +38,7 @@ class LockFreeTaskQueueStressTest : TestBase() {
2538 private val done = atomic(0 )
2639 private val doneProducers = atomic(0 )
2740
28- private val barrier = CyclicBarrier (nProducers + 2 )
41+ private val barrier = CyclicBarrier (nProducers + nConsumers + 1 )
2942
3043 private class Item (val producer : Int , val index : Long )
3144
@@ -34,7 +47,7 @@ class LockFreeTaskQueueStressTest : TestBase() {
3447 val threads = mutableListOf<Thread >()
3548 threads + = thread(name = " Pacer" , start = false ) {
3649 while (done.value == 0 ) {
37- queue.value = LockFreeTaskQueue (false )
50+ queue.value = LockFreeTaskQueue (singleConsumer )
3851 batch.value = 0
3952 doneProducers.value = 0
4053 barrier.await() // start consumers & producers
@@ -44,25 +57,30 @@ class LockFreeTaskQueueStressTest : TestBase() {
4457 println (" Pacer done" )
4558 barrier.await() // wakeup the rest
4659 }
47- threads + = thread(name = " Consumer" , start = false ) {
48- while (true ) {
49- barrier.await()
50- val queue = queue.value ? : break
60+ threads + = List (nConsumers) { consumer ->
61+ thread(name = " Consumer-$consumer " , start = false ) {
5162 while (true ) {
52- val item = queue.removeFirstOrNull()
53- if (item == null ) {
54- if (doneProducers.value == nProducers && queue.isEmpty) break // that's it
55- continue // spin to retry
63+ barrier.await()
64+ val queue = queue.value ? : break
65+ while (true ) {
66+ val item = queue.removeFirstOrNull()
67+ if (item == null ) {
68+ if (doneProducers.value == nProducers && queue.isEmpty) break // that's it
69+ continue // spin to retry
70+ }
71+ consumed.incrementAndGet()
72+ if (singleConsumer) {
73+ // This check only properly works in single-consumer case
74+ val eItem = expected[item.producer]++
75+ if (eItem != item.index) error(" Expected $eItem but got ${item.index} from Producer-${item.producer} " )
76+ }
5677 }
57- consumed.incrementAndGet()
58- val eItem = expected[item.producer]++
59- if (eItem != item.index) error(" Expected $eItem but got ${item.index} from Producer-${item.producer} " )
78+ barrier.await()
6079 }
61- barrier.await( )
80+ println ( " Consumer- $consumer done " )
6281 }
63- println (" Consumer done" )
6482 }
65- val producers = List (nProducers) { producer ->
83+ threads + = List (nProducers) { producer ->
6684 thread(name = " Producer-$producer " , start = false ) {
6785 var index = 0L
6886 while (true ) {
@@ -79,7 +97,6 @@ class LockFreeTaskQueueStressTest : TestBase() {
7997 println (" Producer-$producer done" )
8098 }
8199 }
82- threads + = producers
83100 threads.forEach {
84101 it.setUncaughtExceptionHandler { t, e ->
85102 System .err.println (" Thread $t failed: $e " )
0 commit comments