@@ -7,7 +7,6 @@ package kotlinx.coroutines.internal
77import kotlinx.atomicfu.*
88import kotlinx.coroutines.*
99import kotlin.coroutines.*
10- import kotlin.jvm.*
1110
1211/* *
1312 * The result of .limitedParallelism(x) call, a dispatcher
@@ -27,7 +26,7 @@ import kotlin.jvm.*
2726internal class LimitedDispatcher (
2827 private val dispatcher : CoroutineDispatcher ,
2928 private val parallelism : Int
30- ) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as ? Delay ? : DefaultDelay ) {
29+ ) : CoroutineDispatcher(), Delay by (dispatcher as ? Delay ? : DefaultDelay ) {
3130
3231 // Atomic is necessary here for the sake of K/N memory ordering,
3332 // there is no need in atomic operations for this property
@@ -45,61 +44,37 @@ internal class LimitedDispatcher(
4544 return super .limitedParallelism(parallelism)
4645 }
4746
48- override fun run () {
49- var fairnessCounter = 0
50- while (true ) {
51- val task = queue.removeFirstOrNull()
52- if (task != null ) {
53- try {
54- task.run ()
55- } catch (e: Throwable ) {
56- handleCoroutineException(EmptyCoroutineContext , e)
57- }
58- // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
59- if (++ fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this )) {
60- // Do "yield" to let other views to execute their runnable as well
61- // Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
62- dispatcher.dispatch(this , this )
63- return
64- }
65- continue
66- }
67-
68- synchronized(workerAllocationLock) {
69- runningWorkers.decrementAndGet()
70- if (queue.size == 0 ) return
71- runningWorkers.incrementAndGet()
72- fairnessCounter = 0
73- }
74- }
75- }
76-
7747 override fun dispatch (context : CoroutineContext , block : Runnable ) {
78- dispatchInternal(block) {
79- dispatcher.dispatch(this , this )
48+ dispatchInternal(block) { worker ->
49+ dispatcher.dispatch(this , worker )
8050 }
8151 }
8252
8353 @InternalCoroutinesApi
8454 override fun dispatchYield (context : CoroutineContext , block : Runnable ) {
85- dispatchInternal(block) {
86- dispatcher.dispatchYield(this , this )
55+ dispatchInternal(block) { worker ->
56+ dispatcher.dispatchYield(this , worker )
8757 }
8858 }
8959
90- private inline fun dispatchInternal (block : Runnable , dispatch : () -> Unit ) {
60+ /* *
61+ * Tries to dispatch the given [block].
62+ * If there are not enough workers, it starts a new one via [startWorker].
63+ */
64+ private inline fun dispatchInternal (block : Runnable , startWorker : (Worker ) -> Unit ) {
9165 // Add task to queue so running workers will be able to see that
92- if (addAndTryDispatching(block)) return
93- /*
94- * Protect against the race when the number of workers is enough,
95- * but one (because of synchronized serialization) attempts to complete,
96- * and we just observed the number of running workers smaller than the actual
97- * number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`)
98- */
66+ queue.addLast(block)
67+ if (runningWorkers.value >= parallelism) return
68+ // allocation may fail if some workers were launched in parallel or a worker temporarily decreased
69+ // `runningWorkers` when they observed an empty queue.
9970 if (! tryAllocateWorker()) return
100- dispatch()
71+ val task = obtainTaskOrDeallocateWorker() ? : return
72+ startWorker(Worker (task))
10173 }
10274
75+ /* *
76+ * Tries to obtain the permit to start a new worker.
77+ */
10378 private fun tryAllocateWorker (): Boolean {
10479 synchronized(workerAllocationLock) {
10580 if (runningWorkers.value >= parallelism) return false
@@ -108,9 +83,49 @@ internal class LimitedDispatcher(
10883 }
10984 }
11085
111- private fun addAndTryDispatching (block : Runnable ): Boolean {
112- queue.addLast(block)
113- return runningWorkers.value >= parallelism
86+ /* *
87+ * Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
88+ */
89+ private fun obtainTaskOrDeallocateWorker (): Runnable ? {
90+ while (true ) {
91+ when (val nextTask = queue.removeFirstOrNull()) {
92+ null -> synchronized(workerAllocationLock) {
93+ runningWorkers.decrementAndGet()
94+ if (queue.size == 0 ) return null
95+ runningWorkers.incrementAndGet()
96+ }
97+ else -> return nextTask
98+ }
99+ }
100+ }
101+
102+ /* *
103+ * A worker that polls the queue and runs tasks until there are no more of them.
104+ *
105+ * It always stores the next task to run. This is done in order to prevent the possibility of the fairness
106+ * re-dispatch happening when there are no more tasks in the queue. This is important because, after all the
107+ * actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
108+ * perform any more dispatches.
109+ */
110+ private inner class Worker (private var currentTask : Runnable ) : Runnable {
111+ override fun run () {
112+ var fairnessCounter = 0
113+ while (true ) {
114+ try {
115+ currentTask.run ()
116+ } catch (e: Throwable ) {
117+ handleCoroutineException(EmptyCoroutineContext , e)
118+ }
119+ currentTask = obtainTaskOrDeallocateWorker() ? : return
120+ // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
121+ if (++ fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this @LimitedDispatcher)) {
122+ // Do "yield" to let other views execute their runnable as well
123+ // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
124+ dispatcher.dispatch(this @LimitedDispatcher, this )
125+ return
126+ }
127+ }
128+ }
114129 }
115130}
116131
0 commit comments